python异步IO(asyncio)协程实战

    xiaoxiao2022-07-08  257

    1.概念描述:

        asyncio 是用来编写并发代码的库,使用async/await语法。

        asyncio 被用作多个提供高性能 Python 异步框架的基础,包括网络和网站服务,数据库连接库,分布式任务队列等等。

        asyncio 往往是构建 IO 密集型和高层级 结构化 网络代码的最佳选择

        event_loop 事件循环:程序开启一个无限的循环,程序员会把一些函数注册到事件循环上。当满足事件发生的时候,调用相应的协程函数。

        coroutine 协程:协程对象,指一个使用async关键字定义的函数,它的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环,由事件循环调用。

        task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程进一步封装,其中包含任务的各种状态。

        future: 代表将来执行或没有执行的任务的结果。它和task上没有本质的区别

        async定义一个协程,await用于挂起阻塞的异步调用接口。

    2.定义一个协程并实例化

    async def do_some_work(x):   # 定义一个协程    print('Waiting: ', x)    return 'Done after {}s'.format(x)

    coroutine = do_some_work(2)  # 新建一个协程实例

    3.创建一个task

        task = asyncio.ensure_future(coroutine)

        task = loop.create_task(coroutine)

        asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task

    4.定义并绑定回调

    # def callback(future):  # 定义一个回调函数,不传参数#     print('Callback: ', future.result())  # 打印协程的返回结果def callback(t, future):  # 定义回调函数,接收一个参数和协程实例对象    print('Callback:', t, future.result())  # 传入的参数和协程实例的执行结果

     

    #task.add_done_callback(callback)  # 添加并绑定回调函数task.add_done_callback(functools.partial(callback, 2))  # 添加回调函数并在回调函数中传入参数

     

    程序执行实例:

    import time import asyncio import functools now = lambda: time.time() async def do_some_work(x):   # 定义一个协程     print('Waiting: ', x)     return 'Done after {}s'.format(x) # def callback(future):  # 定义一个回调函数,不传参数 #     print('Callback: ', future.result())  # 打印协程的返回结果 def callback(t, future):  # 定义回调函数,接收一个参数和协程实例对象     print('Callback:', t, future.result())  # 传入的参数和协程实例的执行结果 start = now() coroutine = do_some_work(2)  # 新建一个协程实例 loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine)  # asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)都可以创建一个task #task.add_done_callback(callback)  # 添加回调函数 task.add_done_callback(functools.partial(callback, 2))  # 添加回调函数并在回调函数中传入参数 loop.run_until_complete(task)   # 将协程实例注册到事件循环,并启动事件循环 print('Task ret: {}'.format(task.result()))  # 获取协程实例运行的返回结果 print('TIME: ', now() - start)

     

    5.阻塞和await

        使用async可以定义协程对象,使用await可以针对耗时的操作进行挂起,就像生成器里的yield一样,函数让出控制权

    协程遇到await,事件循环将会挂起该协程,执行别的协程,直到其他的协程也挂起或者执行完毕,再进行下一个协程的执行

    耗时的操作一般是一些IO操作,例如网络请求,文件读取等。

        我们使用asyncio.sleep函数来模拟IO操作,协程的目的也是让这些IO操作异步化。

    程序示例:

    import asyncio import time now = lambda: time.time() async def do_some_work(x):     print('run in task1: ', x)     await asyncio.sleep(x)  # 协程遇到await,事件循环将会挂起该协程,执行别的协程work2,sleep 模拟耗时操作     print("run in task1 end")     return 'Done after {}s'.format(x) async def do_some_work2(x):     print('run in task2: ', x)     return 'Done2 after {}s'.format(x) start = now() coroutine = do_some_work(1) coroutine2 = do_some_work2(2) loop = asyncio.get_event_loop() task = asyncio.ensure_future(coroutine) task2 = asyncio.ensure_future(coroutine2) loop.run_until_complete(task) loop.run_until_complete(task2) print('Task result: ', task.result()) print('Task result: ', task2.result()) print('TIME: ', now() - start)

     

    执行结果

    run in task1:  2

    run in task2:  2

    run in task1 end

    Task ret:  Done after 2s

    Task ret:  Done2 after 2s

    TIME:  2.0031144618988037

     

    6.asyncio实现并发

        asyncio实现并发,就需要多个协程来完成任务,每当有任务阻塞的时候就await,然后其他协程继续工作。创建多个协程的列表,然后将这些协程注册到事件循环中

    程序示例:

    import time import asyncio now = lambda: time.time()

    async def do_some_work(x):     print('Waiting: ', x)     await asyncio.sleep(x)     return 'Done after {}s'.format(x) start = now() coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [     asyncio.ensure_future(coroutine1),     asyncio.ensure_future(coroutine2),     asyncio.ensure_future(coroutine3) ] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks:     print('Task ret: ', task.result()) print('TIME: ', now() - start)

    执行结果:

    Waiting:  1

    Waiting:  2

    Waiting:  4

    Task ret:  Done after 1s

    Task ret:  Done after 2s

    Task ret:  Done after 4s

    TIME:  4.006228923797607

        结论分析:总时间为4s左右。4s的阻塞时间,足够前面两个协程执行完毕。如果是同步顺序的任务,那么至少需要7s,所有本程序中我们使用了aysncio实现了并发

    7.协程嵌套

    import asyncio import time import asyncio now = lambda: time.time() async def do_some_work(x):     print('Waiting: ', x)     await asyncio.sleep(x)     return 'Done after {}s'.format(x) async def main():     coroutine1 = do_some_work(1)     coroutine2 = do_some_work(2)     coroutine3 = do_some_work(4)     tasks = [         asyncio.ensure_future(coroutine1),         asyncio.ensure_future(coroutine2),         asyncio.ensure_future(coroutine3)     ]     dones, pendings = await asyncio.wait(tasks)     for task in dones:         print('Task ret: ', task.result()) start = now() loop = asyncio.get_event_loop() loop.run_until_complete(main()) print('TIME: ', now() - start)

    执行结果:

    Waiting:  1

    Waiting:  2

    Waiting:  4

    Task ret:  Done after 2s

    Task ret:  Done after 4s

    Task ret:  Done after 1s

    TIME:  3.9912283420562744

     

    8.协程停止

        上面见识了协程的几种常用的用法,都是协程围绕着事件循环进行的操作。future对象有几个状态:

    PendingRunningDoneCancelled

        创建future的时候,task为pending,事件循环调用执行的时候当然就是running,调用完毕自然就是done,如果需要停止事件循环,就需要先把task取消Cancelled

        程序实例:

    import asyncio import time now = lambda: time.time() async def do_some_work(x):     print('Waiting: ', x)     await asyncio.sleep(x)     return 'Done after {}s'.format(x) coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(2) tasks = [     asyncio.ensure_future(coroutine1),     asyncio.ensure_future(coroutine2),     asyncio.ensure_future(coroutine3) ] start = now() loop = asyncio.get_event_loop() try:     loop.run_until_complete(asyncio.wait(tasks)) except KeyboardInterrupt as e:     print(asyncio.Task.all_tasks())     for task in asyncio.Task.all_tasks():         print(task.cancel())     loop.stop()     loop.run_forever() finally:     loop.close() print('TIME: ', now() - start)

        启动事件循环之后,马上ctrl+c,会触发run_until_complete的执行异常 KeyBorardInterrupt,然后通过循环asyncio.Task取消future

        执行结果:

     

    参考博文:http://python.jobbole.com/87310/

    最新回复(0)