Python中Async语法协程的实现

目录
  • 前记
  • 1.传统的Sync语法请求例子
  • 2.异步的请求
  • 3.基于生成器的协程
    • 3.1生成器
    • 3.2用生成器实现协程

前记

在io比较多的场景中, Async语法编写的程序会以更少的时间, 更少的资源来完成相同的任务, 这篇文章则是介绍了PythonAsync语法的协程是如何实现的。

1.传统的Sync语法请求例子

还是一样, 在了解Async语法的实现之前, 先从一个Sync的语法例子开始, 现在假设有一个HTTP请求, 这个程序会通过这个请求获取对应的响应内容, 并打印出来,

代码如下:

import socket
def request(host: str) -> None:
    """模拟请求并打印响应体"""
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.connect((host, 80))
    sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
    response_bytes: bytes = b""
    chunk: bytes = sock.recv(4096)
    while chunk:
        response_bytes += chunk
        chunk = sock.recv(4096)
    print("\n".join([i for i in response_bytes.decode().split("\r\n")]))
if __name__ == "__main__":
    request("so1n.me")

运行程序, 程序能够正常输出, 上部分打印了对应的HTTP响应Header, 下部分打印了HTTP响应体, , 可以看到服务端叫我们以https的形式重新请求,

输出结果如下:

HTTP/1.1 301 Moved Permanently
Server: GitHub.com
Content-Type: text/html
Location: https://so1n.me/
X-GitHub-Request-Id: A744:3871:4136AF:48BD9F:6188DB50
Content-Length: 162
Accept-Ranges: bytes
Date: Mon, 08 Nov 2021 08:11:37 GMT
Via: 1.1 varnish
Age: 104
Connection: close
X-Served-By: cache-qpg1272-QPG
X-Cache: HIT
X-Cache-Hits: 2
X-Timer: S1636359097.026094,VS0,VE0
Vary: Accept-Encoding
X-Fastly-Request-ID: 22fa337f777553d33503cee5282598c6a293fb5e
<html>
<head><title>301 Moved Permanently</title></head>
<body>
<center><h1>301 Moved Permanently</h1></center>
<hr><center>nginx</center>
</body>
</html>

不过这里并不是想说HTTP请求是如何实现的, 具体我也不太了解, 在这个代码中, socket的默认调用是阻塞的, 当线程调用connect或者recv时(send是不用等待的, 但在高并发下需要先等待drain后才可以send, 小demo不需要用到drain方法), 程序将会暂停直到操作完成。 当一次要下载很多网页的话, 这将会如上篇文章所说的一样, 大部分的等待时间都花在io上面, cpu却一直空闲时, 而使用线程池虽然可以解决这个问题, 但是开销是很大的, 同时操作系统往往会限制一个进程,用户或者机器可以使用的线程数, 而协程却没有这些限制, 占用的资源少, 也没有系统限制瓶颈。

2.异步的请求

异步可以让一个单独的线程处理并发的操作, 不过在上面已经说过了, socket是默认阻塞的, 所以需要把socket设置为非阻塞的, socket提供了setblocking这个方法供开发者选择是否阻塞, 在设置了非阻塞后, connectrecv方法也要进行更改。

由于没有了阻塞, 程序在调用了connect后会马上返回, 只不过Python的底层是C, 这段代码在C中调用非阻塞的socket.connect后会抛出一个异常, 我们需要捕获它, 就像这样:

import socket
sock: socket.SocketType = socket.socket()
sock.setblocking(Flase)
try:
    sock.connect(("so1n.me", 80))
except BlockingIOError:
    pass

经过一顿操作后, 就开始申请建立连接了, 但是我们还不知道连接啥时候完成建立, 由于连接没建立时调用send会报错, 所以可以一直轮询调用send直到没报错就认为是成功(真实代码需要加超时):

while True:
    try:
        sock.send(request)
        break
    except OSError as e:
        pass

但是这样让CPU空转太浪费性能了, 而且期间还不能做别的事情, 就像我们点外卖后一直打电话过去问饭菜做好了没有, 十分浪费电话费用, 要是饭菜做完了就打电话告诉我们, 那就只产生了一笔费用, 非常的省钱(正常情况下也是这样子)。
这时就需要事件循环登场了,在类UNIX中, 有一个叫select的功能, 它可以等待事件发生后再调用监听的函数, 不过一开始的实现性能不是很好, 在Linux上被epoll取代, 不过接口是类似的, 所在在Python中把这几个不同的事件循环都封装在selectors库中, 同时可以通过DefaultSelector从系统中挑出最好的类select函数。
这里先暂时不说事件循环的原理, 事件循环最主要的是他名字的两部分, 一个是事件, 一个是循环, 在Python中, 可以通过如下方法把事件注册到事件循环中:

def demo(): pass
selector.register(fd, EVENT_WRITE, demo)

这样这个事件循环就会监听对应的文件描述符fd, 当这个文件描述符触发写入事件(EVENT_WRITE)时,事件循环就会告诉我们可以去调用注册的函数demo。不过如果把上面的代码都改为这种方法去运行的话就会发现, 程序好像没跑就结束了, 但程序其实是有跑的, 只不过他们是完成的了注册, 然后就等待开发者接收事件循环的事件进行下一步的操作, 所以我们只需要在代码的最后面写上如下代码:

while True:
    for key, mask in selector.select():
        key.data()

这样程序就会一直运行, 当捕获到事件的时候, 就会通过for循环告诉我们, 其中key.data是我们注册的回调函数, 当事件发生时, 就会通知我们, 我们可以通过拿到回调函数然后就运行, 了解完毕后, 我们可以来编写我们的第一个并发程序, 他实现了一个简单的I/O复用的小逻辑, 代码和注释如下:

import socket
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
# 选择事件循环
selector: DefaultSelector = DefaultSelector()
# 用于判断是否有事件在运行
running_cnt: int = 0
def request(host: str) -> None:
    """模拟请求并打印响应体"""
    # 告诉主函数, 自己的事件还在运行
    global running_cnt
    running_cnt += 1

    # 初始化socket
    url: str = f"http://{host}"
    sock: socket.SocketType = socket.socket()
    sock.setblocking(False)
    try:
        sock.connect((host, 80))
    except BlockingIOError:
        pass
    response_bytes: bytes = b""
    def read_response() -> None:
        """接收响应参数, 并判断请求是否结束"""
        nonlocal response_bytes
        chunk: bytes = sock.recv(4096)
        print(f"recv {host} body success")
        if chunk:
            response_bytes += chunk
        else:
            # 没有数据代表请求结束了, 注销监听
            selector.unregister(sock.fileno())
            global running_cnt
            running_cnt -= 1
    def connected() -> None:
        """socket建立连接时的回调"""
        # 取消监听
        selector.unregister(sock.fileno())
        print(f"{host} connect success")
        # 发送请求, 并监听读事件, 以及注册对应的接收响应函数
        sock.send(f"GET {url} HTTP/1.0\r\nHost: {host}\r\n\r\n".encode("ascii"))
        selector.register(sock.fileno(), EVENT_READ, read_response)
    selector.register(sock.fileno(), EVENT_WRITE, connected)
if __name__ == "__main__":
    # 同时多个请求
    request("so1n.me")
    request("github.com")
    request("google.com")
    request("baidu.com")
    # 监听是否有事件在运行
    while running_cnt > 0:
        # 等待事件循环通知事件是否已经完成
        for key, mask in selector.select():
            key.data()

这段代码接近同时注册了4个请求并注册建立连接回调, 然后就进入事件循环逻辑, 也就是把控制权交给事件循环, 直到事件循环告诉程序说收到了socket建立的通知, 程序就会取消注册的回调然后发送请求, 并注册一个读的事件回调, 然后又把控制权交给事件循环, 直到收到了响应的结果才进入处理响应结果函数并且只有收完所有响应结果才会退出程序。

下面是我其中的一次执行结果:

so1n.me connect success
github.com connect success
google.com connect success
recv google.com body success
recv google.com body success
baidu.com connect success
recv github.com body success
recv github.com body success
recv baidu.com body success
recv baidu.com body success
recv so1n.me body success
recv so1n.me body success

可以看到他们的执行顺序是随机的, 不是严格的按照so1n.megithub.comgoogle.combaidu.com顺序执行, 同时他们执行速度很快, 这个程序的耗时约等于响应时长最长的函数耗时。
但是可以看出, 这个程序里面出现了两个回调, 回调会让代码变得非常的奇怪, 降低可读性, 也容易造成回调地狱, 而且当回调发生报错的时候, 我们是很难知道这是由于什么导致的错误, 因为它的上下文丢失了, 这样子排查问题十分的困惑。 作为程序员, 一般都不止满足于速度快的代码, 真正想要的是又快, 又能像Sync的代码一样简单, 可读性强, 也能容易排查问题的代码, 这种组合形式的代码的设计模式就叫协程。

协程出现得很早, 它不像线程一样, 被系统调度, 而是能自主的暂停, 并等待事件循环通知恢复。由于协程是软件层面实现的, 所以它的实现方式有很多种, 这里要说的是基于生成器的协程, 因为生成器跟协程一样, 都有暂停让步和恢复的方法(还可以通过throw来抛错), 同时它跟Async语法的协程很像, 通过了解基于生成器的协程, 可以了解Async的协程是如何实现的。

3.基于生成器的协程

3.1生成器

在了解基于生成器的协程之前, 需要先了解下生成器, Python的生成器函数与普通的函数会有一些不同, 只有普通函数中带有关键字yield, 那么它就是生成器函数,

具体有什么不同可以通过他们的字节码来了解:

In [1]: import dis
# 普通函数
In [2]: def aaa(): pass
In [3]: dis.dis(aaa)

  1           0 LOAD_CONST               0 (None)
              2 RETURN_VALUE

# 普通函数调用函数
In [4]: def bbb():
   ...:     aaa()
   ...:
In [5]: dis.dis(bbb)

  2           0 LOAD_GLOBAL              0 (aaa)
              2 CALL_FUNCTION            0
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

# 普通生成器函数
In [6]: def ccc(): yield

In [7]: dis.dis(ccc)

  1           0 LOAD_CONST               0 (None)
              2 YIELD_VALUE
              4 POP_TOP
              6 LOAD_CONST               0 (None)
              8 RETURN_VALUE

上面分别是普通函数, 普通函数调用函数和普通生成器函数的字节码, 从字节码可以看出来, 最简单的函数只需要LOAD_CONST来加载变量None压入自己的栈, 然后通过RETURN_VALUE来返回值, 而有函数调用的普通函数则先加载变量, 把全局变量的函数aaa加载到自己的栈里面, 然后通过CALL_FUNCTION来调用函数, 最后通过POP_TOP把函数的返回值从栈里抛出来, 再把通过LOAD_CONST把None压入自己的栈, 最后返回值。
而生成器函数则不一样, 它会先通过LOAD_CONST来加载变量None压入自己的栈, 然后通过YIELD_VALUE返回值, 接着通过POP_TOP弹出刚才的栈并重新把变量None压入自己的栈, 最后通过RETURN_VALUE来返回值。从字节码来分析可以很清楚的看到, 生成器能够在yield区分两个栈帧, 一个函数调用可以分为多次返回, 很符合协程多次等待的特点。

接着来看看生成器的一个使用, 这个生成器会有两次yield调用, 并在最后返回字符串'None'

代码如下:

In [8]: def demo():
   ...:     a = 1
   ...:     b = 2
   ...:     print('aaa', locals())
   ...:     yield 1
   ...:     print('bbb', locals())
   ...:     yield 2
   ...:     return 'None'
   ...:
In [9]: demo_gen = demo()
In [10]: demo_gen.send(None)
aaa {'a': 1, 'b': 2}
Out[10]: 1
In [11]: demo_gen.send(None)
bbb {'a': 1, 'b': 2}
Out[11]: 2
In [12]: demo_gen.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-12-8f8cb075d6af> in <module>
----> 1 demo_gen.send(None)
StopIteration: None

这段代码首先通过函数调用生成一个demo_gen的生成器对象, 然后第一次send调用时返回值1, 第二次send调用时返回值2, 第三次send调用则抛出StopIteration异常, 异常提示为None, 同时可以看到第一次打印aaa和第二次打印bbb时, 他们都能打印到当前的函数局部变量, 可以发现在即使在不同的栈帧中, 他们读取到当前的局部函数内的局部变量是一致的, 这意味着如果使用生成器来模拟协程时, 它还是会一直读取到当前上下文的, 非常的完美。

此外, Python还支持通过yield from语法来返回一个生成器, 代码如下:

In [1]: def demo_gen_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [2]: def demo_gen_2():
   ...:     yield from demo_gen_1()
   ...:
In [3]: demo_gen_obj = demo_gen_2()

In [4]: demo_gen_obj.send(None)

Out[4]: 0

In [5]: demo_gen_obj.send(None)

Out[5]: 1

In [6]: demo_gen_obj.send(None)

Out[6]: 2
In [7]: demo_gen_obj.send(None)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-7-f9922a2f64c9> in <module>
----> 1 demo_gen_obj.send(None)
StopIteration: 

通过yield from就可以很方便的支持生成器调用, 假如把每个生成器函数都当做一个协程, 那通过yield from就可以很方便的实现协程间的调用, 此外生成器的抛出异常后的提醒非常人性化, 也支持throw来抛出异常, 这样我们就可以实现在协程运行时设置异常, 比如Cancel,演示代码如下:

In [1]: def demo_exc():
   ...:     yield 1
   ...:     raise RuntimeError()
   ...:
In [2]: def demo_exc_1():
   ...:     for i in range(3):
   ...:         yield i
   ...:

In [3]: demo_exc_gen = demo_exc()
In [4]: demo_exc_gen.send(None)
Out[4]: 1

In [5]: demo_exc_gen.send(None)
---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-5-09fbb75fdf7d> in <module>
----> 1 demo_exc_gen.send(None)
<ipython-input-1-69afbc1f9c19> in demo_exc()
      1 def demo_exc():
      2     yield 1
----> 3     raise RuntimeError()
      4
RuntimeError: 

In [6]: demo_exc_gen_1 = demo_exc_1()

In [7]: demo_exc_gen_1.send(None)
                                                                                                                                        Out[7]: 0

In [8]: demo_exc_gen_1.send(None)
                                                                                                                                        Out[8]: 1
In [9]: demo_exc_gen_1.throw(RuntimeError)
                                                                                                                                        ---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-9-1a1cc55d71f4> in <module>
----> 1 demo_exc_gen_1.throw(RuntimeError)
<ipython-input-2-2617b2366dce> in demo_exc_1()
      1 def demo_exc_1():
      2     for i in range(3):
----> 3         yield i
      4
RuntimeError: 

从中可以看到在运行中抛出异常时, 会有一个非常清楚的抛错, 可以明显看出错误堆栈, 同时throw指定异常后, 会在下一处yield抛出异常(所以协程调用Cancel后不会马上取消, 而是下一次调用的时候才被取消)。

3.2用生成器实现协程

我们已经简单的了解到了生成器是非常的贴合协程的编程模型, 同时也知道哪些生成器API是我们需要的API, 接下来可以模仿Asyncio的接口来实现一个简单的协程。

首先是在Asyncio中有一个封装叫Feature, 它用来表示协程正在等待将来时的结果, 以下是我根据asyncio.Feature封装的一个简单的Feature, 它的API没有asyncio.Feature全, 代码和注释如下:

class Status:
    """用于判断Future状态"""
    pending: int = 1
    finished: int = 2
    cancelled: int = 3
class Future(object):

    def __init__(self) -> None:
        """初始化时, Feature处理pending状态, 等待set result"""
        self.status: int = Status.pending
        self._result: Any = None
        self._exception: Optional[Exception] = None
        self._callbacks: List[Callable[['Future'], None]] = []

    def add_done_callback(self, fn: [['Future'], None]Callable) -> None:
        """添加完成时的回调"""
        self._callbacks.append(fn)

    def cancel(self):
        """取消当前的Feature"""
        if self.status != Status.pending:
            return False
        self.status = Status.cancelled
        for fn in self._callbacks:
            fn(self)
        return True

    def set_exception(self, exc: Exception) -> None:
        """设置异常"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set exc")
        self._exception = exc
        self.status = Status.finished

    def set_result(self, result: Any) -> None:
        """设置结果"""
        if self.status != Status.pending:
            raise RuntimeError("Can not set result")
        self.status = Status.finished
        self._result = result
        for fn in self._callbacks:
            fn(self)
    def result(self):
        """获取结果"""
        if self.status == Status.cancelled:
            raise asyncio.CancelledError
        elif self.status != Status.finished:
            raise RuntimeError("Result is not read")
        elif self._exception is not None:
            raise self._exception
        return self._result
    def __iter__(self):
        """通过生成器来模拟协程, 当收到结果通知时, 会返回结果"""
        if self.status == Status.pending:
            yield self
        return self.result()

在理解Future时, 可以把它假想为一个状态机, 在启动初始化的时候是peding状态, 在运行的时候我们可以切换它的状态, 并且通过__iter__方法来支持调用者使用yield from Future()来等待Future本身, 直到收到了事件通知时, 可以得到结果。

但是可以发现这个Future是无法自我驱动, 调用了__iter__的程序不知道何时被调用了set_result, 在Asyncio中是通过一个叫Task的类来驱动Future, 它将一个协程的执行过程安排好, 并负责在事件循环中执行该协程。它主要有两个方法:

  • 1.初始化时, 会先通过send方法激活生成器
  • 2.后续被调度后马上安排下一次等待, 除非抛出StopIteration异常

还有一个支持取消运行托管协程的方法(在原代码中, Task是继承于Future, 所以Future有的它都有), 经过简化后的代码如下:

class Task:
    def __init__(self, coro: Generator) -> None:
        # 初始化状态
        self.cancelled: bool = False
        self.coro: Generator = coro
        # 预激一个普通的future
        f: Future = Future()
        f.set_result(None)
        self.step(f)
    def cancel(self) -> None:
        """用于取消托管的coro"""
        self.coro.throw(asyncio.CancelledError)

    def step(self, f: Future) -> None:
        """用于调用coro的下一步, 从第一次激活开始, 每次都添加完成时的回调, 直到遇到取消或者StopIteration异常"""
        try:
            _future = self.coro.send(f.result())
        except asyncio.CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return
        _future.add_done_callback(self.step)

这样FutureTask就封装好了, 可以简单的试一试效果如何:

In [2]:def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
   ...:    result = yield from f
   ...:    print(flag_int, result)
   ...:
   ...:future: Future = Future()
   ...:for i in range(3):
   ...:    coro = wait_future(future, i)
   ...:    # 托管wait_future这个协程, 里面的Future也会通过yield from被托管
   ...:    Task(coro)
   ...:
   ...:print('ready')
   ...:future.set_result('ok')
   ...:
   ...:future = Future()
   ...:Task(wait_future(future, 3)).cancel()
   ...:
                                                                                                                                        ready
0 ok
1 ok
2 ok
---------------------------------------------------------------------------
CancelledError                            Traceback (most recent call last)
<ipython-input-2-2d1b04db2604> in <module>
     12
     13 future = Future()
---> 14 Task(wait_future(future, 3)).cancel()

<ipython-input-1-ec3831082a88> in cancel(self)
     81
     82     def cancel(self) -> None:
---> 83         self.coro.throw(asyncio.CancelledError)
     84
     85     def step(self, f: Future) -> None:

<ipython-input-2-2d1b04db2604> in wait_future(f, flag_int)
      1 def wait_future(f: Future, flag_int: int) -> Generator[Future, None, None]:
----> 2     result = yield from f
      3     print(flag_int, result)
      4
      5 future: Future = Future()

<ipython-input-1-ec3831082a88> in __iter__(self)
     68         """通过生成器来模拟协程, 当收到结果通知时, 会返回结果"""
     69         if self.status == Status.pending:
---> 70             yield self
     71         return self.result()
     72 

CancelledError: 

这段程序会先初始化Future, 并把Future传给wait_future并生成生成器, 再交由给Task托管, 预激, 由于Future是在生成器函数wait_future中通过yield from与函数绑定的, 真正被预激的其实是Future__iter__方法中的yield self, 此时代码逻辑会暂停在yield self并返回。
在全部预激后, 通过调用Futureset_result方法, 使Future变为结束状态, 由于set_result会执行注册的回调, 这时它就会执行托管它的Taskstep方法中的send方法, 代码逻辑回到Future__iter__方法中的yield self, 并继续往下走, 然后遇到return返回结果, 并继续走下去, 从输出可以发现程序封装完成且打印了ready后, 会依次打印对应的返回结果, 而在最后一个的测试cancel方法中可以看到,Future抛出异常了, 同时这些异常很容易看懂, 能够追随到调用的地方。

现在FutureTask正常运行了, 可以跟我们一开始执行的程序进行整合, 代码如下:

class HttpRequest(object):
    def __init__(self, host: str):
        """初始化变量和sock"""
        self._host: str = host
        global running_cnt
        running_cnt += 1
        self.url: str = f"http://{host}"
        self.sock: socket.SocketType = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect((host, 80))
        except BlockingIOError:
            pass
    def read(self) -> Generator[Future, None, bytes]:
        """从socket获取响应数据, 并set到Future中, 并通过Future.__iter__方法或得到数据并通过变量chunk_future返回"""
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_READ, lambda: f.set_result(self.sock.recv(4096)))
        chunk_future = yield from f
        selector.unregister(self.sock.fileno())
        return chunk_future  # type: ignore
    def read_response(self) -> Generator[Future, None, bytes]:
        """接收响应参数, 并判断请求是否结束"""
        response_bytes: bytes = b""
        chunk = yield from self.read()
        while chunk:
            response_bytes += chunk
            chunk = yield from self.read()
        return response_bytes
    def connected(self) -> Generator[Future, None, None]:
        """socket建立连接时的回调"""
        # 取消监听
        f: Future = Future()
        selector.register(self.sock.fileno(), EVENT_WRITE, lambda: f.set_result(None))
        yield f
        selector.unregister(self.sock.fileno())
        print(f"{self._host} connect success")

    def request(self) -> Generator[Future, None, None]:
        # 发送请求, 并监听读事件, 以及注册对应的接收响应函数
        yield from self.connected()
        self.sock.send(f"GET {self.url} HTTP/1.0\r\nHost: {self._host}\r\n\r\n".encode("ascii"))
        response = yield from self.read_response()
        print(f"request {self._host} success, length:{len(response)}")
        global running_cnt
        running_cnt -= 1
if __name__ == "__main__":
    # 同时多个请求
    Task(HttpRequest("so1n.me").request())
    Task(HttpRequest("github.com").request())
    Task(HttpRequest("google.com").request())
    Task(HttpRequest("baidu.com").request())
    # 监听是否有事件在运行
    while running_cnt > 0:
        # 等待事件循环通知事件是否已经完成
        for key, mask in selector.select():
            key.data()

这段代码通过Future和生成器方法尽量的解耦回调函数, 如果忽略了HttpRequest中的connectedread方法则可以发现整段代码跟同步的代码基本上是一样的, 只是通过yieldyield from交出控制权和通过事件循环恢复控制权。 同时通过上面的异常例子可以发现异常排查非常的方便, 这样一来就没有了回调的各种糟糕的事情, 开发者只需要按照同步的思路进行开发即可, 不过我们的事件循环是一个非常简单的事件循环例子, 同时对于socket相关都没有进行封装, 也缺失一些常用的API, 而这些都会被Python官方封装到Asyncio这个库中, 通过该库, 我们可以近乎完美的编写Async语法的代码。

NOTE: 由于生成器协程中无法通过yield from语法使用生成器, 所以Python在3.5之后使用了Await的原生协程。

到此这篇关于Python中Async语法协程的实现的文章就介绍到这了,更多相关Python协程内容请搜索我们以前的文章或继续浏览下面的相关文章希望大家以后多多支持我们!

(0)

相关推荐

  • python协程与 asyncio 库详情

    目录 1.asyncio 异步 I/O 库 异步函数的定义 事件循环 event_loop 创建 task 回调返回值 循环事件关闭 2.本节爬虫项目 前言: python 中协程概念是从 3.4 版本增加的,但 3.4 版本采用是生成器实现,为了将协程和生成器的使用场景进行区分,使语义更加明确,在 python 3.5 中增加了 async 和 await 关键字,用于定义原生协程. 1.asyncio 异步 I/O 库 python 中的 asyncio 库提供了管理事件.协程.任务和线程的

  • Python协程asyncio模块的演变及高级用法

    Python协程及asyncio基础知识 协程(coroutine)也叫微线程,是实现多任务的另一种方式,是比线程更小的执行单元,一般运行在单进程和单线程上.因为它自带CPU的上下文,它可以通过简单的事件循环切换任务,比进程和线程的切换效率更高,这是因为进程和线程的切换由操作系统进行. Python实现协程的主要借助于两个库:asyncio和gevent.由于asyncio已经成为python的标准库了无需pip安装即可使用,这意味着asyncio作为Python原生的协程实现方式会更加流行.本

  • python实习总结(yeild,async,azwait和协程)

    目录 一.yield使用简析 二.async和await的使用 1.什么是进程.协程.异步? 2.如何处理200W数量的url,把所有的url保存下来? 3.使用async的await和gather 三.协程的理解 总结 一.yield使用简析 yield是一个生成器generator,返回一个interable对象. 该对象具有next()方法,可以通过next()查看接下来的元素是什么. 1.interable对象 ,可以遍历的对象,如: list,str,tuple,dict,file,x

  • Python协程asyncio 异步编程笔记分享

    目录 1.事件循环 2.协程和异步编程 2.1 基本使用 2.2 await 2.3 Task对象 1.事件循环 可以理解成为一个死循环,去检查任务列表中的任务,如果可执行就去执行,如果检查不到就是不可执行的,那就忽略掉去执行其他可执行的任务,如果IO结束了(比如说去百度下载图片,下载完了就会变成可执行任务)再去执行下载完成之后的逻辑 #这里的任务是有状态的,比如这个任务已经完成或者正在执行或者正在IO等待 任务列表 = [ 任务1, 任务2, 任务3,... ] while True: 可执行

  • 浅谈Python协程asyncio

    一.协程 官方描述; 协程是子例程的更一般形式. 子例程可以在某一点进入并在另一点退出. 协程则可以在许多不同的点上进入.退出和恢复. 它们可通过 async def 语句来实现. 参见 PEP 492. 协程不是计算机内部提供的,不像进程.线程,由电脑本身提供,它是由程序员人为创造的, 实现函数异步执行. 协程(Coroutine),也可以被称为微线程,是一种用户太内的上下文切换技术,其实就是通过一个线程实现代码块相互切换执行.看上去像子程序,但执行过程中,在子程序内部可中断,然后转而执行别的

  • python 使用事件对象asyncio.Event来同步协程的操作

    事件对象asyncio.Event是基于threading.Event来实现的. 事件可以一个信号触发多个协程同步工作, 例子如下: import asyncio import functools def set_event(event): print('setting event in callback') event.set() async def coro1(event): print('coro1 waiting for event') await event.wait() print(

  • python 中的 asyncio 异步协程

    目录 一.定义协程 二.运行协程 三.协程回调 四.运行多个协程 五.run_forever 六.多协程中关闭run_forever 一.定义协程 asyncio 执行的任务,称为协程,但是Asyncio 并不能带来真正的并行 Python 的多线程因为 GIL(全局解释器锁)的存在,也不能带来真正的并行 import asyncio # 通过 async 定义一个协程 async def task(): print('这是一个协程') # 判断是否是一个协程,返回True print(asyn

  • python asyncio 协程库的使用

    asyncio 是 python 力推多年的携程库,与其 线程库 相得益彰,更轻量,并且协程可以访问同一进程中的变量,不需要进程间通信来传递数据,所以使用起来非常顺手. asyncio 官方文档写的非常简练和有效,半小时内可以学习和测试完,下面为我的一段 HelloWrold,感觉可以更快速的帮你认识 协程 . 定义协程 import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay)

  • Python中Async语法协程的实现

    目录 前记 1.传统的Sync语法请求例子 2.异步的请求 3.基于生成器的协程 3.1生成器 3.2用生成器实现协程 前记 在io比较多的场景中, Async语法编写的程序会以更少的时间, 更少的资源来完成相同的任务, 这篇文章则是介绍了Python的Async语法的协程是如何实现的. 1.传统的Sync语法请求例子 还是一样, 在了解Async语法的实现之前, 先从一个Sync的语法例子开始, 现在假设有一个HTTP请求, 这个程序会通过这个请求获取对应的响应内容, 并打印出来, 代码如下:

  • python中Task封装协程的知识点总结

    说明 1.Task是Future的子类,Task是对协程的封装,我们把多个Task放在循环调度列表中,等待调度执行. 2.Task对象可以跟踪任务和状态.Future(Task是Futrue的子类)为我们提供了异步编程中最终结果的处理(Task类还具有状态处理功能). 3.把协程封装成Task,加入一个队列等待调用.刚创建Task的时候不执行,遇到await就执行. 实例 import asyncio async def func(): print(1) await asyncio.sleep(

  • Python中gevent模块协程使用

    目录 背景 什么是协程? 什么是 gevent? 协程的例子 Q&A Q:gevent 无法捕获的耗时 A:猴子补丁 实践 异步 requests 请求 gevent 的锁 Tip 背景 因为 Python 线程的性能问题,在 Python 中使用多线程运行代码经常不能达到预期的效果.而实际开发中我们经常有高并发的需求,这就要求我们的代码在跑的更快的同时需要单位时间内执行更多的有效逻辑.减少无用的等待. 什么是协程? 我们可以认为线程是轻量级的进程,所以你也可以理解协程是轻量级的线程.协程即在一

  • python 单线程和异步协程工作方式解析

    在python3.4之后新增了asyncio模块,可以帮我们检测IO(只能是网络IO[HTTP连接就是网络IO操作]),实现应用程序级别的切换(异步IO).注意:asyncio只能发tcp级别的请求,不能发http协议. 异步IO:所谓「异步 IO」,就是你发起一个 网络IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知. 实现方式:单线程+协程实现异步IO操作. 异步协程用法 接下来让我们来了解下协程的实现,从 Python 3.4 开始,Python 中加入了协程的概

  • python简单线程和协程学习心得(分享)

    python中对线程的支持的确不够,不过据说python有足够完备的异步网络框架模块,希望日后能学习到,这里就简单的对python中的线程做个总结 threading库可用来在单独的线程中执行任意的python可调用对象.尽管此模块对线程相关操作的支持不够,但是我们还是能够用简单的线程来处理I/O操作,以减低程序响应时间. from threading import Thread import time def countdown(n): while n > 0: print('T-minus:

  • Python异步编程之协程任务的调度操作实例分析

    本文实例讲述了Python异步编程之协程任务的调度操作.分享给大家供大家参考,具体如下: 我们知道协程是异步进行的,碰到IO阻塞型操作时需要调度其他任务,那么这个调度规则或者是算法是怎样的呢?现在有以下几个疑问: 1.多个任务准备好,需要运行时,优先执行哪一个? 2.一个任务运行时,如果别的任务准备好了,是否需要中断当前任务呢? 在网上找了很多资料,也无法找到相关的资料,于是编写了几个简单的程序,查看任务的执行过程. 根据Python的asyncio我们可以编写一个简单的程序: import a

  • Android中的Coroutine协程原理解析

    前言 协程是一个并发方案.也是一种思想. 传统意义上的协程是单线程的,面对io密集型任务他的内存消耗更少,进而效率高.但是面对计算密集型的任务不如多线程并行运算效率高. 不同的语言对于协程都有不同的实现,甚至同一种语言对于不同平台的操作系统都有对应的实现. 我们kotlin语言的协程是 coroutines for jvm的实现方式.底层原理也是利用java 线程. 基础知识 生态架构 相关依赖库 dependencies { // Kotlin implementation "org.jetb

  • 深入浅析Python 中 is 语法带来的误解

    起步 Python 的成功一个原因是它的可读性,代码清晰易懂,更容易被人类所理解,但有时可读性会产生误解. 假如要判断一个变量是不是 17,那可以: if x is 17: x 是 17 肯定是比 x == 17 更加口语化的. is的误解 但是如果你尝试: if name is "weapon": 这个判断不见得管用.is 用来检查左侧和右侧是否是完全相同的对象.如果有两个不同的字符串对象,每个对象的值是相同的,应该使用 == 来判断,因为 is 的用法与口语上的区别挺大的: if

  • 基于python生成器封装的协程类

    自从python2.2提供了yield关键字之后,python的生成器的很大一部分用途就是可以用来构建协同程序,能够将函数挂起返回中间值并能从上次离开的地方继续执行.python2.5的时候,这种生成器更加接近完全的协程,因为提供了将值和异常传递回到一个继续执行的函数中,当等待生成器的时候,生成器能返回控制. python提供的生成器设施: yield:能够将自己挂起,并提供一个返回值给等待方 send:唤起一个被挂起的生成器,并能够传递一个参数,可以在生成器中抛出异常 next:本质上相当于s

  • Python全栈之协程详解

    目录 1. 线程队列 2. 进程池_线程池 3. 回调函数 4. 协程 总结: 1. 线程队列 # ### 线程队列 from queue import Queue """ put 存放 超出队列长度阻塞 get 获取 超出队列长度阻塞 put_nowait 存放,超出队列长度报错 get_nowait 获取,超出队列长度报错 """ # (1) Queue """先进先出,后进先出"""

随机推荐