Python的Twisted框架中使用Deferred对象来管理回调函数
首先抛出我们在讨论使用回调编程时的一些观点:
- 激活errback是非常重要的。由于errback的功能与except块相同,因此用户需要确保它们的存在。他们并不是可选项,而是必选项。
- 不在错误的时间点激活回调与在正确的时间点激活回调同等重要。典型的用法是,callback与errback是互斥的即只能运行其中一个。
- 使用回调函数的代码重构起来有些困难。
Deferred
Twisted使用Deferred对象来管理回调函数的序列。有些情况下可能要把一系列的函数关联到Deferred对象上,以便在在异步操作完成时按次序地调用(这些一系列的回调函数叫做回调函数链);同时还要有一些函数在异步操作出现异常时来调用。当操作完成时,会先调用第一个回调函数,或者当错误发生时,会先调用第一个错误处理回调函数,然后Deferred对象会把每个回调函数或错误处理回调函数的返回值传递给链中的下一个函数。
Callbacks
一个twisted.internet.defer.Deferred对象代表着在将来某个时刻一定会产生结果的一个函数。我们可以把一个回调函数关联到Deferred对象上,一旦这个Deferred对象有了结果,这个回调函数就会被调用。另外,Deferred对象还允许开发者为它注册一个错误处理回调函数。Deferred机制对于各种各样的阻塞或者延时操作都为开发者提供了标准化的接口。
from twisted.internet import reactor, defer
def getDummyData(inputData): """ This function is a dummy which simulates a delayed result and returns a Deferred which will fire with that result. Don't try too hard to understand this. """ print('getDummyData called') deferred = defer.Deferred() # simulate a delayed result by asking the reactor to fire the # Deferred in 2 seconds time with the result inputData * 3 reactor.callLater(2, deferred.callback, inputData * 3) return deferred def cbPrintData(result): """ Data handling function to be added as a callback: handles the data by printing the result """ print('Result received: {}'.format(result)) deferred = getDummyData(3) deferred.addCallback(cbPrintData) # manually set up the end of the process by asking the reactor to # stop itself in 4 seconds time reactor.callLater(4, reactor.stop) # start up the Twisted reactor (event loop handler) manually print('Starting the reactor') reactor.run()
多个回调函数
在一个Deferred对象上可以关联多个回调函数,这个回调函数链上的第一个回调函数会以Deferred对象的结果为参数来调用,而第二个回调函数以第一个函数的结果为参数来调用,依此类推。为什么需要这样的机制呢?考虑一下这样的情况,twisted.enterprise.adbapi返回一个Deferred对象——一个一个SQL查询的结果,可能有某个web窗口会在这个Deferred对象上添加一个回调函数,以把查询结果转换成HTML的格式,然后把Deferred对象继续向前传递,这时Twisted会调用这个回调函数并把结果返回给HTTP客户端。在出现错误或者异常的情况下,回调函数链不会被调用。
from twisted.internet import reactor, defer class Getter: def gotResults(self, x): """ The Deferred mechanism provides a mechanism to signal error conditions. In this case, odd numbers are bad. This function demonstrates a more complex way of starting the callback chain by checking for expected results and choosing whether to fire the callback or errback chain """ if self.d is None: print("Nowhere to put results") return d = self.d self.d = None if x % 2 == 0: d.callback(x * 3) else: d.errback(ValueError("You used an odd number!")) def _toHTML(self, r): """ This function converts r to HTML. It is added to the callback chain by getDummyData in order to demonstrate how a callback passes its own result to the next callback """ return "Result: %s" % r def getDummyData(self, x): """ The Deferred mechanism allows for chained callbacks. In this example, the output of gotResults is first passed through _toHTML on its way to printData. Again this function is a dummy, simulating a delayed result using callLater, rather than using a real asynchronous setup. """ self.d = defer.Deferred() # simulate a delayed result by asking the reactor to schedule # gotResults in 2 seconds time reactor.callLater(2, self.gotResults, x) self.d.addCallback(self._toHTML) return self.d def cbPrintData(result): print(result) def ebPrintError(failure): import sys sys.stderr.write(str(failure)) # this series of callbacks and errbacks will print an error message g = Getter() d = g.getDummyData(3) d.addCallback(cbPrintData) d.addErrback(ebPrintError) # this series of callbacks and errbacks will print "Result: 12" g = Getter() d = g.getDummyData(4) d.addCallback(cbPrintData) d.addErrback(ebPrintError) reactor.callLater(4, reactor.stop) reactor.run()
需要注意的一点是,在方法gotResults中处理self.d的方式。在Deferred对象被结果或者错误激活之前,这个属性被设置成了None,这样Getter实例就不会再持有将要激活的Deferred对象的引用。这样做有几个好处,首先,这样可以避免Getter.gotResults有时会重复激活相同的Deferred对象的可能性(这样会导致出现AlreadyCalledError异常)。其次,这样做可以使得该Deferred对象上可以添加一个调用了Getter.getDummyData函数的回调函数,而不会产生问题。还有,这样使得Python垃圾收集器更容易通过引用循环来检测出一个对象是否需要回收。
可视化的解释
这里写图片描述
1.请求方法请求数据到Data Sink,得到返回的Deferred对象。
2.请求方法把回调函数关联到Deferred对象上。
1.当结果已经准备好后,把它传递给Deferred对象。如果操作成功就调用Deferred对象的.callback(result)方法,如果操作失败就调用Deferred对象的.errback(faliure)方法。注意failure是twisted.python.failure.Failure类的一个实例。
2.Deferred对象使用result或者faliure来激活之前添加的回调函数或者错误处理回调函数。然后就按照下面的规则来沿着回调函数链继续执行下去:
回调函数的结果总是当做第一个参数被传递给下一个回调函数,这样就形成了一个链式的处理器。
如果一个回调函数抛出了异常,就转到错误处理回调函数来执行。
如果一个faliure没有得到处理,那么它会沿着错误处理回调函数链一直传递下去,这有点像异步版本的except语句。
如果一个错误处理回调函数没有抛出异常或者返回一个twisted.python.failure.Failure实例,那么接下来就转到去执行回调函数。
错误处理回调函数
Deferred对象的错误处理模型是以Python的异常处理为基础的。在没有错误发生的情况下,所有的回调函数都会被执行,一个接着一个,就像上面所说的那样。
如果没有执行回调函数而是执行了错误处理回调函数(比如DB查询发生了错误),那么一个twisted.python.failure.Failure对象会被传递给第一个错误处理回调函数(你可以添加多个错误处理回调函数,就像回调函数链一样)。可以把错误处理回调函数链当做普通Python代码中的except代码块。
除非在except代码块中显式地raise了一个错误,否则Exception对象就会被捕捉到且不再继续传播下去,然后又开始正常地执行程序。对于错误处理回调函数链来说也是一样,除非你显式地return一个Faliure或者重新抛出一个异常,否则错误就会停止继续传播,然后就会从那里开始执行正常的回调函数链(使用错误处理回调函数返回的值)。如果错误处理回调函数返回了一个Faliure或者抛出了一个异常,那么这个Faliure或者异常就会被传递给下一个错误处理回调函数。
注意,如果一个错误处理回调函数什么也没有返回,那它实际上返回的是None,这就意味着在这个错误处理回调函数执行之后会继续回调函数链的执行。这可能不是你实际上期望的那样,所以要确保你的错误处理回调函数返回一个Faliure对象(或者就是传递给它当参数的那个Faliure对象)或者一个有意义的返回值以便来执行下一个回调函数。
twisted.python.failure.Failure有一个有用的方法叫做trap,可以让下面的代码变成更有效率的另一种形式:
try: # code that may throw an exception cookSpamAndEggs() except (SpamException, EggException): # Handle SpamExceptions and EggExceptions ...
可以写成:
def errorHandler(failure): failure.trap(SpamException, EggException) # Handle SpamExceptions and EggExceptions d.addCallback(cookSpamAndEggs) d.addErrback(errorHandler)
如果传递给faliure.trap的参数没有能和Faliure中的错误匹配的,那它会重新抛出这个错误。
还有一个需要注意的地方,twisted.internet.defer.Deferred.addCallbacks方法的功能和addCallback再跟上addErrback的功能是类似的,但不完全一样。考虑一下下面的情况:
# Case 1 d = getDeferredFromSomewhere() d.addCallback(callback1) # A d.addErrback(errback1) # B d.addCallback(callback2) d.addErrback(errback2) # Case 2 d = getDeferredFromSomewhere() d.addCallbacks(callback1, errback1) # C d.addCallbacks(callback2, errback2)
对于Case 1来说,如果在callback1里面发生了错误,那么errback1就会被调用。而对于Case 2来说,被调用的却是是errback2。
实际上是因为,在Case 1中,行A会处理getDeferredFromSomewhere执行成功的情况,行B会处理发生在getDeferredFromSomewhere执行时或者行A的callback1执行时的错误。而在Case 2中,行C中的errback1只会处理getDeferredFromSomewhere执行时产生的错误,而不会负责callback1中产生的错误。
未处理的错误
如果一个Deferred对象在还有一个未处理的错误时(即如果它还有下一个errback就一定会调用)就被垃圾收集器清除掉了,那么Twisted会把这个错误的traceback记录到日志文件里。这意味着你可能不用添加errback仍然能够记录错误。不过要小心的是,如果你还持有这个Deferred对象的引用,并且它永远不会被垃圾收集器清理,那么你就会永远看不到这个错误(而且你的callbacks会神秘地永远不会执行)。如果不确定上述情况是否会发生,你应当在回调函数链之后显式地添加一个errback,即使只是这样写:
# Make sure errors get logged from twisted.python import log d.addErrback(log.err)
处理同步和异步结果
在一些应用中,可能同时会有同步的函数,也会有异步的函数。例如,对于一个用户认证函数,如果它是从内存中检查用户是否已经认证,这样就可以立即返回结果;但是如果它需要等待网络上的数据,那它就应当返回一个当数据到达时就激活的Deferred对象。这就是说,一个想要去检查用户是否已经认证的函数需要能同时接受立即返回的结果和Deferred对象。
下面的例子中,authenticateUser使用了isValidUser来认证用户:
def authenticateUser(isValidUser, user): if isValidUser(user): print("User is authenticated") else: print("User is not authenticated")
这个函数假定isValidUser是立即返回的,然而实际上isValidUser可能是异步认证用户的并且返回的是一个Deferred对象。把这个函数调整为既能接收同步的isValidUser又能接收异步的isValidUser是有可能的。同时把同步的函数改成返回值为Deferred对象也是可以的。
在库函数代码中处理可能的Deferred对象
这是一个可能被传递给authenticateUser的同步的用户认证方法:
def synchronousIsValidUser(user): ''' Return true if user is a valid user, false otherwise ''' return user in ["Alice", "Angus", "Agnes"]
这是一个异步的用户认证方法,返回一个Deferred对象:
from twisted.internet import reactor, defer def asynchronousIsValidUser(user): d = defer.Deferred() reactor.callLater(2, d.callback, user in ["Alice", "Angus", "Agnes"]) return d
我们最初对authenticateUser的实现希望isValidUser是同步的,但是现在需要把它改成既能处理同步又能处理异步的isValidUser实现。对此,可以使用maybeDeferred函数来调用isValidUser,这个函数可以保证isValidUser函数的返回值是一个Deferred对象,即使isValidUser是一个同步的函数:
from twisted.internet import defer def printResult(result): if result: print("User is authenticated") else: print("User is not authenticated") def authenticateUser(isValidUser, user): d = defer.maybeDeferred(isValidUser, user) d.addCallback(printResult)
现在isValidUser无论是同步的还是异步的都可以了。
也可以把synchronousIsValidUser函数改写成返回一个Deferred对象,可以参考这里。
取消回调函数
动机:一个Deferred对象可能需要很长时间才会调用回调函数,甚至于永远也不会调用。有时候可能没有那么好的耐心来等待Deferred返回结果。既然Deferred完成后要执行的所有代码都在你的应用中或者调用的库中,那么你就可以选择在已经过去了很长时间才收到结果时忽略这个结果。然而,即使你选择忽略这个结果,这个Deferred对象产生的底层操作仍然在后台工作着,并且消耗着机器资源,比如CPU时间、内存、网络带宽甚至磁盘容量。因此,当用户关闭窗口,点击了取消按钮,从你的服务器上断开连接或者发送了一个“停止”的指令,这时你需要显式地声明你对之前原定的操作的结果已经不再感兴趣了,以便原先的Deferred对象可以做一些清理的工作并释放资源。
这是一个简单的例子,你想连接到一个外部的机器,但是这个机器太慢了,所以需要在应用中添加一个取消按钮来终止这次连接企图,以便用户可以连接到另一个机器。这里是这样的一个应用的大概逻辑:
def startConnecting(someEndpoint): def connected(it): "Do something useful when connected." return someEndpoint.connect(myFactory).addCallback(connected) # ... connectionAttempt = startConnecting(endpoint) def cancelClicked(): connectionAttempt.cancel()
显然,startConnecting被一些UI元素用来让用户选择连接哪个机器。然后是一个取消按钮陪着到cancelClicked函数上。
当connectionAttempt.cancel被调用时,会发生以下操作:
- 导致潜在的连接操作终止,如果它仍然在进行中的话
- 不管怎样,使得connectionAttempt这个Deferred对象及时地被完成
- 有可能导致connectionAttempt这个Deferred对象因为CancelledError错误调用错误处理函数
即使这个取消操作已经表达了让底层的操作停止的需求,但是底层的操作不大可能马上就对此作出反应。甚至在这个简单的例子中就有一个不会被中断的操作:域名解析,因此需要在在一个线程中执行;这个应用中的连接操作如果在等待域名解析的时候就不能被取消。所以你要取消的Deferred对象可能不会立即调用回调函数或错误处理回调函数。
一个Deferred对象可能会在执行它的回调函数链的任何一点时等待另一个Deferred对象的完成。没有方法可以在回调函数链的特定一个点知道是否每件事都已经准备好了。由于有可能一个回调函数链的很多层次上的函数都会希望取消同一个Deferred对象,那么链上任何层次的函数在任意时刻都有可能调用.cancel()函数。.cancel()函数从不抛出任何异常或者返回任何值。你可以重复调用它,即使这个Deferred对象已经被激活了,它已经没有剩余的回调函数了。
在实例化了一个Deferred对象的同时,可以给它提供一个取消函数(Deferred对象的构造函数为def __init__(self, canceller=None): (source)),这个canceller可以做任何事情。理想情况下,它做的每件事情都都会阻止之前你请求的操作,但是并不总是能保证这样。所以Deferred对象的取消只是尽力而为。原因有几点:
Deferred对象不知道怎样取消底层的操作。
底层的操作已经执行到了一个不可取消的状态,因为可能已经执行了一些不可逆的操作。
Deferred对象可能已经有了结果,所以没有要取消的东西了。
调用cancel()函数后,不管是否能取消,总会得到成功的结果,不会出现出错的情况。在第一种和第二和情况下,由于底层的操作仍在继续,Deferred对象大可以twisted.internet.defer.CancelledError为参数来调用它的errback。
如果取消的Deferred对象正在等待另一个Deferred对象,那么取消操作会向前传递到此Deferred对象。
可以参考API。
默认的取消行为
所有的Deferred对象都支持取消,但是只提供了很简单的行为,也没有释放任何资源。
考虑一下下面的例子:
operation = Deferred() def x(result): print("Hooray, a result:" + repr(x)) operation.addCallback(x) # ... def operationDone(): operation.callback("completed")
如果需要取消operation这个Deferred对象,而operation没有一个canceller的取消函数,就会产生下面两种之一的结果:
如果operationDone已经被调用了,也就是operation对象已经完成了,那么什么都不会改变。operation仍然有一个结果,不过既然没有其他的回调函数了,所以没有什么行为上可以看到的变化。
如果operationDone已经还没有被调用,那么operation会马上以CancelledError为参数激活errback。
在正常情况下,如果一个Deferred对象已经调用了回调函数再来调用callback会导致一个AlreadyCalledError。因此,callback可以在已经取消的、但是没有canceller的Deferred对象上再调用一次,只会导致一个空操作。如果你多次调用callback,仍会得到一个AlreadyCalledError异常。
创建能取消的Deferred对象:自定义取消函数
假设你在实现一个HTTP客户端,返回一个在服务器返回响应的时候会激活的Deferred对象。取消最好是关闭连接。为了让取消函数这么做,可以向Deferred对象的构造函数中传递一个函数作为参数(当Deferred对象被取消的时候会调用这个函数):
class HTTPClient(Protocol): def request(self, method, path): self.resultDeferred = Deferred( lambda ignore: self.transport.abortConnection()) request = b"%s %s HTTP/1.0\r\n\r\n" % (method, path) self.transport.write(request) return self.resultDeferred def dataReceived(self, data): # ... parse HTTP response ... # ... eventually call self.resultDeferred.callback() ...
现在如果在HTTPClient.request()返回的Deferred对象上调用了cancel()函数,这个HTTP请求就会取消(如果没有太晚的话)。要注意的是,还要在一个已经被取消的、带有canceller的Deferred对象上调用callback()。
DeferredList
有时你想在几个不同的事件都发生后再得到通知,而不是每个事件发生都会通知一下。例如,你想等待一个列表中所有的连接都关闭。twisted.internet.defer.DeferredList就适用于这种情况。
用多个Deferred对象来创建一个DeferredList,只需传递一个你想等待的Deferred对象的列表即可:
# Creates a DeferredList
dl = defer.DeferredList([deferred1, deferred2, deferred3])
现在就可以把这个DeferredList当成一个普通的Deferred来看待了,例如你也可以调用addCallbacks等等。这个DeferredList会在所有的Deferred对象都完成之后才调用它的回调函数。这个回调函数的参数是这个DeferredList对象中包含的所有Deferred对象返回结果的列表,例如:
# A callback that unpacks and prints the results of a DeferredList def printResult(result): for (success, value) in result: if success: print('Success:', value) else: print('Failure:', value.getErrorMessage()) # Create three deferreds. deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred3 = defer.Deferred() # Pack them into a DeferredList dl = defer.DeferredList([deferred1, deferred2, deferred3], consumeErrors=True) # Add our callback dl.addCallback(printResult) # Fire our three deferreds with various values. deferred1.callback('one') deferred2.errback(Exception('bang!')) deferred3.callback('three') # At this point, dl will fire its callback, printing: # Success: one # Failure: bang! # Success: three # (note that defer.SUCCESS == True, and defer.FAILURE == False)
正常情况下DeferredList不会调用errback,但是除非把cousumeErrors设置成True,否则在Deferred对象中产生的错误仍然会激活每个Deferred对象各自的errback。
注意,如果你想在添加到DeferredList中去的Deferred对象上应用回调函数,那么就需要注意添加回调函数的时机。把一个Deferred对象添加到DeferredList中会导致同时也给该Deferred对象添加了一个回调函数(当这个回调函数运行的时候,它的功能是检查DeferredList是否已经完成了)。最重要的是,变量这个回调函数把记录了Deferred对象的返回值并把这个值传递到最终交给DeferredList回调函数当做参数的列表中。
因此,如果你在把一个Deferred添加到DeferredList之后又给这个Deferred对象添加了一个回调函数,那么这个新添加的回调函数的返回值不会被传递到DeferredList的回调函数中。为了避免这种情况的发生,建议不要在把一个Deferred对象添加到DeferredList中之后再给这个Deferred添加回调函数。
def printResult(result): print(result) def addTen(result): return result + " ten" # Deferred gets callback before DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() deferred1.addCallback(addTen) dl = defer.DeferredList([deferred1, deferred2]) dl.addCallback(printResult) deferred1.callback("one") # fires addTen, checks DeferredList, stores "one ten" deferred2.callback("two") # At this point, dl will fire its callback, printing: # [(True, 'one ten'), (True, 'two')] # Deferred gets callback after DeferredList is created deferred1 = defer.Deferred() deferred2 = defer.Deferred() dl = defer.DeferredList([deferred1, deferred2]) deferred1.addCallback(addTen) # will run *after* DeferredList gets its value dl.addCallback(printResult) deferred1.callback("one") # checks DeferredList, stores "one", fires addTen deferred2.callback("two") # At this point, dl will fire its callback, printing: # [(True, 'one), (True, 'two')]
DeferredList接受三个关键字参数来定制它的行为:fireOnOneCallback、fireOnOneErrback和cousumeErrors。如果设置了fireOnOneCallback,那么只要有一个Deferred对象调用了它的回调函数,DeferredList就会立即调用它的回调函数。相似的,如果设置了fireOnOneErrback,那么只要有一个Deferred调用了errback,DeferredList就会调用它的errback。注意,DeferredList只是一次性的,所以在一次callback或者errback调用之后,它就会什么也不做(它会忽略它的Deferred传递给它的所有结果)。
fireOnOneErrback选项在你想等待所有事情成功执行,而且需要在出错时马上知道的情形下是很有用的。
consumeErrors参数会使DeferredList中包含的Deferred对象中产生的错误在建立了DeferredList对象之后,不会传递给原来每个Deferred对象各自的errbacks。创建了DeferredList对象之后,任何单个Deferred对象中产生的错误会被转化成结果为None的回调调用。用这个选项可以防止它所包含的Deferred中的“Unhandled error in Deferred”警告,而不用添加额外的errbacks(否则要消除这个警告就需要为每个Deferred对象添加errback)。 给consumeErrors参数传递一个True不会影响fireOnOneCallback和fireOnOneErrback的行为。应该总是使用这个参数,除非你想在将来给这些列表中的Deferred对象添加callbacks或errbacks,或者除非你知道它们不会产生错误。否则,产生错误的话会导致一个被Twisted记录到日志中的“unhandled error”。
DeferredList一个普遍的用法是把一些并行的异步操作结果组合到一起。如果所有的操作都成功了,那就可以操作成功,如果有一个操作失败了,那么就操作失败。twisted.internet.defer.gatherResults是一个快捷方式:
from twisted.internet import defer d1 = defer.Deferred() d2 = defer.Deferred() d = defer.gatherResults([d1, d2], consumeErrors=True) def cbPrintResult(result): print(result) d.addCallback(cbPrintResult) d1.callback("one") # nothing is printed yet; d is still awaiting completion of d2 d2.callback("two") # printResult prints ["one", "two"]
链式的Deferred
如果你需要一个Deferred对象来等待另一个Deferred对象的执行,你所要做的只是从它的回调函数链中的回调函数中返回一个Deferred对象。具体点,如果你从某个Deferred对象A的一个回调函数中返回Deferred对象B,那么A的回调函数链就会在B的callback()函数调用之前进行等待。此时,A的下一个回调函数的第一个参数就是B的最后一个回调函数返回的结果。
注意,如果一个`Deferred`对象在它的回调函数中直接或者间接地返回了它本身,那么这样的行为是没有定义的。代码会试图检测出这种情况然后给出警告。在将来可能会直接抛出异常。
如果这看起来有点复杂,也不要担心——当你遇到这种情况的时候,你可能会直接认出来并且知道为什么会产生这样的结果。如果你需要手动地把Deferred对象
链接起来,这里有一个方便的方法:
chainDeferred(otherDeferred)
总结
我们认识到了deferred是如何帮我们解决这些问题的:
我们不能忽视errback,在任何异步编程的API中都需要它。Deferred支持errbacks。
激活回调多次可能会导致很严重的问题。Deferred只能被激活一次,这就类似于同步编程中的try/except的处理方法。
含有回调的程序在重构时相当困难。有了deferred,我们就通过修改回调链来重构程序。