什么是异步

同步 IO

CPU 速度远快于磁盘、网络等 IO 操作。在单线程中,遇到 IO 时需等待完成才能继续,导致其他代码无法执行。这种模式称为同步 IO。

为解决 CPU 与 IO 速度不匹配,可用多进程、多线程或异步 IO。

异步 IO

异步 IO 模式下,发出 IO 指令后不等待结果,继续执行其他代码。IO 完成时通知 CPU 处理。

异步 IO 需要消息循环,主线程重复“读取消息 - 处理消息”。

协程

协程(coroutine)类似微线程,可通过 yield 调用其他协程。与函数调用不同,协程间平等,可按需切换。

以下是协程的生产者 - 消费者模型示例:

def consumer():
    """消费者"""
    r = ""
    while True:
        n = yield r
        if not n:
            return
        print("[CONSUMER] Consuming %s ..." % n)
        r = "200 OK"

def producer(c):
    """生产者"""
    c.send(None)
    n = 0
    while n < 5:
        n = n + 1
        print('[PRODUCER] Producing %s...' % n)
        r = c.send(n)
        print('[PRODUCER] Consumer return: %s' % r)
    c.close()

c = consumer()
producer(c)

输出:

[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK

异步概念误区

  1. 异步不提升单个 IO 任务响应速度。它提高 CPU 利用率和整体效率,但单个任务仍需等待 IO 结果。
  2. 同步程序用协程封装不变成异步。需底层 IO 支持,在 IO 时让渡执行权,否则仍阻塞,如 SQLAlchemy。

Python3 的异步实现

asyncio

Python 3.4 引入的 asyncio 标准库支持异步 IO。

编程模型基于消息循环:获取 EventLoop,将协程放入执行。

import threading
import asyncio

@asyncio.coroutine
def hello():
    print('Hello world! (%s)' % threading.currentThread())
    # 模拟耗时 1 秒 IO
    r = yield from asyncio.sleep(1)
    print('Hello again! (%s)' % threading.currentThread())

# 获取 event_loop
loop = asyncio.get_event_loop()
# 执行协程
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()

执行流程:第一个 hello 打印后 yield from 切换到第二个,1 秒后继续。

输出(同一线程并发):

Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暂停约 1 秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)

async/await

Python 3.5 引入 async/await 简化语法:

  • @asyncio.coroutineasync
  • yield fromawait

示例:

async def hello():
    print("Hello world!")
    r = await asyncio.sleep(1)
    print("Hello again!")

Tornado 的异步实现

Tornado 是异步网络框架,能处理大量并发连接。

在 handler 中需网络请求时,用异步方式提高效率。

异步 handler

两种方式:tornado.web.asynchronoustornado.gen.coroutine(推荐)。

tornado.web.asynchronous

用装饰器修饰 handler,调用异步代码并用回调处理。

import tornado.web
import tornado.httpclient

class MyRequestHandler(tornado.web.RequestHandler):
    @tornado.web.asynchronous
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        http_client.fetch("https://google.com/", self._on_download)

    def _on_download(self, response):
        self.write("Downloaded!")
        self.finish()

tornado.gen.coroutine

用装饰器,用 yield 调用异步代码。

import tornado.gen
import tornado.web
import tornado.httpclient

class MyRequestHandler(tornado.web.RequestHandler):
    @tornado.gen.coroutine
    def get(self):
        http_client = tornado.httpclient.AsyncHTTPClient()
        response = yield http_client.fetch("https://google.com/")
        self.write(response.body)

异步网络请求

用 Tornado 的 AsyncHTTPClient,或封装 Requests 为异步。

Requests + ThreadPoolExecutor

用 ThreadPoolExecutor 将同步 Requests 转为异步。

封装:

from concurrent.futures import ThreadPoolExecutor
import tornado.gen
import requests

class AsyncRequests(object):
    thread_pool = ThreadPoolExecutor(4)

    @classmethod
    @tornado.gen.coroutine
    def aget(cls, *args, **kw):
        resp = yield cls.thread_pool.submit(requests.get, *args, **kw)
        return resp

在 handler 中调用:

import tornado.web
import tornado.gen

class StatisticPlatformData(tornado.web.RequestHandler):
    @tornado.web.authenticated
    @tornado.gen.coroutine
    def get(self):
        url = "https://httpbin.org/get"
        resp = yield AsyncRequests.aget(url)
        jsondata = resp.json()
        self.write(jsondata)

多异步请求并发

串行执行慢,并发用列表或字典 yield。

import tornado.gen

@tornado.gen.coroutine
def get(self):
    http_client = AsyncHTTPClient()

    # 列表方式
    response1, response2 = yield [http_client.fetch(url1),
                                  http_client.fetch(url2)]

    # 字典方式
    response_dict = yield dict(response3=http_client.fetch(url3),
                               response4=http_client.fetch(url4))
    response3 = response_dict['response3']
    response4 = response_dict['response4']

参考链接