python json-rpc 规范源码阅读

目录
  • json-rpc 源码阅读
    • JSON-RPC规范
    • jsonrpcclient的实现
    • jsonrpcserver的实现
  • 小结
    • 小技巧

json-rpc 源码阅读

JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议。JSON-RPC应用很广泛,比如以太坊的API。JSON-RPC的python实现较多,我选择了Exploding Labs 提供的python版本。主要是其它库都比较古老,而e-labs的实现采用最新版本python,支持类型系统,还有一些函数式编程的范式,代码也很简洁,值得学习。

e-labs的JSON-RPC分成客户端和服务端两个库,分别是jsonrpcclient和jsonrpcserver, 代码版本如下表:

名称 版本
jsonrpcclient 4.0.2
jsonrpcserver 5.0.9

准备好代码后,我们可以开始json-rpc的源码阅读,本文包括下面几个部分:

  • JSON-RPC规范
  • jsonrpcclient的实现
  • jsonrpcserver的实现
  • 小结
  • 小技巧

JSON-RPC规范

JSON-RPC规范,我这里借用jsonrpcserver中的验证规则文件简单介绍一下,文件如下:

# request-schema.json
{
    "$schema": "http://json-schema.org/draft-04/schema#",
    "description": "A JSON RPC 2.0 request",
    "oneOf": [
        {
            "description": "An individual request",
            "$ref": "#/definitions/request"
        },
        {
            "description": "An array of requests",
            "type": "array",
            "items": { "$ref": "#/definitions/request" },
            "minItems": 1
        }
    ],
    "definitions": {
        "request": {
            "type": "object",
            "required": [ "jsonrpc", "method" ],
            "properties": {
                "jsonrpc": { "enum": [ "2.0" ] },
                "method": {
                    "type": "string"
                },
                "id": {
                    "type": [ "string", "number", "null" ],
                    "note": [
                        "While allowed, null should be avoided: http://www.jsonrpc.org/specification#id1",
                        "While allowed, a number with a fractional part should be avoided: http://www.jsonrpc.org/specification#id2"
                    ]
                },
                "params": {
                    "type": [ "array", "object" ]
                }
            },
            "additionalProperties": false
        }
    }
}

文件描述了JSON-RPC的规则,如下:

  • json-rpc请求可以是单个的request对象,也是是批量的request对象数组
  • 每个request对象需要符合:
    • 必填字段jsonrpc,值枚举类型。目前2.0,其实就是版本号。(之前有1.0版本)
    • 必填字段method, 字符串类型。远程函数的名称。
    • id字段,支持字符串,数字或者空。为空表示通知无需回应(result)。id确保响应可以一一对应到请求上。
    • params字段,支持数组或者字典。

JSON-RPC响应部分的规则是:

  • jsonrpc字段,值为2.0
  • result字段,值为调用结果
  • error字段,值为异常信息,包括code,message和data三个字段,规范定义了详细的错误清单。
  • id同请求的id
  • result和error二选一

强烈建议大家阅读参考链接中的规范原文,介绍的非常清晰,中文翻译也很到位,有助于对JSON-RPC规范完全理解。

jsonrpcclient的实现

模块文件 功能描述
id_generators.py id生成器
requests.py 请求信息封装
response.py 响应信息封装
sentinels.py 定义NOID,用于通知类请求
utils.py 一些工具函数
examples 一些示例

从示例可以知道JSON-RPC,可以使用不同的底层协议比如http,websocket和tcp(zeromq实现)等。我们看最简单的基于http实现的实例:

from jsonrpcclient import request, parse, Ok
import logging
import requests
response = requests.post("http://localhost:5000/", json=request("ping"))
parsed = parse(response.json())
if isinstance(parsed, Ok):
    print(parsed.result)
else:
    logging.error(parsed.message)

这段api展示了:

  • jsonrpcclient只是封装请求request和响应Ok,数据请求的发送由不同协议提供,这里使用requests,另外还有aiohttp的实现等。
  • resquest函数封装请求,parse解析响应
  • 正常的结果展示result信息,错误的结果展示message信息

request代码很简单, 封装请求成符合JSON-RPC规范的字符串:

# requests.py
def request_pure(
    id_generator: Iterator[Any],
    method: str,
    params: Union[Dict[str, Any], Tuple[Any, ...]],
    id: Any,
) -> Dict[str, Any]:
    return {
        "jsonrpc": "2.0",
        "method": method,
        **(
            {"params": list(params) if isinstance(params, tuple) else params}
            if params
            else {}
        ),
        "id": id if id is not NOID else next(id_generator),
    }
def request_impure(
    id_generator: Iterator[Any],
    method: str,
    params: Union[Dict[str, Any], Tuple[Any, ...], None] = None,
    id: Any = NOID,
) -> Dict[str, Any]:
    return request_pure(
        id_generator or id_generators.decimal(), method, params or (), id
    )
request_natural = partial(request_impure, id_generators.decimal())
...
request = request_natural

所以示例中的请求,可以等价下面的curl命令:

$ curl -X POST http://localhost:5001 -d '{"jsonrpc": "2.0", "method": "ping", "params": {}, "id": 1}'

response处理也很简单:

# response.py
class Ok(NamedTuple):
    result: Any
    id: Any
    def __repr__(self) -> str:
        return f"Ok(result={self.result!r}, id={self.id!r})"
class Error(NamedTuple):
    code: int
    message: str
    data: Any
    id: Any
    def __repr__(self) -> str:
        return f"Error(code={self.code!r}, message={self.message!r}, data={self.data!r}, id={self.id!r})"
Response = Union[Ok, Error]

定义Response类型,是Ok或者Error。Ok和Error是两个可命名元祖。

parse就是将结果json字典解析成对应的Response:

def to_result(response: Dict[str, Any]) -> Response:
    return (
        Ok(response["result"], response["id"])
        if "result" in response
        else Error(
            response["error"]["code"],
            response["error"]["message"],
            response["error"].get("data"),
            response["id"],
        )
    )
def parse(response: Deserialized) -> Union[Response, Iterable[Response]]:
    return (
        map(to_result, response) if isinstance(response, list) else to_result(response)
    )

也可以直接使用parse_json函数,从json字符串生成结果:

parse_json = compose(parse, json.loads)

这里的map,componse等都是函数式编程。在server中函数式编程使用的更多,可见作者非常喜欢函数式编程的思想

jsonrpcserver的实现

jsonrpcclient实现非常简单,jsonrpcserver的实现会略微复杂点,但是还是可以很好的理解的,我们一起继续。jsonrpcserver的主要模块如下:

模块 描述
main.py/async_main.py main文件,分别是同步和异步版本
dispatcher.py/async_dispatcher.py rpc服务的分配器实现
methods.py rpc函数的装饰器
request.py 请求处理
response.py 响应处理
result.py 结果处理
examplse 一些示例

通用,我们先从示例入手,看看api的使用。下面是flask版本:

# flask_server.py
from flask import Flask, Response, request
from jsonrpcserver import method, Result, Success, dispatch
app = Flask(__name__)
@method
def ping() -> Result:
    return Success("pong")
@app.route("/", methods=["POST"])
def index():
    return Response(
        dispatch(request.get_data().decode()), content_type="application/json"
    )
if __name__ == "__main__":
    app.run()

从示例我们可以知道,rpc服务其实就2大步骤:

  • 使用method装饰ping函数,使它支持rpc调用,ping函数返回的是一个特点的Result数据结构
  • 所有rpc调用的http-url都是根目录,服务使用dispatch调度rpc请求

先看第一步rpc装饰器:

# methods.py
Method = Callable[..., Result]
Methods = Dict[str, Method]
global_methods = dict()
def method(
    f: Optional[Method] = None, name: Optional[str] = None
) -> Callable[..., Any]:
    """A decorator to add a function into jsonrpcserver's internal global_methods dict.
    The global_methods dict will be used by default unless a methods argument is passed
    to `dispatch`.
    Functions can be renamed by passing a name argument:
        @method(name=bar)
        def foo():
            ...
    """
    def decorator(func: Method) -> Method:
        nonlocal name
        global_methods[name or func.__name__] = func
        return func
    return decorator(f) if callable(f) else cast(Method, decorator)
  • 将所有的rpc函数都封装到global_methods字典中
  • 函数需要返回Result类型

第2步中,main模块提供了dispatch的api,主要就是下面的函数:

# main.py
def dispatch_to_response(
    request: str,
    methods: Optional[Methods] = None,
    *,
    context: Any = NOCONTEXT,
    deserializer: Callable[[str], Deserialized] = json.loads,
    validator: Callable[[Deserialized], Deserialized] = default_validator,
    post_process: Callable[[Response], Any] = identity,
) -> Union[Response, List[Response], None]:
    """Takes a JSON-RPC request string and dispatches it to method(s), giving Response
    namedtuple(s) or None.
    This is a public wrapper around dispatch_to_response_pure, adding globals and
    default values to be nicer for end users.
    Args:
        request: The JSON-RPC request string.
        methods: Dictionary of methods that can be called - mapping of function names to
            functions. If not passed, uses the internal global_methods dict which is
            populated with the @method decorator.
        context: If given, will be passed as the first argument to methods.
        deserializer: Function that deserializes the request string.
        validator: Function that validates the JSON-RPC request. The function should
            raise an exception if the request is invalid. To disable validation, pass
            lambda _: None.
        post_process: Function that will be applied to Responses.
    Returns:
        A Response, list of Responses or None.
    Examples:
       >>> dispatch('{"jsonrpc": "2.0", "method": "ping", "id": 1}')
       '{"jsonrpc": "2.0", "result": "pong", "id": 1}'
    """
    return dispatch_to_response_pure(
        deserializer=deserializer,
        validator=validator,
        post_process=post_process,
        context=context,
        methods=global_methods if methods is None else methods,
        request=request,
    )
  • request 请求的函数名称
  • methods 可供调用的函数集合,默认就是之前rpc装饰器中存储的global_methods
  • deserializer 请求的反序列化函数,validator请求验证器
  • post_process响应处理函数

post_process主要就是根据结果类型,分别取不同的字段并序列化:

def to_serializable_one(response: ResponseType) -> Union[Deserialized, None]:
    return (
        serialize_error(response._error)
        if isinstance(response, Left)
        else serialize_success(response._value)
    )

dispatch的实现,主要是下面2个函数dispatch_request和call,前者查找rpc函数,后者执行rpc函数。dispatch_request内容如下:

def dispatch_request(
    methods: Methods, context: Any, request: Request
) -> Tuple[Request, Result]:
    """Get the method, validates the arguments and calls the method.
    Returns: A tuple containing the Result of the method, along with the original
        Request. We need the ids from the original request to remove notifications
        before responding, and  create a Response.
    """
    return (
        request,
        get_method(methods, request.method)
        .bind(partial(validate_args, request, context))
        .bind(partial(call, request, context)),
    )

这里使用了oslash这个函数式编程库,我们可以简单的使用unix的管道思想去理解:

  • 使用get_method查找rpc响应函数
  • 使用validate_args验证rpc请求
  • 使用call执行rpc调用
  • 3个步骤依次执行,前者的返回值会作为后缀的参数

重中之重是call函数,原理非常简单:

def call(request: Request, context: Any, method: Method) -> Result:
    """Call the method.
    Handles any exceptions raised in the method, being sure to return an Error response.
    Returns: A Result.
    """
    try:
        result = method(*extract_args(request, context), **extract_kwargs(request))
        # validate_result raises AssertionError if the return value is not a valid
        # Result, which should respond with Internal Error because its a problem in the
        # method.
        validate_result(result)
    # Raising JsonRpcError inside the method is an alternative way of returning an error
    # response.
    except JsonRpcError as exc:
        return Left(ErrorResult(code=exc.code, message=exc.message, data=exc.data))
    # Any other uncaught exception inside method - internal error.
    except Exception as exc:
        logger.exception(exc)
        return Left(InternalErrorResult(str(exc)))
    return result
  • 使用args和kwargs动态执行rpc函数,并将结果进行返回
  • 捕获异常,返回标准错误

这里的Left是函数式编程中的概念,我们可以从response的实现,简单了解一下:

# response.py
class SuccessResult(NamedTuple):
    result: Any = None
class ErrorResult(NamedTuple):
    code: int
    message: str
    data: Any = NODATA  # The spec says this value may be omitted
# Union of the two valid result types
Result = Either[ErrorResult, SuccessResult]
def Success(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]:
    return Right(SuccessResult(*args, **kwargs))
def Error(*args: Any, **kwargs: Any) -> Either[ErrorResult, SuccessResult]:
    return Left(ErrorResult(*args, **kwargs))

SuccessResult和ErrorResult是python的两个标准对象;Result是oslash中定义的联合对象,在ErrorResult, SuccessResult中二选一,有些类似rust中的Option;Right封装了正确的结果,Left封装了错误的结果。

这一部分需要一些函数式编程的基础,如果不太理解,推荐阅读参考链接。

小结

我们一起学习了JSON-RPC规范,并且了解了Exploding Labs如何使用 现代python 实现该规范,也接触了一些函数式编程的方式。

小技巧

业务有时候需要自己实现一个简单的自增id,我们也许会用全局变量来做:

start = 0
def gen1():
    start +=1
    return count
# 调用
id = gen1()

全局变量会形成一些污染,利用闭包的特性,我们可以优化成这样:

def gen2():
    start = 0
    def incr():
        start +=1
        return count
    return incr
gen = gen2()
# 调用
id = gen()

json-rpc里提供了使用yeild关键字实现的版本:

def hexadecimal(start: int = 1) -> Iterator[str]:
    """
    Incremental hexadecimal numbers.
    e.g. 1, 2, 3, .. 9, a, b, etc.
    Args:
        start: The first value to start with.
    """
    while True:
        yield "%x" % start
        start += 1

参考链接

www.jsonrpc.org/specificati…

ethereum.org/en/develope…

www.wallarm.com/what/what-i…

github.com/dbrattli/OS…

以上就是python json-rpc 规范源码阅读的详细内容,更多关于python json-rpc 规范的资料请关注我们其它相关文章!

(0)

相关推荐

  • 通过实例解析Python RPC实现原理及方法

    单线程同步 使用socket传输数据 使用json序列化消息体 struct将消息编码为二进制字节串,进行网络传输 消息协议 // 输入 { in: "ping", params: "ireader 0" } // 输出 { out: "pong", result: "ireader 0" } 客户端 client.py # coding: utf-8 # client.py import json import time i

  • python实现一个简单RPC框架的示例

    本文需要一点Python socket基础. 回顾RPC 客户端(Client):服务调用方. 客户端存根(Client Stub):存放服务端地址信息,将客户端的请求参数数据信息打包成网络消息,再通过网络传输发送给服务端. 服务端存根(Server Stub):接收客户端发送过来的请求消息并进行解包,然后再调用本地服务进行处理. 服务端(Server):服务的真正提供者. Network Service:底层传输,可以是 TCP 或 HTTP. 实现jsonrpc 在实现前,简单理一下整体思路

  • Python grpc超时机制代码示例

    工作中遇到一个问题,上游服务通过grpc调用下游服务,但是由于下游服务负载太高导致上游服务的调用会随机出现超时的情况,但是有一点不太明确:超时之后,下游服务还会继续进行计算么? 于是自己写了一个damon试了一下: client: # Copyright 2015 gRPC authors. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file ex

  • 执行python脚本并传入json数据格式参数方式

    目录 执行python脚本并传入json数据格式参数 python解析JSON数据 json模块包含以下两个函数 执行python脚本并传入json数据格式参数 最近在写一个python的数据统计分析脚本,需要根据json的数据格式参数去进行业务逻辑处理,出了一些情况拿出来一起分享讨论.一下代码纯属示例. 脚本类容很简单,接收一下参数,并使用json包进行一个加载解析. 执行脚本,传入一个json对象数组: 脚本接受到的内容: json加载解析出错: 可以看到python脚本接收到参数的时候会将

  • python中Requests发送json格式的post请求方法

    目录 前言 1.普通string类型 2.string内是字典的 3.元组(嵌套列表或者) 4.字典 5.json 6.传入非嵌套元组或列表 7.以post(url,json=data)请求 前言 问题: 做requests请求时遇到如下报错: {“code”:“500”,“message”:"JSON parse error: Cannot construct instance of com.bang.erpapplication.domain.User (although at least

  • Python中使用json.load()和json.loads()加载json数据的方法实例

    目录 前言 预备知识: 使用方法 总结 前言 最近在python里面用json读取json文件,可是老是不成功,特此记录一下. 预备知识: def load(fp, cls=None, object_hook=None, parse_float=None, parse_int=None, parse_constant=None, object_pairs_hook=None, **kw): """Deserialize ``fp`` (a ``.read()``-suppor

  • python json-rpc 规范源码阅读

    目录 json-rpc 源码阅读 JSON-RPC规范 jsonrpcclient的实现 jsonrpcserver的实现 小结 小技巧 json-rpc 源码阅读 JSON-RPC是一个无状态且轻量级的远程过程调用(RPC)协议.JSON-RPC应用很广泛,比如以太坊的API.JSON-RPC的python实现较多,我选择了Exploding Labs 提供的python版本.主要是其它库都比较古老,而e-labs的实现采用最新版本python,支持类型系统,还有一些函数式编程的范式,代码也很

  • Nacos源码阅读方法

    为什么我会经常阅读源码呢,因为阅读源码能让你更加接近大佬,哈哈,这是我瞎扯的. 这篇文章将会带大家阅读Nacos源码 以及 教大家阅读源码的技巧,我们正式开始吧! 先给大家献上一张我梳理的高清源码图,方便大家对nacos的源码有一个整体上的认识. 有了这张图,我们就很容易去看nacos源码了. 如何找切入点 首先我们得要找一个切入点进入到nacos源码中,那么就从nacos依赖入手 <dependency> <groupId>com.alibaba.cloud</groupI

  • Eureka源码阅读解析Server服务端启动流程实例

    目录 环境 1.spring cloud整合eureka server demo 1.1 新建spring boot项目 pom.xml文件添加 配置文件 1.2 启动类 1.3 启动 2. spring cloud自动装配eureka server源码解析 2.1 @EnableEurekaServer注解 2.2 EurekaServerAutoConfiguration 2.2.1 查找starter 自动装配类的技巧 2.2.2 EurekaServerAutoConfiguration

  • Eureka源码阅读Client启动入口注册续约及定时任务

    目录 引言 1.环境 2. Spring Cloud整合Eureka Client 启动入口 2.1 封装配置文件的类 2.1.1 EurekaClientConfigBean 2.1.2 EurekaInstanceConfigBean 2.2 EurekaClient 2.2.1 ApplicationInfoManager 2.2.2 EurekaClient 2.3 小结 3. DiscoveryClient类的解析 3.1 DiscoveryClient 作用 3.2 Discover

  • CloudStack SSVM启动条件源码阅读与问题解决方法

    CloudStack SSVM启动条件源码阅读与问题解决方法: 在CloudStack建立zone的时候,经常遇到SSVM不启动,或者根本就没有SSVM的情况,分析CloudStack日志,会发现有"Zone 1 is not ready to launch secondary storage VM yet"打印,意思是zone还未准备好启动SSVM. 通过查询CloudStack源代码,发现启动SSVM前有如下检查:         获取Zone里的template. select

  • Three.js源码阅读笔记(Object3D类)

    这是Three.js源码阅读笔记的第二篇,直接开始. Core::Object3D Object3D似乎是Three.js框架中最重要的类,相当一部分其他的类都是继承自Object3D类,比如场景类.几何形体类.相机类.光照类等等:他们都是3D空间中的对象,所以称为Object3D类.Object3D构造函数如下: 复制代码 代码如下: THREE.Object3D = function () { THREE.Object3DLibrary.push( this ); this.id = THR

  • Three.js源码阅读笔记(物体是如何组织的)

    这是Three.js源码阅读笔记第三篇.之前两篇主要是关于核心对象的,这些核心对象主要围绕着矢量vector3对象和矩阵matrix4对象展开的,关注的是空间中的单个顶点的位置和变化.这一篇将主要讨论Three.js中的物体是如何组织的:即如何将顶点.表面.材质组合成为一个具体的对象. Object::Mesh 该构造函数构造了一个空间中的物体.之所以叫"网格"是因为,实际上具有体积的物体基本都是建模成为"网格"的. 复制代码 代码如下: THREE.Mesh =

  • 源码阅读之storm操作zookeeper-cluster.clj

    storm操作zookeeper的主要函数都定义在命名空间backtype.storm.cluster中(即cluster.clj文件中). backtype.storm.cluster定义了两个重要protocol:ClusterState和StormClusterState. clojure中的protocol可以看成java中的接口,封装了一组方法.ClusterState协议中封装了一组与zookeeper进行交互的基础函数,如获取子节点函数,获取子节点数据函数等,ClusterStat

  • Java终止线程实例和stop()方法源码阅读

    了解线程 概念 线程 是程序中的执行线程.Java 虚拟机允许应用程序并发地运行多个执行线程. 线程特点 拥有状态,表示线程的状态,同一时刻中,JVM中的某个线程只有一种状态; ·NEW 尚未启动的线程(程序运行开始至今一次未启动的线程) ·RUNNABLE 可运行的线程,正在JVM中运行,但它可能在等待其他资源,如CPU. ·BLOCKED 阻塞的线程,等待某个锁允许它继续运行 ·WAITING 无限等待(再次运行依赖于让它进入该状态的线程执行某个特定操作) ·TIMED_WAITING 定时

  • jdk源码阅读Collection详解

    见过一句夸张的话,叫做"没有阅读过jdk源码的人不算学过java".从今天起开始精读源码.而适合精读的源码无非就是java.io,.util和.lang包下的类. 面试题中对于集合的考察还是比较多的,所以我就先从集合的源码开始看起. (一)首先是Collection接口. Collection是所有collection类的根接口;Collection继承了Iterable,即所有的Collection中的类都能使用foreach方法. /** * Collection是所有collec

随机推荐