基于python yield机制的异步操作同步化编程模型

本文总结下如何在编写python代码时对异步操作进行同步化模拟,从而提高代码的可读性和可扩展性。

游戏引擎一般都采用分布式框架,通过一定的策略来均衡服务器集群的资源负载,从而保证服务器运算的高并发性和CPU高利用率,最终提高游戏的性能和负载。由于引擎的逻辑层调用是非抢占式的,服务器之间都是通过异步调用来进行通讯,导致游戏逻辑无法同步执行,所以在代码层不得不人为地添加很多回调函数,使一个原本完整的功能碎片化地分布在各个回调函数中。

异步逻辑

以游戏中的副本评分逻辑为例,在副本结束时副本管理进程需要收集副本中每个玩家的战斗信息,再结合管理进程内部的统计信息最终给出一个副本评分,发放相应奖励。因为每个玩家实体都随机分布在不同进程中,所以管理进程需要通过异步调用来获取玩家身上的战斗信息。

实现代码如下所示:

# -*- coding: gbk -*-
import random

# 玩家实体类
class Player(object):
  def __init__(self, entityId):
    super(Player, self).__init__()
    # 玩家标识
    self.entityId = entityId

  def onFubenEnd(self, mailBox):
    score = random.randint(1, 10)
    print "onFubenEnd player %d score %d"%(self.entityId, score)

    # 向副本管理进程发送自己的id和战斗信息
    mailBox.onEvalFubenScore(self.entityId, score)

# 副本管理类
class FubenStub(object):
  def __init__(self, players):
    super(FubenStub, self).__init__()
    self.players = players

  def evalFubenScore(self):
    self.playerRelayCnt = 0
    self.totalScore = 0

    # 通知每个注册的玩家,副本已经结束,索取战斗信息
    for player in self.players:
      player.onFubenEnd(self)

  def onEvalFubenScore(self, entityId, score):
    # 收到其中一个玩家的战斗信息
    print "onEvalFubenScore player %d score %d"%(entityId, score)
    self.playerRelayCnt += 1
    self.totalScore += score

    # 当收集完所有玩家的信息后,打印评分
    if len(self.players) == self.playerRelayCnt:
      print 'The fuben totalScore is %d'%self.totalScore

if __name__ == '__main__':
  # 模拟创建玩家实体
  players = [Player(i) for i in xrange(3)]

  # 副本开始时,每个玩家将自己的MailBox注册到副本管理进程
  fs = FubenStub(players)

  # 副本进行中
  # ....

  # 副本结束,开始评分
  fs.evalFubenScore()

代码简化了副本评分逻辑的实现,其中Player类表示游戏的玩家实体,在游戏运行时无缝地在不同服务器中切换,FubenStub表示副本的管理进程,在副本刚开始的时候该副本内所有玩家会将自己的MailBox注册到管理进程中,其中MailBox表示各个实体的远程调用句柄。在副本结束时,FubenStub首先向各个玩家发送副本结束消息,同时请求玩家的战斗信息,玩家在得到消息后,将自己的战斗信息发送给FubenStub;然后当FubenStub收集完所有玩家的信息后,最终打印副本评分。

同步逻辑

如果Player和FubenStub在同一进程中的话,那所有的操作都可以同步完成,在FubenStub向玩家发送副本结束消息的同时可以马上得到该玩家的战斗信息,实现代码如下所示:

# -*- coding: gbk -*-

import random

class Player(object):
  def __init__(self, entityId):
    super(Player, self).__init__()
    self.entityId = entityId

  def onFubenEnd(self, mailBox):
    score = random.randint(1, 10)
    print "onFubenEnd player %d score %d"%(self.entityId, score)
    return self.entityId, score

class FubenStub(object):
  def __init__(self, players):
    super(FubenStub, self).__init__()
    self.players = players

  def evalFubenScore(self):
    totalScore = 0
    for player in self.players:
      entityId, score = player.onFubenEnd(self)
      print "onEvalFubenScore player %d score %d"%(entityId, score)
      totalScore += score

    print 'The fuben totalScore is %d'%totalScore

if __name__ == '__main__':
  players = [Player(i) for i in xrange(3)]

  fs = FubenStub(players)
  fs.evalFubenScore()

从以上两份代码可以看到由于异步操作,FubenStub中的评分逻辑人为地分成两个功能点:1)向玩家发送副本结束消息;2)接受玩家的战斗信息;并且两个功能点分布在两个不同的函数中。如果游戏逻辑一旦复杂,势必会造成功能点分散,出现过多onXXX异步回调函数,最终导致代码的开发成本和维护成本提高,可读性和可扩展性下降。

如果有一种方法,可以让函数在异步调用时暂时挂起,并且在回调函数得到返回值后恢复执行,那么就可以用同步化的编程模式开发异步逻辑。

yield 关键字

yield 是 Python中的一个关键字,凡是函数体中出现了 yield 关键字, Python将改变整个函数的上下文,调用该函数不再返回值, 而是一个生成器对象。只有调用这个生成器的迭代函数next才能开始执行生成器对象,当生成器对象执行到包含 yield 表达式时, 函数将暂时挂起,等待下一次next调用来恢复执行,具体机制如下:

1)调用生成器对象的next方法,启动函数执行;

2)当生成器对象执行到包含 yield 表达式时, 函数挂起;

3)下一次 next 函数调用又会驱动该生成器对象继续执行此后的语句, 直到遇见下一个 yield 再次挂起;

4)如果某次 next 调用驱动了生成器继续执行, 而此后函数正常结束,生成器会抛出 StopIteration 异常;

如下代码所示:

def f():
  print "Before first yield"
  yield 1
  print "Before second yield"
  yield 2
  print "After second yield"

g = f()
print "Before first next"
g.next()
print "Before second next"
g.next()
print "Before third yield"
g.next()

执行结果为:

Before first next

Before first yield

Before second next

Before second yield

Before third yield

After second yield

StopIteration

哈,有了让函数暂时挂起的机制,最后就剩下如何传递异步调用的返回值问题了。其实生成器的next函数已经实现了将参数从生成器对象内部向外传递的机制,并且python还提供了一个send函数将参数从外向生成器对象内部传递的机制,具体机制如下:

1) 调用next 函数驱动生成器时, next会同时等待生成器中下一个 yield 挂起,并将该yield后面的参数返回给next;

2)往生成器中传递参数,需要将next函数替换成send,此时send的功能与next相同(驱动生成器执行,等待返回值),同时send将后面的参数传递给生成器内部之前挂起的yield;

如下代码所示:

def f():
  msg = yield 'first yield msg'
  print "generator inner receive:", msg
  msg = yield 'second yield msg'
  print "generator inner receive:", msg

g = f()
msg = g.next()
print "generator outer receive:", msg
msg = g.send('first send msg')
print "generator outer receive:", msg
g.send('second send msg')

执行结果为:

generator outer receive: first yield msg

generator inner receive: first send msg

generator outer receive: second yield msg

generator inner receive: second send msg

StopIteration

同步化实现

好了,万事俱备只欠东风,下面就是简单对yield机制进行工程上封装以方便之后开发。下面的代码提供了一个叫IFakeSyncCall的interface,所有包含异步操作的逻辑类都可以继承这个接口:

class IFakeSyncCall(object):
  def __init__(self):
    super(IFakeSyncCall, self).__init__()
    self.generators = {}

  @staticmethod
  def FAKE_SYNCALL():
    def fwrap(method):
      def fakeSyncCall(instance, *args, **kwargs):
        instance.generators[method.__name__] = method(instance, *args, **kwargs)
        func, args = instance.generators[method.__name__].next()
        func(*args)
      return fakeSyncCall
    return fwrap

  def onFakeSyncCall(self, identify, result):
    try:
      func, args = self.generators[identify].send(result)
      func(*args)
    except StopIteration:
      self.generators.pop(identify)

其中interface中属性generators用来保存类中已经开始执行的生成器对象;函数FAKE_SYNCALL是一个decorator,装饰类中包含有yield的函数,改变函数的调用上下文,在fakeSyncCall内部封装了对生成器对象的next调用;函数onFakeSyncCall封装了所有onXXX函数的逻辑,其他实体通过调用这个函数传递异步回调的返回值。

下面就是经过同步化改进后的异步副本评分逻辑代码:

# -*- coding: gbk -*-
import random

class Player(object):
  def __init__(self, entityId):
    super(Player, self).__init__()
    self.entityId = entityId

  def onFubenEnd(self, mailBox):
    score = random.randint(1, 10)
    print "onFubenEnd player %d score %d"%(self.entityId, score)
    mailBox.onFakeSyncCall('evalFubenScore', (self.entityId, score))

class FubenStub(IFakeSyncCall):
  def __init__(self, players):
    super(FubenStub, self).__init__()
    self.players = players

  @IFakeSyncCall.FAKE_SYNCALL()
  def evalFubenScore(self):
    totalScore = 0
    for player in self.players:
      entityId, score = yield (player.onFubenEnd, (self,))
      print "onEvalFubenScore player %d score %d"%(entityId, score)
      totalScore += score

    print 'the totalScore is %d'%totalScore

if __name__ == '__main__':
  players = [Player(i) for i in xrange(3)]

  fs = FubenStub(players)
  fs.evalFubenScore()

比较evalFubenScore函数,基本已经和原本的同步逻辑代码相差无几。

利用yield机制实现同步化编程模型的另外一个优点是可以保证所有异步调用的逻辑串行化,从而保证数据的一致性和有效性,特别是在各种异步初始化流程中可以摒弃传统的timer sleep机制,从源头上扼杀一些隐藏很深的由于数据不一致性所导致的bug。

(0)

相关推荐

  • 理解生产者消费者模型及在Python编程中的运用实例

    什么是生产者消费者模型 在 工作中,大家可能会碰到这样一种情况:某个模块负责产生数据,这些数据由另一个模块来负责处理(此处的模块是广义的,可以是类.函数.线程.进程等).产 生数据的模块,就形象地称为生产者:而处理数据的模块,就称为消费者.在生产者与消费者之间在加个缓冲区,我们形象的称之为仓库,生产者负责往仓库了进商 品,而消费者负责从仓库里拿商品,这就构成了生产者消费者模型.结构图如下: 生产者消费者模型的优点: 1.解耦 假设生产者和消费者分别是两个类.如果让生产者直接调用消费者的某个方法,

  • 用Python的SimPy库简化复杂的编程模型的介绍

    在我遇到 SimPy 包的其中一位创始人 Klaus Miller 时,从他那里知道了这个包.Miller 博士阅读过几篇提出使用 Python 2.2+ 生成器实现半协同例程和"轻便"线程的技术的 可爱的 Python专栏文章.特别是(使我很高兴的是),他发现在用 Python 实现 Simula-67 样式模拟时,这些技术很有用. 结果表明 Tony Vignaux 和 Chang Chui 以前曾创建了另一个 Python 库,它在概念上更接近于 Simscript,而且该库使用

  • Python探索之pLSA实现代码

    pLSA(probabilistic Latent Semantic Analysis),概率潜在语义分析模型,是1999年Hoffman提出的一个被称为第一个能解决一词多义问题的模型,通过在文档与单词之间建立一层主题(Topic),将文档与单词的直接关联转化为文档与主题的关联以及主题与单词的关联.这里采用EM算法进行估计,可能存在差错,望积极批评指正. # -*- coding: utf-8 -*- import math import random import jieba import c

  • 用Python给文本创立向量空间模型的教程

    我们需要开始思考如何将文本集合转化为可量化的东西.最简单的方法是考虑词频. 我将尽量尝试不使用NLTK和Scikits-Learn包.我们首先使用Python讲解一些基本概念. 基本词频 首先,我们回顾一下如何得到每篇文档中的词的个数:一个词频向量. #examples taken from here: http://stackoverflow.com/a/1750187 mydoclist = ['Julie loves me more than Linda loves me', 'Jane

  • python基于隐马尔可夫模型实现中文拼音输入

    在网上看到一篇关于隐马尔科夫模型的介绍,觉得简直不能再神奇,又在网上找到大神的一篇关于如何用隐马尔可夫模型实现中文拼音输入的博客,无奈大神没给可以运行的代码,只能纯手动网上找到了结巴分词的词库,根据此训练得出隐马尔科夫模型,用维特比算法实现了一个简单的拼音输入法.githuh地址:https://github.com/LiuRoy/Pinyin_Demo 原理简介隐马尔科夫模型 抄一段网上的定义: 隐马尔可夫模型 (Hidden Markov Model) 是一种统计模型,用来描述一个含有隐含未

  • 基于python yield机制的异步操作同步化编程模型

    本文总结下如何在编写python代码时对异步操作进行同步化模拟,从而提高代码的可读性和可扩展性. 游戏引擎一般都采用分布式框架,通过一定的策略来均衡服务器集群的资源负载,从而保证服务器运算的高并发性和CPU高利用率,最终提高游戏的性能和负载.由于引擎的逻辑层调用是非抢占式的,服务器之间都是通过异步调用来进行通讯,导致游戏逻辑无法同步执行,所以在代码层不得不人为地添加很多回调函数,使一个原本完整的功能碎片化地分布在各个回调函数中. 异步逻辑 以游戏中的副本评分逻辑为例,在副本结束时副本管理进程需要

  • Python yield与实现方法代码分析

    yield的功能类似于return,但是不同之处在于它返回的是生成器. 生成器 生成器是通过一个或多个yield表达式构成的函数,每一个生成器都是一个迭代器(但是迭代器不一定是生成器). 如果一个函数包含yield关键字,这个函数就会变为一个生成器. 生成器并不会一次返回所有结果,而是每次遇到yield关键字后返回相应结果,并保留函数当前的运行状态,等待下一次的调用. 由于生成器也是一个迭代器,那么它就应该支持next方法来获取下一个值. 基本操作 # 通过`yield`来创建生成器 def f

  • python实现隐马尔科夫模型HMM

    一份完全按照李航<<统计学习方法>>介绍的HMM代码,供大家参考,具体内容如下 #coding=utf8 ''''' Created on 2017-8-5 里面的代码许多地方可以精简,但为了百分百还原公式,就没有精简了. @author: adzhua ''' import numpy as np class HMM(object): def __init__(self, A, B, pi): ''''' A: 状态转移概率矩阵 B: 输出观察概率矩阵 pi: 初始化状态向量 '

  • 基于Python中的yield表达式介绍

    python生成器 python中生成器是迭代器的一种,使用yield返回函数值.每次调用yield会暂停,而可以使用next()函数和send()函数可以恢复生成器. 这里可以参考Python函数式编程指南:对生成器全面讲解 注意到yield是个表达式而不仅仅是个语句,所以可以使用x = yield r 这样的语法. 这个知识点在协程中需要使用.协程的概念指的是在一个线程内,一个程序中断去执行另一个程序,有点类似于CPU中断.这样减少了切换线程带来的负担,同时不需要多线程中的锁机制,因为不存在

  • 基于python脚本实现软件的注册功能(机器码+注册码机制)

    一.前言: 目的:完成已有python图像处理工具的注册功能 功能:用户运行程序后,通过文件自动检测认证状态,如果未经认证,就需要注册.注册过程是用户将程序运行后显示的机器码(C盘的卷序号)发回给管理员,管理员对机器码加密后生成加密文件或字符串返回给用户.每次启动程序,在有注册文件的情况下,程序就会通过DES和base64解码,并与此刻获取到的C盘卷序列号比对,如果一致则运行主程序.如果注册文件解码后与卷序号不一致,就要提醒用户输入注册码,如果对新输入的解码后和重新获取的机器码一致,则通过认证,

  • 基于Python 中函数的 收集参数 机制

    定义函数的时候,在参数前加了一个 * 号,函数可以接收零个或多个值作为参数.返回结果是一个元组. 传递零个参数时函数并不报错,而是返回一个空元组.但以上这种方法也有局限性,它不能收集关键字参数. 对关键字参数进行收集的另一种 收集参数 机制:使用两个星号 ( ** ) ,用法同上.最后返回一个以参数名为键.参数值为键值的字典. * 和 ** 是可以一起使用的,返回特定的结果. 参数收集的用处之一是使我们编写函数时不用头疼将 N 多个参数都塞在一个括号里,既美观又省事.用处之二便是: * :可以将

  • 基于Python的GUI图形用户界面编程详细讲解

    目录 前言 常用的GUI库 1.Tkinter 2.wxPython 3.PyQT 基于tkinter模块创建GUI程序步骤 主窗口设置 主窗口位置和大小 GUI编程整体描述 常用组件汇总列表 GUI应用程序类的经典写法 总结 前言 在最早程序的设计中,程序和用户的交互都是通过控制台来完成的,而GUI(Graphics User Interface),即图形用户编程,是Python 所提供的一个丰富的组件,可以快速的实现使用图形界面和用户交互. 常用的GUI库 1.Tkinter tkinter

  • python 基于TCP协议的套接字编程详解

    基于TCP协议的套接字编程 实现电话沟通为例,这里传递的是字符,可以自己尝试去发送一个文件 # 服务端 import socket # 1. 符合TCP协议的手机 server = socket.socket(socket.AF_INET,socket.SOCK_STREAM) # TCP # 2. 绑定手机号 一个服务器,我们自己的电脑作为服务器的话,用自己的IP地址 server.bind(('127.0.0.1',8000)) # 127.0.0.1 代表本地 # server.bind(

  • 基于python的多进程共享变量正确打开方式

    多进程共享变量和获得结果 由于工程需求,要使用多线程来跑一个程序.但是因为听说python的多线程是假的,于是使用多进程,反正任务需要共享的参数少. 查阅资料,发现实现多进程主要使用Multiprocessing,有两种方式,一种是Process,另一种是Pool. p = Process(target=fun,args=(args)) 再通过p.start()来启动一个子进程,通过p.join()方法来使得子进程运行结束后再执行父进程. 但是这样很烦,还要写个for 循环来开n个线程和join

随机推荐