详解Reactor中Context的用法

目录
  • 一、使用介绍
  • 二、源码解读
  • 三、如何桥接现有的ThreadLocal系统
  • 四、总结

在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为原生的,在处理一个数据流Flux/Mono时,基本无法知道是运行在哪个线程上或哪个线程池里,可以说,每一个操作符operator以及内部的函数都可能运行在不同的线程上。这就意味着,以前用ThreadLocal来作为方法间透明传递共享变量的方式不再行得通。为此,Reactor提供了Context来替代ThreadLocal实现一个跨线程的共享变量的透明方式

本文会从以下几个方面来介绍Context的相关知识:

  • context的基本用法
  • 从源码上解读context的用法
  • 用log的MDC案例介绍如何用context实现与threadlocal的桥接
  • 总结下context以及目前的一些局限性

一、使用介绍

static String KEY = "TEST_CONTEXT_KEY";
static String KEY2 = "TEST_CONTEXT_KEY2";

public static void main(String[] args) {
	Flux<String> flux = convert("hello", Flux.just(1, 2, 3));
	flux
    	.subscriberContext(Context.of(KEY, "Outside"))
    	.subscribe(v -> System.out.println(v));
}

public static Flux<String> convert(String prefix, Flux<Integer> publisher) {
    return publisher.map(v -> prefix + " " + v)
        .subscriberContext(Context.of(KEY, "NotUsed"))
        .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v))
        .subscriberContext(context -> context.put(KEY2, "Inside"))
        .flatMap(v -> Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + v));
}

上面是context的使用方案介绍,其输出如下:

Outside Outside Inside hello 1
Outside Outside Inside hello 2
Outside Outside Inside hello 3

上面的使用案例展示了一个使用context的常见例子。通过在外部方法里传入context,如flux.subscriberContext(Context.of(KEY, "Outside")),使得内部方法convert能够获取外界环境的context,同时内部方法还可以增加自己的context数据,如subscriberContext(context -> context.put(KEY2, "Inside")),结合之后,在让内部的方法(flatMap里的方法)感知到整个上下文context的数据内容。

对于context的使用,主要分为几个部分: 1. context的创建 2. context的写入(传入)与读取 3. 执行顺序

1. context —— 不可变对象

由于reactor天然是跨线程的,所以context设计为了不可变的对象,即每次的更新都是创建一个新的对象。每次的put/putAll操作,都是先把旧对象的值复制到新对象,然后再进行put/putAll等更新操作。

2. context的写入与读取

context写入是使用subscriberContext方法,其入参有两种形式:传值方式subscriberContext(ctx)与lambda函数方式 —— subscriberContext(ctx -> ctx.put(key,value))。

context的读取是利用Mono的静态方法subscriberContext()来获取,由于其返回的是一个Mono, 所以通常与flatMap结合使用。

3. 执行顺序

context的传入是发生在subscribe()订阅阶段的,所以其写入的顺序是从下往上的,即在示例中,先执行subscriberContext(Context.of(KEY, "Outside")),再执行subscriberContext(context -> context.put(KEY2, "Inside")), 最后执行subscriberContext(Context.of(KEY, "NotUsed"))在订阅阶段执行完后,进入运行阶段,数据流从上往下执行,每次读取context的时候Mono.subscriberContext()都是读取下一个的context。所以"NotUsed"的context并没有生效。

此外,context.put()操作是复制旧的再update新的对象,所以Mono.subscriberContext().map(ctx -> ctx.get(KEY) + " " + ctx.get(KEY2) + " " + v)这个阶段仍能读取前一个context关于KEY的内容。

总结

  • context是不可变对象,每次更新都是新的context
  • context是存在于subscriber的内部的,一个context是绑定在当前subscriber上的,如FluxContextStart的对象
  • context的写入顺序是从下而上的,读取的时候是从上而下的,只能读取之后的subscriber里的context。
  • 每个subscriber中的context都是独有的,运行阶段的时候,无法改变其他subscriber的context。

注意

subscriberContext(Context.of("Outside")subscriberContext(context -> Context.of("Outside"))是有区别,前者是会结合复用前面的context,而后者是直接返回一个新的context并不会复用前面的context。 其原因是,subscriberContext(Context.of("Outside")) 其实内部调用的是subscriberContext(context -> context.putAll(Context.of("Outside")),其入参的context就是前面的context,putAll方法会复用前面的context。而 subscriberContext(context -> Context.of("Outside"))不复用的原因就是因为放弃了入参的context。所以,可以利用这种方式来放弃之前的context,当然不鼓励这么做,因为你不清楚之前context会不会影响后续的程序。

本文章的代码用的事reactor 3.3的版本,自3.5之后,subscriberContext方法改为contextWrite,读取的方法改为deferContextual

二、源码解读

现在我们从源代码上看看,context写入为什么是自下而上的,读取的时候又是依附于下一个subscriber并且自上而下的。

public final Flux<T> subscriberContext(Function<Context, Context> doOnContext) {
	return new FluxContextStart<>(this, doOnContext);
}
FluxContextStart(Flux<? extends T> source, Function<Context, Context> doOnContext) {
	super(source);
	this.doOnContext = Objects.requireNonNull(doOnContext, "doOnContext");
}

@Override
public CoreSubscriber<? super T> subscribeOrReturn(CoreSubscriber<? super T> actual) {
    Context c = doOnContext.apply(actual.currentContext());
    return new ContextStartSubscriber<>(actual, c);
}
ContextStartSubscriber(CoreSubscriber<? super T> actual, Context context) {
    this.actual = actual;
    this.context = context;
    if (actual instanceof ConditionalSubscriber) {
        this.actualConditional = (ConditionalSubscriber<? super T>) actual;
    }
    else {
        this.actualConditional = null;
    }
}
@Override
public Context currentContext() {
    return this.context;
}

上面截取了subscriberContext方法的源代码,可以看到subscriberContext方法最终会创建ContextStartSubscriber的对象,并将生成的context赋值Context c = doOnContext.apply(actual.currentContext()),所以context是伴随subscriberContext方法对应的subscriber里的。

由于context赋值操作Context c = doOnContext.apply(actual.currentContext())是发生在subscribeOrReturn方法里,即发生在subscribe()订阅阶段,所以整个执行的顺序是自下而上的(沿着整个flow自下而上至源头的publisher)

那读取context的时候为什么是自上而下的呢?我们来看下读取操作Mono.subscribeContext()的源码。

public static Mono<Context> subscriberContext() {
    return onAssembly(MonoCurrentContext.INSTANCE);
}
final class MonoCurrentContext extends Mono<Context>
		implements Fuseable, Scannable {
	static final MonoCurrentContext INSTANCE = new MonoCurrentContext();
	public void subscribe(CoreSubscriber<? super Context> actual) {
		Context ctx = actual.currentContext();
		actual.onSubscribe(Operators.scalarSubscription(actual, ctx));
	}
}
interface InnerOperator<I, O>
		extends InnerConsumer<I>, InnerProducer<O> {
	@Override
	default Context currentContext() {
		return actual().currentContext();
	}
}

Mono.subscribeContext()方法返回的是一个MonoCurrentContext的静态对象,在订阅subscribe时期,就会去读取当前的context,即Context ctx = actual.currentContext()。而对于一个InnerOperator的接口而言,其currentContext()方法会不断寻找下一个subscriber的context,即 actual().currentContext(),直到有哪个subscriber覆写了currentContext方法,如先前的ContextStartSubscriber对象。对于InnerOperator接口,是大多数subscriber都会实现的接口,例如map、filter、flatmap这些,都会实现这个接口。

在找到context之后,通过Operators.scalarSubscription(actual, ctx)写入,这个方法其实也是Mono.just()的实现,所以相当于把context当做value,生成了一个Mono.just(ctx)来完成了context读取。

所以,context读取的是从当前操作operator之后的那个最接近的subscriber的context。这也解释了前面使用案例中,subscriberContext(Context.of(KEY, "NotUsed")),没有作用的缘故。

三、如何桥接现有的ThreadLocal系统

虽然reactor提供了context来替代ThreadLocal的使用,但目前大多数的代码库仍然是命令式编程的,使用的方式仍然是基于ThreadLocal的,如Logger里的MDC。本小节以Logger中的MDC来介绍,如何利用context实现与旧系统中的基于ThreadLocal方式的打通。

我们假设有这样的一个场景,每一次的Http请求都有一个trace id,我们称为request id,并通过Http Header "X-Request-Id"来命名,打印日志的时候,希望每条日志里都包含请求id,这样方便跟踪整个请求链路的情况。

为此,我们把日志配置里的pattern设置为:[%X{X-Request-Id}] [%thread] %-5level - %msg %n

可以在SpringBootapplication.yml里设置,如:

logging.pattern.level: "[%X{X-Request-Id}] [%thread] %-5level - %msg %n"

因此,要使得每条日志里有request id,那就必须要MDC里有key为X-Request-Id的内容。下面来看下,reactor中是如何实现的。

@SpringBootApplication
@Slf4j
@RestController
public class MdcApplication {

  public static void main(String[] args) {
    SpringApplication.run(MdcApplication.class, args);
  }

  private final static String X_REQUEST_ID_KEY = "X-Request-Id";

  @GetMapping("/")
  Flux<String> split(@RequestParam("value") String value, @RequestHeader(X_REQUEST_ID_KEY) String requestId) {
    return Flux.fromArray(value.split("_"))
        .doOnEach(logWithContext(ch -> log.info("handling one item: {}", ch)))
        .subscriberContext(Context.of(X_REQUEST_ID_KEY, requestId));
  }

  private static <T> Consumer<Signal<T>> logWithContext(Consumer<T> logStatement) {
    return signal -> {
      if (!signal.isOnNext()) {
        return;
      }
      String requestId = signal.getContext().get(X_REQUEST_ID_KEY);
      try (MDC.MDCCloseable closeable = MDC.putCloseable(X_REQUEST_ID_KEY, requestId)) {
        logStatement.accept(signal.get());
      }
    };
  }
}

这是一个简单的示例程序,对于请求输入的value值通过"-"分割后,再一个个返回给客户端。首先利用subscriberContext方法,将http header里的X-Request-Id作为context来传入。然后利用doOnEach的方式获取signal。doOnEach的方法可以工作在onNext、onComplete、onError等所有事件,每一个信号signal里都包含有context,当为onNext则还包含value值,当为onError时,则还包含有exception。因此可以通过signal来获取context。

在从context获取X-Request-Id后,可以利用try-with-resource方式来更新MDC,其效果是在执行完try里面的程序后,将更新的value回退。等价于:

try {
	MDC.put(X_REQUEST_ID_KEY, requestId);
	logStatement.accept(signal.get());
} finally {
	MDC.remove(X_REQUEST_ID_KEY);
}

置于为什么需要操作完之后回退掉MDC中的更新,那是因为reactor中所有的操作都是异步执行在不同线程中的,如果不回退的话,很有可能造成污染,其原因还是MDC内部是用ThreadLocal实现的,所以跨线程的时候,如果不把ThreadLocal值清理干净,很容易造成互相污染。

用curl命令发送请求:curl --header "X-Request-Id:12345" localhost:8080?value=a_b_c,返回的结果是abc,打印的日志如下:

[12345] [reactor-http-nio-2] INFO  - handling one item: a 
[12345] [reactor-http-nio-2] INFO  - handling one item: b 
[12345] [reactor-http-nio-2] INFO  - handling one item: c

其中12345就是从context里获取到的request id。

如果想要将request id继续贯穿后续请求流程,如请求第三方服务,可以在用webClient发送请求的时候,把request id作为header加入到它的request请求里,如:

Mono.subscriberContext().map(ctx -> {
    RequestHeadersSpec<?> request = webClient.get().uri(uri);
    request = request.header("X-Request-ID", ctx.get(X_REQUEST_ID_KEY));
    // The rest of your request logic...
});

四、总结

本文介绍了reactor中context的概念,并用代码示例的方式介绍了如何使用。再然后,通过源码的解读来加深对context使用规则的理解:自下而上的context写入,以及与subscriber绑定后的自上而下的读取。 在这之后,用以传递并打印日志中包含request id的一个实际例子,来介绍如何使用context与log的MDC一起使用。

虽然reactor自3.1开始提供了context来弥补无法使用ThreadLocal的不足,但与ThreaLocal相比,context仍然有不少局限。比如使用上的不方便,要么利用Mono.subscribeContext().map并搭配flatmap来使用,要么需要将数据流转化成信号signal流来使用,总之远不如ThreadLocal来的简单易用。另外,context的不可变特性,虽然有助于thread safe,但使得不同方法之间无法传递更新,比如方法A内修改后再传递给方法B,因为context是只读的,但这在ThreadLocal上却是轻而易举就能实现。

好消息的是,reactor在3.5开始,提供了新的方法deferContextual来简化context的使用。以及提出了context view的概念来简化context传递问题,感兴趣的可以阅读reactor文档

到此这篇关于详解Reactor中Context的用法的文章就介绍到这了,更多相关Reactor Context内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • React Context原理深入理解源码示例分析

    目录 正文 一.概念 二.使用 2.1.React.createContext 2.2.Context.Provider 2.3.React.useContext 2.4.Example 三.原理分析 3.1.createContext 函数实现 3.2. JSX 编译 3.3.消费组件 - useContext 函数实现 3.4.Context.Provider 在 Fiber 架构下的实现机制 3.5.小结 四.注意事项 五.对比 useSelector 正文 在 React 中提供了一种「

  • React 中的 useContext使用方法

    目录 什么是上下文呢? useContext使用的方法: 1.要先创建createContex 2.Provider 指定使用的范围 3.最后使用useContext useContext就是上下文 什么是上下文呢? 全局变量就是全局的上下文,全局都可以访问到它:上下文就是你运行一段代码,所要知道的所有变量 useContext使用的方法: 1.要先创建createContex 使用createContext创建并初始化 const C = createContext(null); 2.Prov

  • React Context源码实现原理详解

    目录 什么是 Context Context 使用示例 createContext Context 的设计非常特别 useContext useContext 相关源码 debugger 查看调用栈 什么是 Context 目前来看 Context 是一个非常强大但是很多时候不会直接使用的 api.大多数项目不会直接使用 createContext 然后向下面传递数据,而是采用第三方库(react-redux). 想想项目中是不是经常会用到 @connect(...)(Comp) 以及 <Pro

  • React Context 变迁及背后实现原理详解

    目录 Context 老的 Context API 基础示例 context 中断问题 解决方案 新的 Context API 基础示例 模拟实现 createContext 源码 Context 本篇我们讲 Context,Context 可以实现跨组件传递数据,大部分的时候并无需要,但有的时候,比如用户设置 了 UI 主题.地区偏好,如果从顶层一层层往下传反而有些麻烦,不如直接借助 Context 实现数据传递. 老的 Context API 基础示例 在讲最新的 API 前,我们先回顾下老

  • React Context详解使用方法

    目录 一.概述 二.API React.createContext Context.Provider Class.contextType Context.Consumer Context.displayName 三.使用 1.自定义Context (类组件中使用) 2.使用Consumer支持获取多个Context上的值 3.useContext使用(函数式组件中使用) 一.概述 Context 提供了一个无需为每层组件手动添加 props,就能在组件树间进行数据传递的方法. 如果多个组件中都需

  • 详解Reactor中Context的用法

    目录 一.使用介绍 二.源码解读 三.如何桥接现有的ThreadLocal系统 四.总结 在响应式编程中,多线程异步性成为天然的内在,多线程之间的切换也成为原生的,在处理一个数据流Flux/Mono时,基本无法知道是运行在哪个线程上或哪个线程池里,可以说,每一个操作符operator以及内部的函数都可能运行在不同的线程上.这就意味着,以前用ThreadLocal来作为方法间透明传递共享变量的方式不再行得通.为此,Reactor提供了Context来替代ThreadLocal实现一个跨线程的共享变

  • 详解Golang中Context的原理和使用技巧

    目录 Context 背景 和 适用场景 Context 的背景 Context 的功能和目的 Context 的基本使用 Context 的同步控制设计 Context 的定义和实现 Context interface 接口定义 parent Context 的具体实现 Context 的继承和各种 With 系列函数 Context 的常用方法实例 1. 调用 Context Done方法取消 2. 通过 context.WithValue 来传值 3. 超时取消 context.WithT

  • 详解C# 中Session的用法

    Session模型简介 在学习之前我们会疑惑,Session是什么呢?简单来说就是服务器给客户端的一个编号.当一台WWW服务器运行时,可能有若干个用户浏览正在运正在这台服务器上的网站.当每 个用户首次与这台WWW服务器建立连接时,他就与这个服务器建立了一个Session,同时服务器会自动为其分配一个SessionID,用以标识这个用 户的唯一身份.这个SessionID是由WWW服务器随机产生的一个由24个字符组成的字符串,我们会在下面的实验中见到它的实际样子. 这个唯一的SessionID是有

  • 详解C++中mutable的用法

    代码编译运行环境:VS2017+Win32+Debug mutalbe的中文意思是"可变的,易变的",是constant(即C++中的const)的反义词.在C++中,mutable也是为了突破const的限制而设置的,被mutable修饰的变量将永远处于可变的状态. mutable的作用有两点: (1)保持常量对象中大部分数据成员仍然是"只读"的情况下,实现对个别数据成员的修改: (2)使类的const函数可以修改对象的mutable数据成员. 使用mutable

  • 详解Golang中Channel的用法

    如果说goroutine是Go语言程序的并发体的话,那么channels则是它们之间的通信机制.一个channel是一个通信机制,它可以让一个goroutine通过它给另一个goroutine发送值信息. 1 创建channel 每个channel都有一个特殊的类型,也就是channels可发送数据的类型.一个可以发送int类型数据 的channel一般写为chan int.使用内置的make函数,如果第二个参数大于0,则表示创建一个带缓存的channel. ch := make(chan in

  • 一文详解Python中复合语句的用法

    目录 Python复合语句 1.if 语句 2.while 语句 3.for 语句 4.try 语句 5.with 语句 6.match 语句 Python复合语句 复合语句是包含其它语句(语句组)的语句:它们会以某种方式影响或控制所包含其它语句的执行.通常,复合语句会跨越多行,虽然在某些简单形式下整个复合语句也可能包含于一行之内. if.while和for语句用来实现传统的控制流程构造.try语句为一组语句指定异常处理和/和清理代码,而with语句允许在一个代码块周围执行初始化和终结化代码.函

  • 详解python中静态方法staticmethod用法

    在开发的时候, 可以使用类对方法进行封装,如果某一个方法需要访问到对象的实例属性,可以把这个方法封装成一个实例方法.如果某一个方法不需要访问对象的实例属性,但是需要访问到类的类属性,这个时候就可以考虑把这个方法封装成一个类方法.一个实例方法, 一个类方法,这是两种方法类型,但是在开发中还有一种情况,如果要封装的某一个方法,既不需要访问到对象的实例属性,也不需要访问类的类属性,这个时候就可以考虑把这个方法封装成一个静态方法. 在开发中,如果类中的某个方法既不需要访问实例属性或者调用实例方法,同时也

  • C++ static详解,类中的static用法说明

    目录 C++static详解,类中static用法 static特点:用来控制存储方式和可见性 类中的static关键字 什么时候用static? 为什么要引入static? c++中static总结 1. 概念 2. 面向过程的static 3. 面向对象中的static 4. 小结 C++static详解,类中static用法 static特点:用来控制存储方式和可见性 ① 存储空间:静态存储区(控制变量的存储方式) 静态变量存储在静态存储区(存储在静态存储区的变量,如果不显式地对其进行初始

  • 详解python中mongoengine库用法

    目录 一.MongoDB的安装与连接 二.MongoEngine模型介绍 2.1.ODM模型介绍 2.2.常见数据类型 2.3.数据类型通用参数 2.4.类属性meta常见配置项 2.5.文档的嵌套模型 三.添加数据 3.1.方式一 3.2.方式二:使用create()方法 四.查询数据 4.1.单个文档查询 4.2.条件查询 4.3.聚合统计 4.4.排序 4.5.分页处理 五.修改和删除数据 5.1.修改数据 5.2.删除数据 一.MongoDB的安装与连接 安装:pip install m

  • 详解python中*号的用法

    1.表示乘号 2.表示倍数,例如: def T(msg,time=1): print((msg+' ')*time) T('hi',3) 打印结果(打印3次): hi hi hi 3.单个 * (1).如:*parameter是用来接受任意多个参数并将其放在一个元组中. >>> def demo(*p): print(p) >>> demo(1,2,3) (1, 2, 3) (2).函数在调用多个参数时,在列表.元组.集合.字典及其他可迭代对象作为实参,并在前面加 *

随机推荐