如何用python实现一个HTTP连接池

一. 连接池的原理

  首先, HTTP连接是基于TCP连接的, 与服务器之间进行HTTP通信, 本质就是与服务器之间建立了TCP连接后, 相互收发基于HTTP协议的数据包. 因此, 如果我们需要频繁地去请求某个服务器的资源, 我们就可以一直维持与个服务器的TCP连接不断开, 然后在需要请求资源的时候, 把连接拿出来用就行了.

  一个项目可能需要与服务器之间同时保持多个连接, 比如一个爬虫项目, 有的线程需要请求服务器的网页资源, 有的线程需要请求服务器的图片等资源, 而这些请求都可以建立在同一条TCP连接上.

  因此, 我们使用一个管理器来对这些连接进行管理, 任何程序需要使用这些连接时, 向管理器申请就可以了, 等到用完之后再将连接返回给管理器, 以供其他程序重复使用, 这个管理器就是连接池.

二. 实现代码

1. HTTPConnectionPool类

  基于上一章的分析, 连接池应该是一个收纳连接的容器, 同时对这些连接有管理能力:

class HTTPConnectionPool:

  def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None:
    """
    :param host: pass
    :param port: pass
    :param max_size: 同时存在的最大连接数, 默认None->连接数无限,没了就创建
    :param idle_timeout: 单个连接单次最长空闲时间,超时自动关闭,默认None->不限时
    """
    self.host = host
    self.port = port
    self.max_size = max_size
    self.idle_timeout = idle_timeout
    self._lock = threading.Condition()
    self._pool = []
    # 这里的conn_num指的是总连接数,包括其它线程拿出去正在使用的连接
    self.conn_num = 0
    self.is_closed = False

  def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
    ...

  def release(self, conn: WrapperHTTPConnection) -> None:
    ...

  因此, 我们定义这样一个HTTPConnectionPool类, 使用一个列表来保存可用的连接. 对于外部来说, 只需要调用这个连接池对象的acquire和release方法就能取得和释放连接.

2. 线程安全地管理连接

  对于线程池内部来说, 至少需要三个关于连接的操作: 从连接池中取得连接, 将连接放回连接池, 以及创建一个连接:

def _get_connection(self) -> WrapperHTTPConnection:
  # 这个方法会把连接从_idle_conn移动到_used_conn列表中,并返回这个连接
  try:
    return self._pool.pop()
  except IndexError:
    raise EmptyPoolError

def _put_connection(self, conn: WrapperHTTPConnection) -> None:
  self._pool.append(conn)

def _create_connection(self) -> WrapperHTTPConnection:
  self.conn_num += 1
  return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))

  对于连接池外部来说, 主要有申请连接和释放连接这两个操作, 实际上这就是个简单的生产者消费者模型. 考虑到外部可能是多线程的环境, 我们使用threading.Condition来保证线程安全. 关于Condition的资料可以看这里.

def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
  if self.is_closed:
    raise ConnectionPoolClosed
  with self._lock:
    if self.max_size is None or not self.is_full():
      # 在还能创建新连接的情况下,如果没有空闲连接,直接创建一个就行了
      if self.is_pool_empty():
        self._put_connection(self._create_connection())
    else:
      # 不能创建新连接的情况下,如果设置了blocking=False,没连接就报错
      # 否则,就基于timeout进行阻塞,直到超时或者有可用连接为止
      if not blocking:
        if self.is_pool_empty():
          raise EmptyPoolError
      elif timeout is None:
        while self.is_pool_empty():
          self._lock.wait()
      elif timeout < 0:
        raise ValueError("'timeout' must be a non-negative number")
      else:
        end_time = time.time() + timeout
        while self.is_pool_empty():
          remaining = end_time - time.time()
          if remaining <= 0:
            raise EmptyPoolError
          self._lock.wait(remaining)
    # 走到这一步了,池子里一定有空闲连接
    return self._get_connection()

def release(self, conn: WrapperHTTPConnection) -> None:
  if self.is_closed:
    # 如果这个连接是在连接池关闭后才释放的,那就不用回连接池了,直接放生
    conn.close()
    return
  # 实际上,python列表的append操作是线程安全的,可以不加锁
  # 这里调用锁是为了通过notify方法通知其它正在wait的线程:现在有连接可用了
  with self._lock:
    if not conn.is_available:
      # 如果这个连接不可用了,就应该创建一个新连接放进去,因为可能还有其它线程在等着连接用       conn.close()       self.conn_num -= 1
      conn = self._create_connection()
    self._put_connection(conn)
    self._lock.notify()

  我们首先看看acquire方法, 这个方法其实就是在申请到锁之后调用内部的_get_connection方法获取连接, 这样就线程安全了. 需要注意的是, 如果当前的条件无法获取连接, 就会调用条件变量的wait方法, 及时释放锁并阻塞住当前线程. 然后, 当其它线程作为生产者调用release方法释放连接时, 会触发条件变量的notify方法, 从而唤醒一个阻塞在wait阶段的线程, 即消费者. 这个消费者再从池中取出刚放回去的线程, 这样整个生产者消费者模型就运转起来了.

3. 上下文管理器

  对于一个程序来说, 它使用连接池的形式是获取连接->使用连接->释放连接. 因此, 我们应该通过with语句来管理这个连接, 以免在程序的最后遗漏掉释放连接这一步骤.

  基于这个原因, 我们通过一个WrapperHTTPConnection类来对HTTPConnection进行封装, 以实现上下文管理器的功能. HTTPConnection的代码可以看《用python实现一个HTTP客户端》这篇文章.

class WrapperHTTPConnection:

  def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
    self.pool = pool
    self.conn = conn
    self.response = None
    self.is_available = True

  def __enter__(self) -> 'WrapperHTTPConnection':
    return self

  def __exit__(self, *exit_info: Any) -> None:
    # 如果response没读完并且连接需要复用,就弃用这个连接
    if not self.response.will_close and not self.response.is_closed():
      self.close()
    self.pool.release(self)

  def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
    self.conn.request(*args, **kwargs)
    self.response = self.conn.get_response()
    return self.response

  def close(self) -> None:
    self.conn.close()
    self.is_available = False

  同样的, 连接池可能也需要关闭, 因此我们给连接池也加上上下文管理器的功能:

class HTTPConnectionPool:
  ...

  def close(self) -> None:
    if self.is_closed:
      return
    self.is_closed = True
    pool, self._pool = self._pool, None
    for conn in pool:
      conn.close()

  def __enter__(self) -> 'HTTPConnectionPool':
    return self

  def __exit__(self, *exit_info: Any) -> None:
    self.close()

  这样, 我们就可以通过with语句优雅地管理连接池了:

with HTTPConnectionPool(**kwargs) as pool:
  with pool.acquire() as conn:
    res = conn.request('GET', '/')
    ...

4. 定时清理连接

  如果一个连接池的所需连接数是随时间变化的, 那么就会出现一种情况: 在高峰期, 我们创建了非常多的连接, 然后进入低谷期之后, 连接过剩, 大量的连接处于空闲状态, 浪费资源. 因此, 我们可以设置一个定时任务, 定期清理空闲时间过长的连接, 减少连接池的资源占用.

  首先, 我们需要为连接对象添加一个last_time属性, 每当连接释放进入连接池后, 就修改这个属性的值为当前时间, 这样我们就能明确知道, 连接池内的每个空闲连接空闲了多久:

class WrapperHTTPConnection:
  ...

  def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
    ...
    self.last_time = None

class HTTPConnectionPool:
  ...

  def _put_connection(self, conn: WrapperHTTPConnection) -> None:
    conn.last_time = time.time()
    self._pool.append(conn)

  然后, 我们通过threading.Timer来实现一个定时任务:

def start_clear_conn(self) -> None:
  if self.idle_timeout is None:
    # 如果空闲连接的超时时间为无限,那么就不应该清理连接
    return
  self.clear_idle_conn()
  self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
  self._clearer.start()

def stop_clear_conn(self) -> None:
  if self._clearer is not None:
    self._clearer.cancel()

  threading.Timer只会执行一次定时任务, 因此, 我们需要在start_clear_conn中不断地把自己设置为定时任务. 这其实等同于新开了一个线程来执行start_clear_conn方法, 因此并不会出现递归过深问题. 不过需要注意的是, threading.Timer虽然不会阻塞当前线程, 但是却会阻止当前线程结束, 就算把它设置为守护线程都不行, 唯一可行的办法就是调用stop_clear_conn方法取消这个定时任务.

  最后, 我们定义clear_idle_conn方法来清理闲置时间超时的连接:

def clear_idle_conn(self) -> None:
  if self.is_closed:
    raise ConnectionPoolClosed
  # 这里开一个新线程来清理空闲连接,避免了阻塞主线程导致的定时精度出错
  threading.Thread(target=self._clear_idle_conn).start()

def _clear_idle_conn(self) -> None:
  if not self._lock.acquire(timeout=self.idle_timeout):
    # 因为是每隔self.idle_timeout秒检查一次
    # 如果过了self.idle_timeout秒还没申请到锁,下一次都开始了,本次也就不用继续了
    return
  current_time = time.time()
  if self.is_pool_empty():
    pass
  elif current_time - self._pool[-1].last_time >= self.idle_timeout:
    # 这里处理下面的二分法没法处理的边界情况,即所有连接都闲置超时的情况
    self.conn_num -= len(self._pool)
    self._pool.clear()
  else:
    # 通过二分法找出从左往右第一个不超时的连接的指针
    left, right = 0, len(self._pool) - 1
    while left < right:
      mid = (left + right) // 2
      if current_time - self._pool[mid].last_time >= self.idle_timeout:
        left = mid + 1
      else:
        right = mid
    self._pool = self._pool[left:]
    self.conn_num -= left
  self._lock.release()

  由于我们获取和释放连接都是从self._pool的尾部开始操作的, 因此self._pool这个容器是一个先进后出队列, 它里面放着的连接, 一定是越靠近头部的闲置时间越长, 从头到尾闲置时间依次递减. 基于这个原因, 我们使用二分法来找出列表中第一个没有闲置超时的连接, 然后把在它之前的连接一次性删除, 这样就能达到O(logN)的时间复杂度, 算是一种比较高效的方法. 需要注意的是, 如果连接池内所有的连接都是超时的, 那么这种方法是删不干净的, 需要对这种边界情况单独处理.

三. 总结

1. 完整代码及分析

  这个连接池的完整代码如下:

import threading
import time
from typing import Any

from client import HTTPConnection, HTTPResponse

class WrapperHTTPConnection:

  def __init__(self, pool: 'HTTPConnectionPool', conn: HTTPConnection) -> None:
    self.pool = pool
    self.conn = conn
    self.response = None
    self.last_time = time.time()
    self.is_available = True

  def __enter__(self) -> 'WrapperHTTPConnection':
    return self

  def __exit__(self, *exit_info: Any) -> None:
    # 如果response没读完并且连接需要复用,就弃用这个连接
    if not self.response.will_close and not self.response.is_closed():
      self.close()
    self.pool.release(self)

  def request(self, *args: Any, **kwargs: Any) -> HTTPResponse:
    self.conn.request(*args, **kwargs)
    self.response = self.conn.get_response()
    return self.response

  def close(self) -> None:
    self.conn.close()
    self.is_available = False

class HTTPConnectionPool:

  def __init__(self, host: str, port: int = None, max_size: int = None, idle_timeout: int = None) -> None:
    """
    :param host: pass
    :param port: pass
    :param max_size: 同时存在的最大连接数, 默认None->连接数无限,没了就创建
    :param idle_timeout: 单个连接单次最长空闲时间,超时自动关闭,默认None->不限时
    """
    self.host = host
    self.port = port
    self.max_size = max_size
    self.idle_timeout = idle_timeout
    self._lock = threading.Condition()
    self._pool = []
    # 这里的conn_num指的是总连接数,包括其它线程拿出去正在使用的连接
    self.conn_num = 0
    self.is_closed = False
    self._clearer = None
    self.start_clear_conn()

  def acquire(self, blocking: bool = True, timeout: int = None) -> WrapperHTTPConnection:
    if self.is_closed:
      raise ConnectionPoolClosed
    with self._lock:
      if self.max_size is None or not self.is_full():
        # 在还能创建新连接的情况下,如果没有空闲连接,直接创建一个就行了
        if self.is_pool_empty():
          self._put_connection(self._create_connection())
      else:
        # 不能创建新连接的情况下,如果设置了blocking=False,没连接就报错
        # 否则,就基于timeout进行阻塞,直到超时或者有可用连接为止
        if not blocking:
          if self.is_pool_empty():
            raise EmptyPoolError
        elif timeout is None:
          while self.is_pool_empty():
            self._lock.wait()
        elif timeout < 0:
          raise ValueError("'timeout' must be a non-negative number")
        else:
          end_time = time.time() + timeout
          while self.is_pool_empty():
            remaining = end_time - time.time()
            if remaining <= 0:
              raise EmptyPoolError
            self._lock.wait(remaining)
      # 走到这一步了,池子里一定有空闲连接
      return self._get_connection()

  def release(self, conn: WrapperHTTPConnection) -> None:
    if self.is_closed:
      # 如果这个连接是在连接池关闭后才释放的,那就不用回连接池了,直接放生
      conn.close()
      return
    # 实际上,python列表的append操作是线程安全的,可以不加锁
    # 这里调用锁是为了通过notify方法通知其它正在wait的线程:现在有连接可用了
    with self._lock:
      if not conn.is_available:
        # 如果这个连接不可用了,就应该创建一个新连接放进去,因为可能还有其它线程在等着连接用
        conn.close()
        self.conn_num -= 1
        conn = self._create_connection()
      self._put_connection(conn)
      self._lock.notify()

  def _get_connection(self) -> WrapperHTTPConnection:
    # 这个方法会把连接从_idle_conn移动到_used_conn列表中,并返回这个连接
    try:
      return self._pool.pop()
    except IndexError:
      raise EmptyPoolError

  def _put_connection(self, conn: WrapperHTTPConnection) -> None:
    conn.last_time = time.time()
    self._pool.append(conn)

  def _create_connection(self) -> WrapperHTTPConnection:
    self.conn_num += 1
    return WrapperHTTPConnection(self, HTTPConnection(self.host, self.port))

  def is_pool_empty(self) -> bool:
    # 这里指的是,空闲可用的连接是否为空
    return len(self._pool) == 0

  def is_full(self) -> bool:
    if self.max_size is None:
      return False
    return self.conn_num >= self.max_size

  def close(self) -> None:
    if self.is_closed:
      return
    self.is_closed = True
    self.stop_clear_conn()
    pool, self._pool = self._pool, None
    for conn in pool:
      conn.close()

  def clear_idle_conn(self) -> None:
    if self.is_closed:
      raise ConnectionPoolClosed
    # 这里开一个新线程来清理空闲连接,避免了阻塞主线程导致的定时精度出错
    threading.Thread(target=self._clear_idle_conn).start()

  def _clear_idle_conn(self) -> None:
    if not self._lock.acquire(timeout=self.idle_timeout):
      # 因为是每隔self.idle_timeout秒检查一次
      # 如果过了self.idle_timeout秒还没申请到锁,下一次都开始了,本次也就不用继续了
      return
    current_time = time.time()
    if self.is_pool_empty():
      pass
    elif current_time - self._pool[-1].last_time >= self.idle_timeout:
      # 这里处理下面的二分法没法处理的边界情况,即所有连接都闲置超时的情况
      self.conn_num -= len(self._pool)
      self._pool.clear()
    else:
      # 通过二分法找出从左往右第一个不超时的连接的指针
      left, right = 0, len(self._pool) - 1
      while left < right:
        mid = (left + right) // 2
        if current_time - self._pool[mid].last_time >= self.idle_timeout:
          left = mid + 1
        else:
          right = mid
      self._pool = self._pool[left:]
      self.conn_num -= left
    self._lock.release()

  def start_clear_conn(self) -> None:
    if self.idle_timeout is None:
      # 如果空闲连接的超时时间为无限,那么就不应该清理连接
      return
    self.clear_idle_conn()
    self._clearer = threading.Timer(self.idle_timeout, self.start_clear_conn)
    self._clearer.start()

  def stop_clear_conn(self) -> None:
    if self._clearer is not None:
      self._clearer.cancel()

  def __enter__(self) -> 'HTTPConnectionPool':
    return self

  def __exit__(self, *exit_info: Any) -> None:
    self.close()

class EmptyPoolError(Exception):
  pass

class ConnectionPoolClosed(Exception):
  pass

  首先, 这个连接池的核心就是对连接进行管理, 而这包含取出连接和释放连接两个过程. 因此这东西的本质就是一个生产者消费者模型, 取出线程时是消费者, 放入线程时是生产者, 使用threading自带的Condition对象就能完美解决线程安全问题, 使二者协同合作.

  解决获取连接和释放连接这个问题之后, 其实这个连接池就已经能用了. 但是如果涉及到更多细节方面的东西, 比如判断连接是否可用, 自动释放连接, 清理闲置连接等等, 就需要对这个连接进行封装, 为它添加更多的属性和方法, 这就引入了WrapperHTTPConnection这个类. 实现它的__enter___和__exit__方法之后, 就能使用上下文管理器来自动释放连接. 至于清理闲置连接, 通过last_time属性记录每个连接的最后释放时间, 然后在连接池中添加一个定时任务就行了.

以上就是如何用python实现一个HTTP连接池的详细内容,更多关于python 实现一个HTTP连接池的资料请关注我们其它相关文章!

(0)

相关推荐

  • 如何基于Python + requests实现发送HTTP请求

    这篇文章主要介绍了如何基于Python + requests实现发送HTTP请求,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下 一.在接口自动化测试过程中,存在两种情况: 一种是不需要鉴权的接口,可以直接访问的. 还有一种情况是需要鉴权才可以访问的接口. 这里我们通过Python + requests 实现这两种发送请求的方法 """ ============================ author:Treasure丶

  • 详解用python -m http.server搭一个简易的本地局域网

    工作时同事间几mb小文件的传输,一般使用QQ或者微信就足够了,但当传输文件几百MB或者几十G时,这种方法的效率就显得不足了.本篇就是简单说明一个python小功能,让大家能利用python方便的搭建一个本地局域网.跟同事测试时,速度轻松达到800mb/s. 搭建只需三步就可以: 1.设置python路径为环境变量 2.命令行输入python -m http.server 8888来搭建局域网 3.使用本机的ip地址进行访问 接下来我们一步一步看: 设置python路径为环境变量 1.先找到自己p

  • 解决Python发送Http请求时,中文乱码的问题

    解决方法: 先encode再quote. 原理: msg.encode('utf-8')是解决中文乱码问题. quote():假如URL的 name 或者 value 值中有『&』.『%』或者『=』等符号,就会有问题.所以URL中的参数字符串也需要把『&=』等符号进行编码,quote()就是对参数字符串中的『&=%』等符号进行编码. 例子: # -*- coding: UTF-8 -*- # python2.7 from urllib import quote import req

  • python3从网络摄像机解析mjpeg http流的示例

    前言 网络摄像头的视频流解析直接使用通过http的Mjpeg是具有边界帧信息的multipart / x-mixed-replace,而jpeg数据只是以二进制形式发送.因此,实际上不需要关心HTTP协议标头.所有jpeg帧均以marker开头,0xff 0xd8并以结尾0xff 0xd9.因此,上面的代码从http流中提取了此类帧,并将其一一解码.像下面 ...(http) 0xff 0xd8 --| [jpeg data] |--this part is extracted and deco

  • Python实现http接口自动化测试的示例代码

    网上http接口自动化测试Python实现有很多,我也是在慕课网上学习了相关课程,并实际操作了一遍,于是进行一些总结,便于以后回顾温习,有许多不完善的地方,希望大神们多多指教! 接口测试常用的工具有fiddler,postman,jmeter等,使用这些工具测试时,需要了解常用的接口类型和区别,比如我用到的post和get请求,表面上看get用于获取数据post用于修改数据,两者传递参数的方式也有不一样,get是直接在url里通过?来连接参数,而post则是把数据放在HTTP的包体内(reque

  • 基于Python模拟浏览器发送http请求

    1.使用 urllib2 实现 #! /usr/bin/env python # -*- coding=utf-8 -*- import urllib2 url="https://www.baidu.com" req_header = {"User-Agent":"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/53

  • Python Http请求json解析库用法解析

    httpparser介绍 :1.解析字节类型的http与https请求数据 :2.支持已k-v形式修改请求数据 :3.支持重新编码请求数据 源码 import json __author = "-ling" def parser(request_data): # 获取请求的三个段: # 1.请求方法 URI协议 版本 # 2.请求头(Request Header) # 3.请求正文 index0 = request_data.find(b"\r\n\r\n") re

  • Python如何实现自带HTTP文件传输服务

    一行命令搭建一个基于python的http文件传输服务 由于今天朋友想要一个文件,而我恰好有,因为这个文件比较大,网速不是很给力,所以想到了python自己有这么一个功能,这样不仅不需要下载其他软件,下载速度也是噌噌的 2333333 python -m http.server -b 127.0.0.1 (-b 绑定ip,不指定的话默认是本机ip) 这个是python自带的一个功能,这个功能便于我们传输一些文件,当服务开启后,我们就可以通过浏览器看到当前路径下的所有文件及文件夹(这里的路径是cm

  • Python requests HTTP验证登录实现流程

    1.场景 1)用户输入完网址后,浏览器直接弹出需要输入用户名/密码 PS:此时输入用户名密码即可登录,或者直接带着用户名密码访问网站. 假设url为http://xxx.yyy.zzz 用户名为admin 密码为123456 则访问的网址应该为http://admin:123456@xxx.yyy.zzz[http://username:password@url] 直接访问改网址即可 2)利用requests.get(url)返回状态码为401 # -*- encoding=utf-8 -*-

  • Python3自定义http/https请求拦截mitmproxy脚本实例

    脚本内容 代码如下: from mitmproxy import http, ctx from multiprocessing import Lock class Filter: def __init__(self, filter_info): self.log_info = "" self.mutex = Lock() self.filter_info = filter_info self.response_file = None self.switch_on = False sel

  • Python HTTP下载文件并显示下载进度条功能的实现

    下面的Python脚本中利用request下载文件并写入到文件系统,利用progressbar模块显示下载进度条. 其中利用request模块下载文件可以直接下载,不需要使用open方法,例如: import urllib import requests.packages.urllib3 requests.packages.urllib3.disable_warnings() url = "https://raw.githubusercontent.com/racaljk/hosts/maste

  • 解决python打开https出现certificate verify failed的问题

    今天遇到一个奇怪的问题,在用urllib打开一个https链接的时候,出现了一下报错信息:IOError: [Errno socket error] [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:727),报错问题就是证书验证失败,这种情况出现在网站使用的是自签名证书或系统根证书存在问题的时候. 原因: Python 从 2.7.9版本开始,就默认开启了服务器证书验证功能,如果证书校验不通过,则拒绝后续操作:

随机推荐