理解Motor: greenlet加持的异步非阻塞MongoDB驱动分析
原文链接 https://cyrusin.github.io/2016/06/06/motor20160606/
注:以下为加速网络访问所做的原文缓存,经过重新格式化,可能存在格式方面的问题,或偶有遗漏信息,请以原文为准。
最不喜欢在Tornado中使用任何同步阻塞型的东西,不想让ioloop阻塞在某个IO调用上,因为单线程的东西任何阻塞都是代价很高的,除非你的数据库被优化的性能很好,速度很快。除了之前的线程池之外,直接使用异步库也是不错的选择,Motor就是Tornado里可以用的很好的异步库,它兼容Tornado的gen.coroutine
式的异步调用形式,主要使用了greenlet
来巧妙的封装PyMongo的同步API, 把底层的socket
IO进行了异步化的处理,化同步为异步。
从使用的例子来分析Motor是如何把PyMongo的API异步化的:
client = motor.MotorClient(...)
db = client['testDB']
...
class SomeHandler(web.RequestHandler):
@gen.coroutine
def get(self):
...
doc = yield db.test_collection.find_one({'i': {'$lt': 1}})
...
以上只是简单的示例使用Motor的大概流程,当我们通过MotorClient生成client的时候,Motor将自己实现的MotorPool
作为pool_class
传给PyMongo的MongoClient
的构造函数,这样连接池就被替换成了Motor自己的连接池,而MotorPool
在通信过程中则使用了Motor自己封装的异步socket。
Motor除了在底层上替换原有的通信过程,另外对PyMongo的API做了异步化的处理,使之兼容Tornado的异步调用过程。和PyMongo使用的不同就是像代码里展示的那样,类似Tornado里其他的异步调用一样,通过yield出一个Future来暂时挂起上下文,当Future被set_result
的时候,在ioloop上注册callback来恢复上下文。也就是说:doc = yield db.test_collection.find_one({'i': {'$lt': 1}})
这句并不会阻塞ioloop,而是暂时挂起了。Motor做到这点主要是利用了greenlet来封装原始的PyMongo的同步API。
看一下类似find_one
这样的PyMongo的操作API(其余还有find
、insert
等等)是怎样被异步化的:
def asynchronize(motor_class, framework, sync_method, has_write_concern, doc=None):
@functools.wraps(sync_method)
def method(self, *args, **kwargs):
check_deprecated_kwargs(kwargs)
loop = self.get_io_loop() #比如: Tornado的IOLoop
callback = kwargs.pop('callback', None)
if callback:
if not callable(callback):
raise callback_type_error
future = None
else:
#在Tornado的gen.coroutine中yield出来
future = framework.get_future(self.get_io_loop())
#call_method是实际的对PyMongo的sync_method的封装,
#这个方法的调用将在一个子greenlet中进行
def call_method():
try:
#sync_method 是实际的同步的PyMongo的同步调用
#也许会疑惑:即使在子greenlent中运行,不依然会阻塞吗?
#稍后会解决这个疑惑
result = sync_method(self.delegate, *args, **kwargs)
#给yield出去的future赋予结果,可以恢复外部的上下文(gen.coroutine)
if callback:
# Schedule callback(result, None) on main greenlet.
framework.call_soon(
loop,
functools.partial(callback, result, None))
else:
# Schedule future to be resolved on main greenlet.
framework.call_soon(
loop,
functools.partial(future.set_result, result))
except Exception as e:
if callback:
framework.call_soon(
loop,
functools.partial(callback, None, e))
else:
# TODO: we lost Tornado's set_exc_info. Frameworkify this.
framework.call_soon(
loop,
functools.partial(future.set_exception, e))
#在一个子greenlet中运行这个同步的方法封装
#注意: 这个greenlet是我们在当前方法中生成的,也就是说我们当前的方法所处的
#greenlet是我们生成的greenlet的父greenlet,这个在后续的上下文切换中非常重要
greenlet.greenlet(call_method).switch() #切换到子greenlet执行
#返回future
return future
...
return method
以上代码会封装PyMongo的同步操作,使之可以在Tornado的gen.coutoine
中yield出来,参数sync_method
就是原始的PyMongo的同步调用。通过注释理解这种封装的方法,原理其实就是:把同步调用放到一个子greenlet中去执行,当子greenlet阻塞时,切换上下文回父greenlet执行,也就是yield出future,这样就不会阻塞主ioloop了。最关键的问题开始:1)什么时候切换回父greenlet? 2)当数据到达,阻塞解除时,再怎么回到子greenlet继续执行?
我们说过,Motor用自己的MotorPool
取代了PyMongo的连接池,回看下之前代码,比如我们调用find_one
,我们会在子greenlet中执行,而find_one
里则会走PyMongo的同步的那一套逻辑,但是当代码走到最底层的实际的socket的IO时,就走Motor的socket的IO了,因为连接池已经被Motor移花接木,换成了自己的那一套。
当sync_method里的同步逻辑走到socket的IO时,原有的socket.recv
被Motor用异步的recv
代替了:
@tornado_motor_sock_method
def recv(self, num_bytes):
#借助了Tornado的IOStream的异步读写封装
#IOStream的异步读写不会阻塞,而是一种事件驱动的异步非阻塞IO
future = stream_method(self.stream, 'read_bytes', num_bytes)
try:
if self.timeout_td:
result = yield _Wait(future, self.io_loop, self.timeout_td, timeout_exc)
else:
result = yield future
except IOError as e:
raise socket.error(str(e))
raise gen.Return(result)
从以上针对socket的recv方法看出,通过Tornado的异步读写封装(IOStream),实现了异步的socket,目前关键的就是tornado_motor_sock_method
了,很明显,这个装饰器将完成greenlet上下文的父子切换,解决我们之前在sync_method
调用时的疑惑。
看一下tornado_motor_sock_method
这个装饰器的实现:
def tornado_motor_sock_method(method):
coro = gen.coroutine(method)
@functools.wraps(method)
def wrapped(self, *args, **kwargs):
#当前greenlet是一个子greenlet
child_gr = greenlet.getcurrent()
#获取当前greenlet的父greenlet,即之前代码提到过的asynchronize所在的greenlet
main = child_gr.parent
def callback(future):
if future.exc_info():
child_gr.throw(*future.exc_info())
elif future.exception():
child_gr.throw(future.exception())
else:
#当future的结果到达,切换回挂起的子greenlet
child_gr.switch(future.result())
#保证callback在当前greenlet的父greenlet中运行
self.io_loop.add_future(coro(self, *args, **kwargs), callback)
#return这句会暂时挂起当前greenlet,将控制权切换回父greenlet,
#在上面的callback执行时,才会切换回当前greenlet,return语句返回
return main.switch()
return wrapped
以find_one
调用为例,find_one
的一系列调用都在一个greenlet中进行,比如最终走到了resut = socket.recv(...)
这句,通过以上的代码可以发现,这句会暂时被挂起这个greenlet,并把控制权暂时切换回当前greenlet的父greenlet,也就是asynchronize方法所在的主greenlet,而回看之前的asynchronize的代码,发现切换回去之后,asynchronize调用立即返回future,也就是doc = yield db.test_collection.find_one(...)
会yield出这个future,这样,ioloop没有被阻塞。而当socket上数据到达时,我们会通过在这句self.io_loop.add_future(coro(self, *args, **kwargs), callback)
里添加的callback切换回挂起的子greenlet,也就是return main.switch()
这句会返回,result = socket.recv(...)
这句就恢复执行,这样,刚才挂起的greenlet就继续往下执行了,最终执行到asynchronize里的call_method
里的result = sync_method(self.delegate, *args, **kwargs)
这句返回,再回顾之前的asynchronize的逻辑,这句返回后,通过framework.call_soon(loop, functools.partial(future.set_result, result))
这句,yield出去的future被set_result
,这样,doc = yield db.test_collection.find_one(...)
挂起的上下文稍后也会恢复执行了。
以上就是Motor利用Tornado的ioloop和iostream以及greenlet来封装PyMongo并将其异步化的过程。
另附, Motor作者在自己博客上的介绍文章: Introducing Motor, an asynchronous MongoDB driver for Python and Tornado Motor Internals: How I Asynchronized a Synchronous Library