Python利用contextvars实现管理上下文变量

目录
  • web 框架中的 request
  • ThreadLocal
  • contextvars
  • c.Token
  • contextvars.Context
  • 小结

Python 在 3.7 的时候引入了一个模块:contextvars,从名字上很容易看出它指的是上下文变量(Context Variables),所以在介绍 contextvars 之前我们需要先了解一下什么是上下文(Context)。

Context 是一个包含了相关信息内容的对象,举个例子:"比如一部 13 集的动漫,你直接点进第八集,看到女主角在男主角面前流泪了"。相信此时你是不知道为什么女主角会流泪的,因为你没有看前面几集的内容,缺失了相关的上下文信息。

所以 Context 并不是什么神奇的东西,它的作用就是携带一些指定的信息。

web 框架中的 request

我们以 fastapi 和 sanic 为例,看看当一个请求过来的时候,它们是如何解析的。

# fastapi
from fastapi import FastAPI, Request
import uvicorn

app = FastAPI()

@app.get("/index")
async def index(request: Request):
    name = request.query_params.get("name")
    return {"name": name}

uvicorn.run("__main__:app", host="127.0.0.1", port=5555)

# -------------------------------------------------------

# sanic
from sanic import Sanic
from sanic.request import Request
from sanic import response

app = Sanic("sanic")

@app.get("/index")
async def index(request: Request):
    name = request.args.get("name")
    return response.json({"name": name})

app.run(host="127.0.0.1", port=6666)

发请求测试一下,看看结果是否正确。

可以看到请求都是成功的,并且对于 fastapi 和 sanic 而言,其 request 和 视图函数是绑定在一起的。也就是在请求到来的时候,会被封装成一个 Request 对象、然后传递到视图函数中。

但对于 flask 而言则不是这样子的,我们看一下 flask 是如何接收请求参数的。

from flask import Flask, request

app = Flask("flask")

@app.route("/index")
def index():
    name = request.args.get("name")
    return {"name": name}

app.run(host="127.0.0.1", port=7777)

我们看到对于 flask 而言则是通过 import request 的方式,如果不需要的话就不用 import,当然我这里并不是在比较哪种方式好,主要是为了引出我们今天的主题。首先对于 flask 而言,如果我再定义一个视图函数的话,那么获取请求参数依旧是相同的方式,但是这样问题就来了,不同的视图函数内部使用同一个 request,难道不会发生冲突吗?

显然根据我们使用 flask 的经验来说,答案是不会的,至于原因就是 ThreadLocal。

ThreadLocal

ThreadLocal,从名字上看可以得出它肯定是和线程相关的。没错,它专门用来创建局部变量,并且创建的局部变量是和线程绑定的。

import threading

# 创建一个 local 对象
local = threading.local()

def get():
    name = threading.current_thread().name
    # 获取绑定在 local 上的 value
    value = local.value
    print(f"线程: {name}, value: {value}")

def set_():
    name = threading.current_thread().name
    # 为不同的线程设置不同的值
    if name == "one":
        local.value = "ONE"
    elif name == "two":
        local.value = "TWO"
    # 执行 get 函数
    get()

t1 = threading.Thread(target=set_, name="one")
t2 = threading.Thread(target=set_, name="two")
t1.start()
t2.start()
"""
线程 one, value: ONE
线程 two, value: TWO
"""

可以看到两个线程之间是互不影响的,因为每个线程都有自己唯一的 id,在绑定值的时候会绑定在当前的线程中,获取也会从当前的线程中获取。可以把 ThreadLocal 想象成一个字典:

{
    "one": {"value": "ONE"},
    "two": {"value": "TWO"}
}

更准确的说 key 应该是线程的 id,为了直观我们就用线程的 name 代替了,但总之在获取的时候只会获取绑定在该线程上的变量的值。

而 flask 内部也是这么设计的,只不过它没有直接用 threading.local,而是自己实现了一个 Local 类,除了支持线程之外还支持 greenlet 的协程,那么它是怎么实现的呢?首先我们知道 flask 内部存在 "请求 context" 和 "应用 context",它们都是通过栈来维护的(两个不同的栈)。

# flask/globals.py
_request_ctx_stack = LocalStack()
_app_ctx_stack = LocalStack()
current_app = LocalProxy(_find_app)
request = LocalProxy(partial(_lookup_req_object, "request"))
session = LocalProxy(partial(_lookup_req_object, "session"))

每个请求都会绑定在当前的 Context 中,等到请求结束之后再销毁,这个过程由框架完成,开发者只需要直接使用 request 即可。所以请求的具体细节流程可以点进源码中查看,这里我们重点关注一个对象:werkzeug.local.Local,也就是上面说的 Local 类,它是变量的设置和获取的关键。直接看部分源码:

# werkzeug/local.py

class Local(object):
    __slots__ = ("__storage__", "__ident_func__")

    def __init__(self):
        # 内部有两个成员:__storage__ 是一个字典,值就存在这里面
        # __ident_func__ 只需要知道它是用来获取线程 id 的即可
        object.__setattr__(self, "__storage__", {})
        object.__setattr__(self, "__ident_func__", get_ident)

    def __call__(self, proxy):
        """Create a proxy for a name."""
        return LocalProxy(self, proxy)

    def __release_local__(self):
        self.__storage__.pop(self.__ident_func__(), None)

    def __getattr__(self, name):
        try:
            # 根据线程 id 得到 value(一个字典)
            # 然后再根据 name 获取对应的值
            # 所以只会获取绑定在当前线程上的值
            return self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

    def __setattr__(self, name, value):
        ident = self.__ident_func__()
        storage = self.__storage__
        try:
            # 将线程 id 作为 key,然后将值设置在对应的字典中
            # 所以只会将值设置在当前的线程中
            storage[ident][name] = value
        except KeyError:
            storage[ident] = {name: value}

    def __delattr__(self, name):
        # 删除逻辑也很简单
        try:
            del self.__storage__[self.__ident_func__()][name]
        except KeyError:
            raise AttributeError(name)

所以我们看到 flask 内部的逻辑其实很简单,通过 ThreadLocal 实现了线程之间的隔离。每个请求都会绑定在各自的 Context 中,获取值的时候也会从各自的 Context 中获取,因为它就是用来保存相关信息的(重要的是同时也实现了隔离)。

相应此刻你已经理解了上下文,但是问题来了,不管是 threading.local 也好、还是类似于 flask 自己实现的 Local 也罢,它们都是针对线程的。如果是使用 async def 定义的协程该怎么办呢?如何实现每个协程的上下文隔离呢?所以终于引出了我们的主角:contextvars。

contextvars

该模块提供了一组接口,可用于在协程中管理、设置、访问局部 Context 的状态。

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get():
    # 获取值
    return c.get() + "~~~"

async def set_(val):
    # 设置值
    c.set(val)
    print(await get())

async def main():
    coro1 = set_("协程1")
    coro2 = set_("协程2")
    await asyncio.gather(coro1, coro2)

asyncio.run(main())
"""
协程1~~~
协程2~~~
"""

ContextVar 提供了两个方法,分别是 get 和 set,用于获取值和设置值。我们看到效果和 ThreadingLocal 类似,数据在协程之间是隔离的,不会受到彼此的影响。

但我们再仔细观察一下,我们是在 set_ 函数中设置的值,然后在 get 函数中获取值。可 await get() 相当于是开启了一个新的协程,那么意味着设置值和获取值不是在同一个协程当中。但即便如此,我们依旧可以获取到希望的结果。因为 Python 的协程是无栈协程,通过 await 可以实现级联调用。

我们不妨再套一层:

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():
    return await get2()

async def get2():
    return c.get() + "~~~"

async def set_(val):
    # 设置值
    c.set(val)
    print(await get1())
    print(await get2())

async def main():
    coro1 = set_("协程1")
    coro2 = set_("协程2")
    await asyncio.gather(coro1, coro2)

asyncio.run(main())
"""
协程1~~~
协程1~~~
协程2~~~
协程2~~~
"""

我们看到不管是 await get1() 还是 await get2(),得到的都是 set_ 中设置的结果,说明它是可以嵌套的。

并且在这个过程当中,可以重新设置值。

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():
    c.set("重新设置")
    return await get2()

async def get2():
    return c.get() + "~~~"

async def set_(val):
    # 设置值
    c.set(val)
    print("------------")
    print(await get2())
    print(await get1())
    print(await get2())
    print("------------")

async def main():
    coro1 = set_("协程1")
    coro2 = set_("协程2")
    await asyncio.gather(coro1, coro2)

asyncio.run(main())
"""
------------
协程1~~~
重新设置~~~
重新设置~~~
------------
------------
协程2~~~
重新设置~~~
重新设置~~~
------------
"""

先 await get2() 得到的就是 set_ 函数中设置的值,这是符合预期的。但是我们在 get1 中将值重新设置了,那么之后不管是 await get1() 还是直接 await get2(),得到的都是新设置的值。

这也说明了,一个协程内部 await 另一个协程,另一个协程内部 await 另另一个协程,不管套娃(await)多少次,它们获取的值都是一样的。并且在任意一个协程内部都可以重新设置值,然后获取会得到最后一次设置的值。再举个栗子:

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试")

async def get1():
    return await get2()

async def get2():
    val = c.get() + "~~~"
    c.set("重新设置啦")
    return val

async def set_(val):
    # 设置值
    c.set(val)
    print(await get1())
    print(c.get())

async def main():
    coro = set_("古明地觉")
    await coro

asyncio.run(main())
"""
古明地觉~~~
重新设置啦
"""

await get1() 的时候会执行 await get2(),然后在里面拿到 c.set 设置的值,打印 "古明地觉~~~"。但是在 get2 里面,又将值重新设置了,所以第二个 print 打印的就是新设置的值。\

如果在 get 之前没有先 set,那么会抛出一个 LookupError,所以 ContextVar 支持默认值:

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试",
                           default="哼哼")

async def set_(val):
    print(c.get())
    c.set(val)
    print(c.get())

async def main():
    coro = set_("古明地觉")
    await coro

asyncio.run(main())
"""
哼哼
古明地觉
"""

除了在 ContextVar 中指定默认值之外,也可以在 get 中指定:

import asyncio
import contextvars

c = contextvars.ContextVar("只是一个标识, 用于调试",
                           default="哼哼")

async def set_(val):
    print(c.get("古明地恋"))
    c.set(val)
    print(c.get())

async def main():
    coro = set_("古明地觉")
    await coro

asyncio.run(main())
"""
古明地恋
古明地觉
"""

所以结论如下,如果在 c.set 之前使用 c.get:

  • 当 ContextVar 和 get 中都没有指定默认值,会抛出 LookupError;
  • 只要有一方设置了,那么会得到默认值;
  • 如果都设置了,那么以 get 为准;

如果 c.get 之前执行了 c.set,那么无论 ContextVar 和 get 有没有指定默认值,获取到的都是 c.set 设置的值。

所以总的来说还是比较好理解的,并且 ContextVar 除了可以作用在协程上面,它也可以用在线程上面。没错,它可以替代 threading.local,我们来试一下:

import threading
import contextvars

c = contextvars.ContextVar("context_var")

def get():
    name = threading.current_thread().name
    value = c.get()
    print(f"线程 {name}, value: {value}")

def set_():
    name = threading.current_thread().name
    if name == "one":
        c.set("ONE")
    elif name == "two":
        c.set("TWO")
    get()

t1 = threading.Thread(target=set_, name="one")
t2 = threading.Thread(target=set_, name="two")
t1.start()
t2.start()
"""
线程 one, value: ONE
线程 two, value: TWO
"""

和 threading.local 的表现是一样的,但是更建议使用 ContextVars。不过前者可以绑定任意多个值,而后者只能绑定一个值(可以通过传递字典的方式解决这一点)。

c.Token

当我们调用 c.set 的时候,其实会返回一个 Token 对象:

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")
print(token)
"""
<Token var=<ContextVar name='context_var' at 0x00..> at 0x00...>
"""

Token 对象有一个 var 属性,它是只读的,会返回指向此 token 的 ContextVar 对象。

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")

print(token.var is c)  # True
print(token.var.get())  # val

print(
    token.var.set("val2").var.set("val3").var is c
)  # True
print(c.get())  # val3

Token 对象还有一个 old_value 属性,它会返回上一次 set 设置的值,如果是第一次 set,那么会返回一个 <Token.MISSING>。

import contextvars

c = contextvars.ContextVar("context_var")
token = c.set("val")

# 该 token 是第一次 c.set 所返回的
# 在此之前没有 set,所以 old_value 是 <Token.MISSING>
print(token.old_value)  # <Token.MISSING>

token = c.set("val2")
print(c.get())  # val2
# 返回上一次 set 的值
print(token.old_value)  # val

那么这个 Token 对象有什么作用呢?从目前来看貌似没太大用处啊,其实它最大的用处就是和 reset 搭配使用,可以对状态进行重置。

import contextvars
####
c = contextvars.ContextVar("context_var")
token = c.set("val")
# 显然是可以获取的
print(c.get())  # val

# 将其重置为 token 之前的状态
# 但这个 token 是第一次 set 返回的
# 那么之前就相当于没有 set 了
c.reset(token)
try:
    c.get()  # 此时就会报错
except LookupError:
    print("报错啦")  # 报错啦

# 但是我们可以指定默认值
print(c.get("默认值"))  # 默认值

contextvars.Context

它负责保存 ContextVars 对象和设置的值之间的映射,但是我们不会直接通过 contextvars.Context 来创建,而是通过 contentvars.copy_context 函数来创建。

import contextvars

c1 = contextvars.ContextVar("context_var1")
c1.set("val1")
c2 = contextvars.ContextVar("context_var2")
c2.set("val2")

# 此时得到的是所有 ContextVar 对象和设置的值之间的映射
# 它实现了 collections.abc.Mapping 接口
# 因此我们可以像操作字典一样操作它
context = contextvars.copy_context()
# key 就是对应的 ContextVar 对象,value 就是设置的值
print(context[c1])  # val1
print(context[c2])  # val2
for ctx, value in context.items():
    print(ctx.get(), ctx.name, value)
    """
    val1 context_var1 val1
    val2 context_var2 val2
    """

print(len(context))  # 2

除此之外,context 还有一个 run 方法:

import contextvars

c1 = contextvars.ContextVar("context_var1")
c1.set("val1")
c2 = contextvars.ContextVar("context_var2")
c2.set("val2")

context = contextvars.copy_context()

def change(val1, val2):
    c1.set(val1)
    c2.set(val2)
    print(c1.get(), context[c1])
    print(c2.get(), context[c2])

# 在 change 函数内部,重新设置值
# 然后里面打印的也是新设置的值
context.run(change, "VAL1", "VAL2")
"""
VAL1 VAL1
VAL2 VAL2
"""

print(c1.get(), context[c1])
print(c2.get(), context[c2])
"""
val1 VAL1
val2 VAL2
"""

我们看到 run 方法接收一个 callable,如果在里面修改了 ContextVar 实例设置的值,那么对于 ContextVar 而言只会在函数内部生效,一旦出了函数,那么还是原来的值。但是对于 Context 而言,它是会受到影响的,即便出了函数,也是新设置的值,因为它直接把内部的字典给修改了。

小结

以上就是 contextvars 模块的用法,在多个协程之间传递数据是非常方便的,并且也是并发安全的。如果你用过 Go 的话,你应该会发现和 Go 在 1.7 版本引入的 context 模块比较相似,当然 Go 的 context 模块功能要更强大一些,除了可以传递数据之外,对多个 goroutine 的级联管理也提供了非常清蒸的解决方案。

总之对于 contextvars 而言,它传递的数据应该是多个协程之间需要共享的数据,像 cookie, session, token 之类的,比如上游接收了一个 token,然后不断地向下透传。但是不要把本应该作为函数参数的数据,也通过 contextvars 来传递,这样就有点本末倒置了。

到此这篇关于Python利用contextvars实现管理上下文变量的文章就介绍到这了,更多相关Python contextvars管理变量内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python如何使用contextvars模块源码分析

    目录 前记 更新说明 1.有无上下文传变量的区别 2.如何使用contextvars模块 3.如何优雅的使用contextvars 4.contextvars的原理 4.1 ContextMeta,ContextVarMeta和TokenMeta 4.2 Token 4.3 全局唯一context 4.4contextvar自己封装的Context 4.5 ContextVar 5.contextvars asyncio 5.1在asyncio中获取context 5.2 对上下文的操作 5.2

  • Python利用contextvars实现管理上下文变量

    目录 web 框架中的 request ThreadLocal contextvars c.Token contextvars.Context 小结 Python 在 3.7 的时候引入了一个模块:contextvars,从名字上很容易看出它指的是上下文变量(Context Variables),所以在介绍 contextvars 之前我们需要先了解一下什么是上下文(Context). Context 是一个包含了相关信息内容的对象,举个例子:"比如一部 13 集的动漫,你直接点进第八集,看到女

  • 利用ThreadLocal实现一个上下文管理组件

    目录 1 ThreadLocal原理 set() 方法 get() 方法 withInitial()方法 ThreadLocal中的内存泄漏问题 2 自定义上下文Scope 3 在线程池中传递Scope 4 通过Filter.Scope实现Request上下文 5 总结 源代码 本文基于ThreadLocal原理,实现了一个上下文状态管理组件Scope,通过开启一个自定义的Scope,在Scope范围内,可以通过Scope各个方法读写数据: 通过自定义线程池实现上下文状态数据的线程间传递: 提出

  • python中with语句结合上下文管理器操作详解

    前言 所谓上下文管理器即在一个类中重写了__enter__方法和__exit__方法的类就可以成为上下文管理器类. 我们可以通过with语句结合上下文管理器简化一些操作. 使用with语句结合自定义上下文管理器完成数据库相应的操作,代码实现如下: # 1. 导入模块 import pymysql # 创建自定义上下文管理器对象 class MyDatabase(object): # 接收参数并创建数据库连接对象 def __init__(self, host, port, user, passw

  • python利用微信公众号实现报警功能

    微信公众号共有三种,服务号.订阅号.企业号.它们在获取AccessToken上各有不同. 其中订阅号比较坑,它的AccessToken是需定时刷新,重复获取将导致上次获取的AccessToken失效. 而企业号就比较好,AccessToken有效期同样为7200秒,但有效期内重复获取返回相同结果. 为兼容这两种方式,因此按照订阅号的方式处理.  处理办法与接口文档中的要求相同: 为了保密appsecrect,第三方需要一个access_token获取和刷新的中控服务器. 而其他业务逻辑服务器所使

  • Python利用Scrapy框架爬取豆瓣电影示例

    本文实例讲述了Python利用Scrapy框架爬取豆瓣电影.分享给大家供大家参考,具体如下: 1.概念 Scrapy是一个为了爬取网站数据,提取结构性数据而编写的应用框架. 可以应用在包括数据挖掘,信息处理或存储历史数据等一系列的程序中. 通过Python包管理工具可以很便捷地对scrapy进行安装,如果在安装中报错提示缺少依赖的包,那就通过pip安装所缺的包 pip install scrapy scrapy的组成结构如下图所示 引擎Scrapy Engine,用于中转调度其他部分的信号和数据

  • Python利用PyMuPDF实现PDF文件处理

    目录 1.PyMuPDF简介 介绍 功能 2.安装 关于命名fitz的说明 3.使用方法 导入库,查看版本 打开文档 Document的方法和属性 获取元数据 获取目标大纲 页面(Page) PDF操作 1.PyMuPDF简介 介绍 在介绍PyMuPDF之前,先来了解一下MuPDF,从命名形式中就可以看出,PyMuPDF是MuPDF的Python接口形式. MuPDF MuPDF 是一个轻量级的 PDF.XPS和电子书查看器.MuPDF 由软件库.命令行工具和各种平台的查看器组成. MuPDF 

  • python内置模块之上下文管理contextlib

    Python中当我们们打开文本时,通常会是用with语句,with语句允许我们非常方便的使用资源,而不必担心资源没有关闭. with open('/path/filename', 'r') as f: f.read() 然而,并不是只有open()函数返回fp对象才能使用 with 语句.实际上,任何对象,只要正确实现上下文管理,就可以使用with语句. 实现上下文管理是通过 __enter__ 和 __exit__ 这两个方法实现的.例如,下面的class实现了这两个方法: class Que

  • Python利用psutil实现获取硬件,网络和进程信息

    目录 楔子 CPU 相关 内存相关 磁盘相关 网络相关 进程管理 进程管理操作 楔子 Python 有一个第三方模块叫 psutil,专门用来获取操作系统以及硬件相关的信息,比如:CPU.磁盘.网络.内存等等.下面来看一下它的用法,不过在使用之前需要先安装,直接 pip install psutil 即可. CPU 相关 获取 CPU 的逻辑核心数量 import psutil print(psutil.cpu_count())  # 12 # 或者使用 multiprocessing impo

  • Python利用Diagrams绘制漂亮的系统架构图

    目录 1.准备 2.基本使用与例子 2.1 初始化与导出 2.2 节点类型 2.3 集群块 2.4 自定义线的颜色与属性 Diagrams  是一个基于Python绘制云系统架构的模块,它能够通过非常简单的描述就能可视化架构,并支持以下6个云产品的图标: AWS.Azure.GCP.K8s.阿里云 和 Oracle 云 基于Diagrams提供的节点,你只需要指定一个云产品(实际上选哪个都一样,我们只需要那个产品相应的图标,你可以选一个自己觉得好看的产品),使用其内部自带的云产品的图标,就能简单

  • python嵌套函数使用外部函数变量的方法(Python2和Python3)

    python嵌套函数使用外部函数变量的方法,Python2和Python3均可使用 python3 def b(): b = 1 def bchange(): nonlocal b b += 1 bchange() print(b) Python 2 只能这样(利用 mutable 对象): def b(): b = [1] def bchange(): b[0] += 1 bchange() print b[0]

随机推荐