Tornado异步原理
作者:向前的步伐 / 发表: 2019年12月14日 15:13 / 更新: 2019年12月14日 15:13 / tornado / 阅读量:984
Tornado使用了单进程+协程+IO多路复用的机制,解决了C10K中因为线程过多的上下文切换而导致的CPU资源浪费。
Tornado异步的实现,就是通过tornado.gen生成器来来实现,从而提高服务器的并发能力。
主要就是使用到tornado.gen.coroutine装饰器,异步的实现思路:生成器中通过yield语句可以使函数暂停执行,而send()方法可以恢复函数的执行。tornado将那些异步操作放置在yield语句之后,当这些异步操作执行完成之后,tornado会send()异步执行结果到生成器中恢复后续执行操作。
在tornado中大多数的异步操作都是返回一个Future对象。yield Future对象,会返回该异步操作结果,例如:ret = yield future_obj,当future_obj对应的异步操作完成之后,就会自动将异步结果赋值到ret中。
Future对象
Future对象可以看作一个异步操作的占位符,这个对象包含很多属性,包括_result和_callbacks,分别用来存储异步操作结果和回调函数。还包含了很多方法,比如添加回调函数,设置异步操作结果等。当异步操作完成之后,该对象的set_done会被调用,然后遍历并运行_callbacks中的回调函数。
下面是一个Future对象的简化版代码:
class Future(object):
'''
Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
如果set_result或者set_done,就会遍历_callbacks列表并执行callback(self)函数
'''
def __init__(self):
self._result = None # 执行的结果
self._callbacks = [] # 用来保存该future对象的回调函数
def result(self, timeout=None):
# 如果操作成功,返回结果。如果失败则抛出异常
self._clear_tb_log()
if self._result is not None:
return self._result
if self._exc_info is not None:
raise_exc_info(self._exc_info)
self._check_done()
return self._result
def add_done_callback(self, fn):
if self._done:
fn(self)
else:
self._callbacks.append(fn)
def set_result(self, result):
self._result = result
self._set_done()
def _set_done(self):
# 执行结束(成功)后的操作。
self._done = True
for cb in self._callbacks:
try:
cb(self)
except Exception:
app_log.exception('Exception in callback %r for %r', cb, self)
self._callbacks = None
tornado.gen.coroutine装饰器
tornado的异步操作是通过tornado.gen.coroutine装饰器来实现的,该装饰器的源码:
def coroutine(func, replace_callback=True):
return _make_coroutine_wrapper(func, replace_callback=True)
def _make_coroutine_wrapper(func, replace_callback):
@functools.wraps(func)
def wrapper(*args, **kwargs):
'''
大体过程:
future = TracebackFuture()
result = func(*args, **kwargs)
if isinstance(result, GeneratorType):
yielded = next(result)
Runner(result, future, yielded)
return future
'''
future = TracebackFuture() # TracebackFuture = Future
if replace_callback and 'callback' in kwargs:
callback = kwargs.pop('callback')
IOLoop.current().add_future(future, lambda future: callback(future.result()))
try:
result = func(*args, **kwargs) # 执行func,若func中包含yield,则返回一个generator对象
except (Return, StopIteration) as e:
result = _value_from_stopiteration(e)
except Exception:
future.set_exc_info(sys.exc_info())
return future
else:
if isinstance(result, GeneratorType): # 判断其是否为generator对象
try:
orig_stack_contexts = stack_context._state.contexts
yielded = next(result) # 第一次执行
if stack_context._state.contexts is not orig_stack_contexts:
yielded = TracebackFuture()
yielded.set_exception(
stack_context.StackContextInconsistentError(
'stack_context inconsistency (probably caused '
'by yield within a "with StackContext" block)'))
except (StopIteration, Return) as e:
future.set_result(_value_from_stopiteration(e))
except Exception:
future.set_exc_info(sys.exc_info())
else:
Runner(result, future, yielded) # Runner(result, future, yield)
try:
return future
finally:
future = None
future.set_result(result)
return future
return wrapper
tornado.gen.coroutine大体的执行过程:1,首先生成一个Future对象;2,运行被装饰的函数并将结果赋值给result,在这里tornado的异步是基于生成器的,一般情况下result是一个生成器对象;3,yielded = next(result)执行被装饰的函数的第一次yield,将结果赋值给yielded,一般情况下,yielded是一个生成器对象;4,Runner(result, future, yielded);5,return future;
Runner类
Runner类的作用,就是可以自动将异步操作的结果send()到生成器中止的地方。
tornado异步操作是基于生成器实现的,生成器通常有两个常用的方法:send()和next()。
很多情况下会有生成器嵌套的情况,比如当A生成器yield B生成器时,分两步走:1,首先执行A到yield B的地方,中止A执行B生成器;2,当B执行完成之后,我们将B生成器结果send()到A中止的地方,A继续执行。
Runner类主要就是来做这些工作的,也就是控制生成器的执行和中止,并在合适的情况下将B生成器的结果send给A生成器。我们来看一个简单的例子:
def run():
print('start running')
yield 2 # 跑步用时2小时
def eat():
print('start eating')
yield 1 # 吃饭用时1小时
def time():
run_time = yield run()
eat_time = yield eat()
print(run_time+eat_time)
def Runner(gen):
r = next(gen)
return r
t = time()
try:
action = t.send(Runner(next(t)))
t.send(Runner(action))
except StopIteration:
pass
在Runner类中主要有三个方法:init, handle_yield, run。
__init__方法
__init__里面主要执行一些初始化的操作,主要就是最后的两句:
def __init__(self, gen, result_future, first_yielded):
self.gen = gen # 一个generator对象
self.result_future = result_future # 一个Future对象
self.future = _null_future # 一个刚初始化的Future对象 _null_future = Future(); _null_future.set_result(None)
self.yield_point = None
self.pending_callbacks = None
self.results = None
self.running = False
self.finished = False
self.had_exception = False
self.io_loop = IOLoop.current()
self.stack_context_deactivate = None
if self.handle_yield(first_yielded):
self.run()
handle_yield方法
这个方法主要是用来处理yield返回的对象的。首先,我们假设yielded是一个Future对象(大部分情况都是),这样的话代码可以简略看作:
def handle_yield(self, yielded):
self.future = convert_yielded(yielded) # 如果yielded是Future对象则原样返回
if not self.future.done() or self.future is moment: # moment是tornado初始化时就建立的一个Future对象,且被set_result(None)
self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
return False
return True
这段代码主要处理了分为三个步骤:1,首先是解析出self.future;2,然后判断self.future对象时候被done,如果没有就为其添加回调函数,这个回调函数会执行self.run();3,返回self.future对象是否被done。
run方法
这个方法实际上就是一个循环,不停的执行生成器的send()方法,发送的值就是yielded的result。简化后代码如下:
def run(self):
"""Starts or resumes the generator, running until it reaches a
yield point that is not ready. 循环向generator中传递值,直到某个yield返回的yielded还没有被done
"""
try:
self.running = True
while True:
future = self.future
if not future.done():
return
self.future = None # 清空self.future
value = future.result() # 获取future对象的结果
try:
yielded = self.gen.send(value) # send该结果,并将self.gen返回的值赋值给yielded(一般情况下这也是个future对象)
except (StopIteration, Return) as e:
self.finished = True
self.future = _null_future
self.result_future.set_result(_value_from_stopiteration(e))
self.result_future = None
self._deactivate_stack_context()
return
if not self.handle_yield(yielded): # 运行self.handler_yield(yielded),如果yielded对象没有被done,则直接返回;否则继续循环
return
finally:
self.running = False
总结
1,每一个Future对应一个异步操作。
2,该Future对象可以添加回调函数,当该异步操作完成之后,需要对该Future对象设置set_done或者set_result,然后执行其所有的回调函数。
3,凡是使用了coroutine装饰器的生成器函数都会返回一个Future对象,同时不断为该生成器每一次运行send()或者next()的返回结果yielded以及Future对象运行Runner()。
4,Runner()会对生成器不断的调用send()或者next()操作。具体的操作:将上一个next()或者send()操作返回的yielded(一般是一个Future对象)被set_done后,将该yielded对象的结果send到生成器中,不断的循环该操作,直到产生StopIteration或者Return异常(就是生成器结束时产生的异常),这时会为该生成器对应的Future对象set_result。
tornado的异步是基于生成器的,通过yield关键字暂停执行,也通过生成器的send或者next方法恢复执行操作,同时使用send向生成器传递值。
每一个Future对象对应一个异步操作,我们可以为该对象添加很多回调函数,当异步操作完成后,执行set_done或者set_result就可以执行相关的回调函数。
而Runner()就是整个循环过程对象,它不停的将生成器所yield的每一个future对象的结果send回生成器,当生成器执行结束,它会进行最后的包装,对该生成器的对应的Future对象执行set_result操作。