什么是异步
同步 IO
CPU 速度远快于磁盘、网络等 IO 操作。在单线程中,遇到 IO 时需等待完成才能继续,导致其他代码无法执行。这种模式称为同步 IO。
为解决 CPU 与 IO 速度不匹配,可用多进程、多线程或异步 IO。
异步 IO
异步 IO 模式下,发出 IO 指令后不等待结果,继续执行其他代码。IO 完成时通知 CPU 处理。
异步 IO 需要消息循环,主线程重复“读取消息 - 处理消息”。
协程
协程(coroutine)类似微线程,可通过 yield 调用其他协程。与函数调用不同,协程间平等,可按需切换。
以下是协程的生产者 - 消费者模型示例:
def consumer():
"""消费者"""
r = ""
while True:
n = yield r
if not n:
return
print("[CONSUMER] Consuming %s ..." % n)
r = "200 OK"
def producer(c):
"""生产者"""
c.send(None)
n = 0
while n < 5:
n = n + 1
print('[PRODUCER] Producing %s...' % n)
r = c.send(n)
print('[PRODUCER] Consumer return: %s' % r)
c.close()
c = consumer()
producer(c)
输出:
[PRODUCER] Producing 1...
[CONSUMER] Consuming 1...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 2...
[CONSUMER] Consuming 2...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 3...
[CONSUMER] Consuming 3...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 4...
[CONSUMER] Consuming 4...
[PRODUCER] Consumer return: 200 OK
[PRODUCER] Producing 5...
[CONSUMER] Consuming 5...
[PRODUCER] Consumer return: 200 OK
异步概念误区
- 异步不提升单个 IO 任务响应速度。它提高 CPU 利用率和整体效率,但单个任务仍需等待 IO 结果。
- 同步程序用协程封装不变成异步。需底层 IO 支持,在 IO 时让渡执行权,否则仍阻塞,如 SQLAlchemy。
Python3 的异步实现
asyncio
Python 3.4 引入的 asyncio 标准库支持异步 IO。
编程模型基于消息循环:获取 EventLoop,将协程放入执行。
import threading
import asyncio
@asyncio.coroutine
def hello():
print('Hello world! (%s)' % threading.currentThread())
# 模拟耗时 1 秒 IO
r = yield from asyncio.sleep(1)
print('Hello again! (%s)' % threading.currentThread())
# 获取 event_loop
loop = asyncio.get_event_loop()
# 执行协程
tasks = [hello(), hello()]
loop.run_until_complete(asyncio.wait(tasks))
loop.close()
执行流程:第一个 hello 打印后 yield from 切换到第二个,1 秒后继续。
输出(同一线程并发):
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
Hello world! (<_MainThread(MainThread, started 140735195337472)>)
(暂停约 1 秒)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
Hello again! (<_MainThread(MainThread, started 140735195337472)>)
async/await
Python 3.5 引入 async/await 简化语法:
@asyncio.coroutine
→async
yield from
→await
示例:
async def hello():
print("Hello world!")
r = await asyncio.sleep(1)
print("Hello again!")
Tornado 的异步实现
Tornado 是异步网络框架,能处理大量并发连接。
在 handler 中需网络请求时,用异步方式提高效率。
异步 handler
两种方式:tornado.web.asynchronous
和 tornado.gen.coroutine
(推荐)。
tornado.web.asynchronous
用装饰器修饰 handler,调用异步代码并用回调处理。
import tornado.web
import tornado.httpclient
class MyRequestHandler(tornado.web.RequestHandler):
@tornado.web.asynchronous
def get(self):
http_client = tornado.httpclient.AsyncHTTPClient()
http_client.fetch("https://google.com/", self._on_download)
def _on_download(self, response):
self.write("Downloaded!")
self.finish()
tornado.gen.coroutine
用装饰器,用 yield 调用异步代码。
import tornado.gen
import tornado.web
import tornado.httpclient
class MyRequestHandler(tornado.web.RequestHandler):
@tornado.gen.coroutine
def get(self):
http_client = tornado.httpclient.AsyncHTTPClient()
response = yield http_client.fetch("https://google.com/")
self.write(response.body)
异步网络请求
用 Tornado 的 AsyncHTTPClient,或封装 Requests 为异步。
Requests + ThreadPoolExecutor
用 ThreadPoolExecutor 将同步 Requests 转为异步。
封装:
from concurrent.futures import ThreadPoolExecutor
import tornado.gen
import requests
class AsyncRequests(object):
thread_pool = ThreadPoolExecutor(4)
@classmethod
@tornado.gen.coroutine
def aget(cls, *args, **kw):
resp = yield cls.thread_pool.submit(requests.get, *args, **kw)
return resp
在 handler 中调用:
import tornado.web
import tornado.gen
class StatisticPlatformData(tornado.web.RequestHandler):
@tornado.web.authenticated
@tornado.gen.coroutine
def get(self):
url = "https://httpbin.org/get"
resp = yield AsyncRequests.aget(url)
jsondata = resp.json()
self.write(jsondata)
多异步请求并发
串行执行慢,并发用列表或字典 yield。
import tornado.gen
@tornado.gen.coroutine
def get(self):
http_client = AsyncHTTPClient()
# 列表方式
response1, response2 = yield [http_client.fetch(url1),
http_client.fetch(url2)]
# 字典方式
response_dict = yield dict(response3=http_client.fetch(url3),
response4=http_client.fetch(url4))
response3 = response_dict['response3']
response4 = response_dict['response4']