asyncio 是用来编写并发代码的库,使用async/await语法。
asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。
asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择
event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。
coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。
future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别
async定义一个协程,await用于挂起阻塞的异步调用接口。
async def do_some_work(x): # 定义一个协程 print('Waiting: ', x) return 'Done after {}s'.format(x)
coroutine = do_some_work(2) # 新建一个协程实例
task = asyncio.ensure_future(coroutine)
task = loop.create_task(coroutine)
asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task
# def callback(future): # 定义一个回调函数,不传参数# print('Callback: ', future.result()) # 打印协程的返回结果def callback(t, future): # 定义回调函数,接收一个参数和协程实例对象 print('Callback:', t, future.result()) # 传入的参数和协程实例的执行结果
#task.add_done_callback(callback) # 添加并绑定回调函数task.add_done_callback(functools.partial(callback, 2)) # 添加回调函数并在回调函数中传入参数
程序执行实例:
import time import asyncio import functools now = lambda: time.time() async def do_some_work(x): # 定义一个协程 print('Waiting: ', x) return 'Done after {}s'.format(x) # def callback(future): # 定义一个回调函数,不传参数 # print('Callback: ', future.result()) # 打印协程的返回结果 def callback(t, future): # 定义回调函数,接收一个参数和协程实例对象 print('Callback:', t, future.result()) # 传入的参数和协程实例的执行结果 start = now() coroutine = do_some_work(2) # 新建一个协程实例 loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) # asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task #task.add_done_callback(callback) # 添加回调函数 task.add_done_callback(functools.partial(callback, 2)) # 添加回调函数并在回调函数中传入参数 loop.run_until_complete(task) # 将协程实例注册到事件循环,并启动事件循环 print('Task ret: {}'.format(task.result())) # 获取协程实例运行的返回结果 print('TIME: ', now() - start)
使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权
协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行
耗时的操作一般是一些IO操作,例如网络请求,文件读取等。
我们使用asyncio.sleep函数来模拟IO操作,协程的目的也是让这些IO操作异步化。
程序示例:
import asyncio import time now = lambda: time.time() async def do_some_work(x): print('run in task1: ', x) await asyncio.sleep(x) # 协程遇到await,事件循环将会挂起该协程,执行别的协程work2,sleep 模拟耗时操作 print("run in task1 end") return 'Done after {}s'.format(x) async def do_some_work2(x): print('run in task2: ', x) return 'Done2 after {}s'.format(x) start = now() coroutine = do_some_work(1) coroutine2 = do_some_work2(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task2 = asyncio.ensure_future(coroutine2) loop.run_until_complete(task) loop.run_until_complete(task2) print('Task result: ', task.result()) print('Task result: ', task2.result()) print('TIME: ', now() - start)
执行结果:
run in task1: 2
run in task2: 2
run in task1 end
Task ret: Done after 2s
Task ret: Done2 after 2s
TIME: 2.0031144618988037
asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中
程序示例:
import time import asyncio now = lambda: time.time()
async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) start = now() coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task ret: ', task.result()) print('TIME: ', now() - start)
执行结果:
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
TIME: 4.006228923797607
结论分析:总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s,所有本程序中我们使用了aysncio实现了并发
import asyncio import time import asyncio now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] dones, pendings = await asyncio.wait(tasks) for task in dones: print('Task ret: ', task.result()) start = now() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print('TIME: ', now() - start)
执行结果:
Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 2s
Task ret: Done after 4s
Task ret: Done after 1s
TIME: 3.9912283420562744
上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:
PendingRunningDoneCancelled创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消Cancelled
程序实例:
import asyncio import time now = lambda: time.time() async def do_some_work(x): print('Waiting: ', x) await asyncio.sleep(x) return 'Done after {}s'.format(x) coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(2) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] start = now() loop = asyncio.get_event_loop() try: loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e: print(asyncio.Task.all_tasks()) for task in asyncio.Task.all_tasks(): print(task.cancel()) loop.stop() loop.run_forever() finally: loop.close() print('TIME: ', now() - start)
启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt,然后通过循环asyncio.Task取消future
执行结果:
参考博文:http://python.jobbole.com/87310/
