个人博客

Tornado异步原理

Tornado使用了单进程+协程+IO多路复用的机制,解决了C10K中因为线程过多的上下文切换而导致的CPU资源浪费。

Tornado异步的实现,就是通过tornado.gen生成器来来实现,从而提高服务器的并发能力。

主要就是使用到tornado.gen.coroutine装饰器,异步的实现思路:生成器中通过yield语句可以使函数暂停执行,而send()方法可以恢复函数的执行。tornado将那些异步操作放置在yield语句之后,当这些异步操作执行完成之后,tornado会send()异步执行结果到生成器中恢复后续执行操作。

在tornado中大多数的异步操作都是返回一个Future对象。yield Future对象,会返回该异步操作结果,例如:ret = yield future_obj,当future_obj对应的异步操作完成之后,就会自动将异步结果赋值到ret中。

Future对象

Future对象可以看作一个异步操作的占位符,这个对象包含很多属性,包括_result和_callbacks,分别用来存储异步操作结果和回调函数。还包含了很多方法,比如添加回调函数,设置异步操作结果等。当异步操作完成之后,该对象的set_done会被调用,然后遍历并运行_callbacks中的回调函数。

下面是一个Future对象的简化版代码:

class Future(object):
    '''
        Future对象主要保存一个回调函数列表_callbacks与一个执行结果_result,当我们set_result时,就会执行_callbacks中的函数
        如果set_result或者set_done,就会遍历_callbacks列表并执行callback(self)函数
    '''
    def __init__(self):
        self._result = None    # 执行的结果
        self._callbacks = []    # 用来保存该future对象的回调函数

    def result(self, timeout=None):
        # 如果操作成功,返回结果。如果失败则抛出异常
        self._clear_tb_log()
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            raise_exc_info(self._exc_info)
        self._check_done()
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        self._result = result
        self._set_done()

    def _set_done(self):
        # 执行结束(成功)后的操作。
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('Exception in callback %r for %r', cb, self)
        self._callbacks = None

tornado.gen.coroutine装饰器

tornado的异步操作是通过tornado.gen.coroutine装饰器来实现的,该装饰器的源码:

def coroutine(func, replace_callback=True):
    return _make_coroutine_wrapper(func, replace_callback=True)

def _make_coroutine_wrapper(func, replace_callback):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        '''
            大体过程:
            future = TracebackFuture()  
            result = func(*args, **kwargs)
            if isinstance(result, GeneratorType):
                yielded = next(result)
                Runner(result, future, yielded)
            return future
        '''
        future = TracebackFuture()                   # TracebackFuture = Future

        if replace_callback and 'callback' in kwargs:
            callback = kwargs.pop('callback')
            IOLoop.current().add_future(future, lambda future: callback(future.result()))

        try:
            result = func(*args, **kwargs)           # 执行func,若func中包含yield,则返回一个generator对象
        except (Return, StopIteration) as e:
            result = _value_from_stopiteration(e)
        except Exception:
            future.set_exc_info(sys.exc_info())
            return future
        else:
            if isinstance(result, GeneratorType):      # 判断其是否为generator对象
                try:
                    orig_stack_contexts = stack_context._state.contexts
                    yielded = next(result)            # 第一次执行
                    if stack_context._state.contexts is not orig_stack_contexts:
                        yielded = TracebackFuture()
                        yielded.set_exception(
                            stack_context.StackContextInconsistentError(
                                'stack_context inconsistency (probably caused '
                                'by yield within a "with StackContext" block)'))
                except (StopIteration, Return) as e:
                    future.set_result(_value_from_stopiteration(e))
                except Exception:
                    future.set_exc_info(sys.exc_info())
                else:
                    Runner(result, future, yielded)  # Runner(result, future, yield)
                try:
                    return future            
                finally:
                    future = None
        future.set_result(result)
        return future
    return wrapper

tornado.gen.coroutine大体的执行过程:1,首先生成一个Future对象;2,运行被装饰的函数并将结果赋值给result,在这里tornado的异步是基于生成器的,一般情况下result是一个生成器对象;3,yielded = next(result)执行被装饰的函数的第一次yield,将结果赋值给yielded,一般情况下,yielded是一个生成器对象;4,Runner(result, future, yielded);5,return future;

Runner类

Runner类的作用,就是可以自动将异步操作的结果send()到生成器中止的地方。

tornado异步操作是基于生成器实现的,生成器通常有两个常用的方法:send()和next()。

很多情况下会有生成器嵌套的情况,比如当A生成器yield B生成器时,分两步走:1,首先执行A到yield B的地方,中止A执行B生成器;2,当B执行完成之后,我们将B生成器结果send()到A中止的地方,A继续执行。

Runner类主要就是来做这些工作的,也就是控制生成器的执行和中止,并在合适的情况下将B生成器的结果send给A生成器。我们来看一个简单的例子:

def run():
    print('start running')
    yield 2     # 跑步用时2小时

def eat():
    print('start eating')
    yield 1     # 吃饭用时1小时

def time():
    run_time = yield run()
    eat_time = yield eat()
    print(run_time+eat_time)

def Runner(gen):
    r = next(gen)
    return r

t = time()
try:
    action = t.send(Runner(next(t)))
    t.send(Runner(action))
except StopIteration:
    pass

在Runner类中主要有三个方法:init, handle_yield, run。

__init__方法

__init__里面主要执行一些初始化的操作,主要就是最后的两句:

def __init__(self, gen, result_future, first_yielded):
    self.gen = gen                        # 一个generator对象
    self.result_future = result_future    # 一个Future对象
    self.future = _null_future            # 一个刚初始化的Future对象  _null_future = Future(); _null_future.set_result(None)
    self.yield_point = None
    self.pending_callbacks = None
    self.results = None
    self.running = False
    self.finished = False
    self.had_exception = False
    self.io_loop = IOLoop.current()
    self.stack_context_deactivate = None
    if self.handle_yield(first_yielded):
        self.run()

handle_yield方法

这个方法主要是用来处理yield返回的对象的。首先,我们假设yielded是一个Future对象(大部分情况都是),这样的话代码可以简略看作:

def handle_yield(self, yielded):
    self.future = convert_yielded(yielded)                         # 如果yielded是Future对象则原样返回
    if not self.future.done() or self.future is moment:            # moment是tornado初始化时就建立的一个Future对象,且被set_result(None)
        self.io_loop.add_future(self.future, lambda f: self.run()) # 为该future添加callback
        return False
    return True

这段代码主要处理了分为三个步骤:1,首先是解析出self.future;2,然后判断self.future对象时候被done,如果没有就为其添加回调函数,这个回调函数会执行self.run();3,返回self.future对象是否被done。

run方法

这个方法实际上就是一个循环,不停的执行生成器的send()方法,发送的值就是yielded的result。简化后代码如下:

def run(self):
    """Starts or resumes the generator, running until it reaches a
    yield point that is not ready. 循环向generator中传递值,直到某个yield返回的yielded还没有被done
    """
    try:
        self.running = True 
        while True:
            future = self.future  
            if not future.done():
                return
            self.future = None      # 清空self.future
            value = future.result()   # 获取future对象的结果
            try:    
                yielded = self.gen.send(value)  # send该结果,并将self.gen返回的值赋值给yielded(一般情况下这也是个future对象)
            except (StopIteration, Return) as e:
                self.finished = True
                self.future = _null_future
                self.result_future.set_result(_value_from_stopiteration(e))
                self.result_future = None
                self._deactivate_stack_context()
                return
            if not self.handle_yield(yielded):  # 运行self.handler_yield(yielded),如果yielded对象没有被done,则直接返回;否则继续循环
                return
    finally:
        self.running = False

总结

1,每一个Future对应一个异步操作。

2,该Future对象可以添加回调函数,当该异步操作完成之后,需要对该Future对象设置set_done或者set_result,然后执行其所有的回调函数。

3,凡是使用了coroutine装饰器的生成器函数都会返回一个Future对象,同时不断为该生成器每一次运行send()或者next()的返回结果yielded以及Future对象运行Runner()。

4,Runner()会对生成器不断的调用send()或者next()操作。具体的操作:将上一个next()或者send()操作返回的yielded(一般是一个Future对象)被set_done后,将该yielded对象的结果send到生成器中,不断的循环该操作,直到产生StopIteration或者Return异常(就是生成器结束时产生的异常),这时会为该生成器对应的Future对象set_result。

tornado的异步是基于生成器的,通过yield关键字暂停执行,也通过生成器的send或者next方法恢复执行操作,同时使用send向生成器传递值。

每一个Future对象对应一个异步操作,我们可以为该对象添加很多回调函数,当异步操作完成后,执行set_done或者set_result就可以执行相关的回调函数。

而Runner()就是整个循环过程对象,它不停的将生成器所yield的每一个future对象的结果send回生成器,当生成器执行结束,它会进行最后的包装,对该生成器的对应的Future对象执行set_result操作。

相关标签
回到顶部