python协程
创始人
2025-05-30 11:37:46
0

文章目录

  • 一、前言
  • 二、介绍
    • 2.1 为什么要用协程
    • 2.2 协程的效率受到哪些限制
    • 2.3 协程的意义
  • 三、协程示例
  • 四、事件循环
  • 五、协程声明
  • 六、await
  • 七、Task对象
    • 7.1 单任务
    • 7.2 多任务
    • 7.3 多任务优化
  • 八、asyncio.future对象
    • 8.1 语法
    • 8.2 示例
  • 九、concurrent.futures.Future 对象
  • 十、实现线程协程和进程协程
  • 十一、爬虫示例:asyncio + 不支持异步的模块
    • 11.1 loop.run_in_executor
    • 11.2 协程
    • 11.3 多线程
    • 11.4 结论
  • 十三、异步迭代器
  • 十四、异步上下文管理器
  • 十五、uvloop
  • 十六、 异步操作Redis
    • 16.1 单任务
    • 16.2 多任务
  • 十七、异步操作MySQL
    • 17.1 单任务
    • 17.2 多任务
  • 十八、FastAPI
  • 十九、爬虫

一、前言

其实这块东西有点多,从iterator,iterable,yield,yield from,再到最终的async,await,asyncio
经历了python2.x -> 3.x的版本迭代,一篇文章也很难讲清楚,本文章主要说明在实际开发中怎么使用协程

支持协程所有指令的python版本:python >= 3.7
支持协程的python web框架:Django 3.x,Flask 2.x,fastAPI
支持协程的python 库:想实现什么功能建议直接去Google搜索

二、介绍

2.1 为什么要用协程

线程遇到IO阻塞是由系统调度的;协程是由用户调度的,更快更省资源
线程启动的时候需要定义线程数;协程则不需要,它能运行更多的任务

2.2 协程的效率受到哪些限制

协程是在线程内部运行,所以也受到GIL全局解释器锁的限制,无法同时利用多核CPU
协程需要有对应的第三方异步库的支持,否则得自己去使用协程的方法实现
协程是为了运行多任务的存在,如果使用协程只是执行单个任务,那它其实并不能提升效率
协程相比线程池,实现更复杂一点

2.3 协程的意义

通过一个线程利用其IO等待时间去做更多的事情

三、协程示例

这里为什么不用requests,因为python的requests库不支持协程,就像上面讲的要使用协程,也得需要web框架支持。

import asyncio
import aiohttpasync def fetch(session, url):print("发送请求")async with session.get(url, verify_ssl=False) as response:content = await response.content.read()file_name = url.split('_')[-1]with open(file_name, 'wb') as f:f.write(content)async def main():async with aiohttp.ClientSession() as session:url_list = ['https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_65.png','https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_66.png','https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_67.png']tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())

四、事件循环

简单点说,事件循环可以理解为统一调度协程中一批任务的队列,将这协程任务丢到这个队列,进行调度;至于什么时候调度,取决于await语句;而async语句声明这是一个协程任务

import asynciotasks = list()
# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到列表
loop.run_until_complete(tasks)

五、协程声明

协程函数: async def foo()
协程对象: 执行协程函数得到的对象,如: foo()
特别注意: 当协程函数func()后,内部代码是不会执行的,只是得到了一个协程对象(这是和普通函数的区别)
如果想要运行协程对象,则必须将其交给事件循环来处理

python3.7 + 的写法这里为什么没有定义事件循环? 因为asyncio.run(result) 在执行的时候,会默认创建一个事件循环;简化了之前版本的写法,所以协程使用是很简单的

async def func():                    # func为协程函数print('开始执行')result = func()                      # result为协程对象# python 3.7之前的写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result)    # 让事件循环去执行协程对象# python 3.7+写法
asyncio.run(result)

六、await

await 后面定义可等待的东西 (可以是:协程对象,Future,Task对象);这里的可等待东西就是IO等待
await 就是等待可等待的对象的值返回以后再放下走,在await的同时,从当前的协程任务切换到另一个就绪的协程任务
result = await 则表示result是await后面的指令返回结果,也就是一个return的对象

import asyncio
async def func():print('过来呀')response = await asyncio.sleep(2)print('已结束, ', response)asyncio.run(func())
import asyncioasync def others():print('start')await asyncio.sleep(2)print('end')return '返回'async def func():print('开始执行协程函数内部代码')# 当遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)response = await others()print('结束执行协程函数内部代码 ', response)asyncio.run(func())

七、Task对象

前面的示例都是将一个协程函数添加到事件循环中,这其实没什么意义,因为使用协程本质上来讲是运行多个批量的任务。Task对象就是将一组协程对象;Tasks用于并发调度协程,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协程加入到事件循环中等待被调度执行。

除了使用async.create()函数以外,还可以用低层级的 loop.create_task() 或ensure_future() 函数。不建议手动实例化Task对象

注意:asyncio.create_task() 函数在python3.7中被加入。在python3.7之前,可以改用低层级的 asyncio.ensure_future()函数,python官方建议用户使用高层级的方法

7.1 单任务

下面是使用task对象的简单方法,将一个一个任务添加到事件循环的。

import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'async def main():print('main开始')# 创建协程对象,将协程对象封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)task1 = asyncio.create_task(func())task2 = asyncio.create_task(func())print('main结束')# 当执行某协程遇到IO操作时,会自动化切换执行其他任务# 此处的await是等待相对应的协程全部都执行完毕并获取结果ret1 = await task1ret2 = await task2print(ret1, ret2)asyncio.run(main())

7.2 多任务

下面是列出了一组tasks任务,一次性添加到携程事件循环中

import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'async def main():print('main开始')task_list = [asyncio.create_task(func(), name='t1'),asyncio.create_task(func(), name='t2')]print('main结束')done, pending = await asyncio.wait(task_list, timeout=3)print(done, pending)asyncio.run(main())

7.3 多任务优化

下面是上面方法的简写,使代码更简洁

import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'task_list = [func() for _ in range(5)]
done, pending = asyncio.run(asyncio.wait(task_list))
print('done: ', done)
print('pending: ', pending)

八、asyncio.future对象

Future是一个更低层的接口,用来等待异步的执行结果
Task继承Future对象,Task对象内部await结果的处理基于Future对象来的。

8.1 语法

import asyncioasync def main():# 获取当前运行的事件循环loop = asyncio.get_running_loop()# 创建一个任务(Future对象),这个任务什么都不干future = loop.create_future()# 等待任务最终结果(Future),没有结果则会一直等待下去await futureasyncio.run(main())

8.2 示例

import asyncio
import timeasync def set_after(future):await asyncio.sleep(2)future.set_result('666')async def main():start_time = time.time()# 获取当前事件循环loop = asyncio.get_running_loop()# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束future = loop.create_future()# 创建一个任务(Task对象),绑定了set_after协程对象,内部函数在2s之后,会给future赋值# 即手动设置future任务的最终结果,那么future就可以结束了await loop.create_task(set_after(future))# 等待Future对象获取最终结果,否则一直等下去data = await futureprint(data)print(time.time() - start_time)asyncio.run(main())

九、concurrent.futures.Future 对象

使用线程池、进程池实现异步操作时用到的对象。
线程池:from concurrent.futures import ThreadPoolExecutor
进程池:from concurrent.futures import ProcessPoolExecutor

import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print(value)return valueif __name__ == '__main__':# 创建线程池with ThreadPoolExecutor() as pool:for i in range(12):fut = pool.submit(func, i)print(fut)# 创建进程池with ProcessPoolExecutor() as pool:for i in range(12):fut = pool.submit(func, i)print(fut)

十、实现线程协程和进程协程

以后写代码可能会存在交叉使用
例如:crm项目80%都是基于协程异步编程+MySQL(不支持)【线程、进程做异步编程】
当有的第三方库不支持协程,那么就需要两种结合

import time
import asyncio
import concurrent.futuresdef func1():# 某个耗时操作time.sleep(2)return 'ok'async def main():loop = asyncio.get_running_loop()# 运行默认的loop执行器(默认为ThreadPoolExecutor)# 1. 内部会先调用 ThreadPoolExecutor 的submit方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象# 2. 调用asyncio.wrap_future将concurrent.futures.Future对象包装为asyncio.Future对象# 因为concurrent.futures.Future对象不支持await方法,所以需要包装为 asyncio.Future 对象才能使用fut = loop.run_in_executor(None, func1)result = await futprint('default thread pool', result)# 运行一个线程池with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom thread pool: ', result)# 运行一个进程池with concurrent.futures.ProcessPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom process pool: ', result)if __name__ == '__main__':asyncio.run(main())

十一、爬虫示例:asyncio + 不支持异步的模块

由于requests是不支持协程的,所以为了使用协程提升效率,则可以使用loop.run_in_executor()方法
下面的场景其实在多线程和协程分别执行的时候,效率其实提升不了多少,因为文件很小本身就很快,没有遇到多少阻塞;那在什么情况下会有很大的区别呢,就是文件很大需要时间或者你会在进行sleep等待浏览器程序加载前端页面的时候

11.1 loop.run_in_executor

下面使用loop.run_in_executor()执行

import asyncio
import requestsasync def download_image(url):# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)print('开始下载: ', url)loop = asyncio.get_event_loop()future = loop.run_in_executor(None, requests.get, url)response = await futureprint('下载完成')# 图片保存到本地file_name = f"/Users/yuehua/Downloads/images/{url.split('_')[-1]}"with open(file_name, 'wb') as f_obj:f_obj.write(response.content)if __name__ == '__main__':url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))

11.2 协程

import asyncio
import requests
import timeasync def download_image(url):start_time = time.time()# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)loop = asyncio.get_event_loop()response = await loop.run_in_executor(None, requests.get, url)# 模拟等待浏览器前端页面加载await asyncio.sleep(5)# 图片保存到本地file_name = f"/Users/yuehua/Downloads/images/{url.split('/')[-1]}"with open(file_name, 'wb') as f_obj:f_obj.write(response.content)print(time.time() - start_time)# 用该方式获取result,不过python3.11中将删除该方式
# async def main():
#     url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]
#     tasks = [download_image(url) for url in url_list]
#     done, pending = await asyncio.wait(tasks)
#     print(done)if __name__ == '__main__':url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]tasks = [download_image(url) for url in url_list]asyncio.run(asyncio.wait(tasks))# asyncio.run(main())

11.3 多线程

from concurrent.futures import ThreadPoolExecutor
import requests
import timedef url_save(url):start_time = time.time()response = requests.get(url)file_name = f"/Users/yuehua/Downloads/images/{url.split('/')[-1]}"# 模拟等待浏览器前端页面加载time.sleep(5)with open(file_name, 'wb') as f_obj:f_obj.write(response.content)return (time.time() - start_time)if __name__ == '__main__':start_time = time.time()with ThreadPoolExecutor() as pool:url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]results = pool.map(url_save, tuple(url_list))for i in results:print('运行时间: ', i)print('总运行时间: ', time.time() - start_time)

如果你手动执行过上面线程和协程的代码,你会发现,当任务执行过程中需要等待IO的时候,协程与线程的效率有多不同

11.4 结论

只要任务队列不多,基本上忽略线程和协程上下文切换的时间(协程切换<线程切换),例如我们爬虫的总任务数=sum(需爬虫的账户*每个账户的资源标签)

  1. 单任务运行:时间=单任务时间*任务数
  2. 多线程运行:时间=单任务时间*任务数/线程数 + 线程切换时间
  3. 协程运行: 时间=单任务(异步执行时间) + 单任务(同步执行时间)*任务数量 + 协程切换时间
  4. 协程+多线程运行:时间=单任务(异步执行时间) + 单任务(同步执行时间)*任务数量/线程数量 + 线程切换时间 + 协程切换时间

顺便提一下python中的并行与并发,有时间单独拿出来介绍
多进程:并行运行。当CPU核心数>1,任务是会并行运行的,且可并行运行的任务数量等于核心数
多线程:并发运行。因为全局解释器锁GIL,只有当前正在运行的线程才持有锁,其他线程都被挂起
协程:并发运行。协程是在线程内部运行的,且上下文的切换是由用户控制,且同一时刻只能有一个任务在运行

十三、异步迭代器

什么是异步迭代器
实现了 aiter()和 anext()方法的对象。anext 必须返回一个 awaitable对象。async_for会处理异步迭代器的
anext()方法所返回的可迭代对象,直到其引发一个 StopAsyncIteration 异常。

什么是异步可迭代对象
可在 async_for 语句中被使用的对象。必须通过它的 aiter() 方法返回一个 asynchronousiterator

import asyncioclass Reader:""" 自定义异步迭代器(同时也是异步可迭代对象)"""def __init__(self):self.count = 0async def readline(self):# await asyncio.sleep(1)self.count += 1if self.cont == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn val# 异步迭代器必须由协程函数中执行            
async def func():obj = Reader()async for i in obj:print(i)# asyncio.run执行协程函数
asyncio.run(func())

十四、异步上下文管理器

此种对象通过定义 aenter() 和 aexit()方法来对 async_with 语句中的环境进行控制

import asyncio
import timeclass AsyncContextManager:def __init__(self):self.conn = Noneasync def do_somethins(self):# 异步操作数据库return '嘿嘿操作好了!'async def __aenter__(self):# 异步连接数据库self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):# 异步关闭数据库连接await asyncio.sleep(1)async def func():async with AsyncContextManager() as f:result = await f.do_somethins()print(result)if __name__ == '__main__':asyncio.run(func())

十五、uvloop

uvloop是asyncio事件循环的替代方案。它事件循环效率>默认asyncio的事件循环,性能更高
安装:pip3 install uvloop

fastapi是很火的一个支持协程编程的python web框架,它使用率asgi unicorn,而unicorn内部使用的就是uvloop

import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 编写 asyncio 的代码,与之前的代码一致# 内部的事件循环自动转化为uvlooop
asyncio.run()

十六、 异步操作Redis

在使用python代码操作redis时,连接/操作/断开都是网络IO
安装:pip3 install aioredis

16.1 单任务

import asyncio
import aioredisasync def execute(url):print('建立连接: ', url)# 网络IO操作:创建redis连接redis = await aioredis.from_url(url=url,max_connections=10,decode_responses=True,db=4)await redis.set('my-key', 'my-value')await redis.mset({'key:1': 'value1', 'key:2': 'value2'})val1 = await redis.get('my-key')val2 = await redis.get('key:1')await redis.close()print(val1, val2)url = 'redis://127.0.0.1:6379'
asyncio.run(execute(url))

16.2 多任务

import asyncio
import aioredisasync def execute(url):print('建立连接: ', url)# 网络IO操作:创建redis连接redis = await aioredis.from_url(url=url,max_connections=10,decode_responses=True,db=4)await redis.set('my-key', 'my-value')await redis.mset({'key:1': 'value1', 'key:2': 'value2'})val1 = await redis.get('my-key')val2 = await redis.get('key:1')await redis.close()    print(val1, val2)task_list = [execute('redis://127.0.0.1:6379'),execute('redis://127.0.0.1:6379'),
]
asyncio.run(asyncio.wait(task_list))

十七、异步操作MySQL

安装:pip3 install aiomysql

17.1 单任务

import asyncio
import aiomysqlasync def test_example():conn = await aiomysql.connect(host='127.0.0.1', port=3306,user='root', password='heihei',db='devops_dev')cursor = await conn.cursor()await cursor.execute("SELECT instance_id, name FROM ops_host limit 10")r = await cursor.fetchall()await cursor.close()conn.close()print(r)asyncio.run(test_example())

17.2 多任务

import asyncio
import aiomysqlasync def test_example():conn = await aiomysql.connect(host='127.0.0.1', port=3306,user='root', password='heihei',db='devops_dev')cursor = await conn.cursor()await cursor.execute("SELECT instance_id, name FROM ops_host limit 10")r = await cursor.fetchall()await cursor.close()conn.close()print(r)task_list = [test_example(),test_example()
]asyncio.run(asyncio.wait(task_list))

十八、FastAPI

pip3 install fastapi
pip3 install uvicorn (asgi 支持异步的wsgi,内部基于uvloop)
uvicorn bubu:app --reload

from fastapi import FastAPI
import asyncio
import aioredis
import uvicorn
import timeredis_pool = aioredis.ConnectionPool.from_url(url='redis://127.0.0.1:6379',max_connections=10,decode_responses=True,db=4
)
redis = aioredis.Redis(connection_pool=redis_pool)app = FastAPI()@app.get("/")
async def root():await asyncio.sleep(3)return {"message": "Hello World"}@app.get('/red')
async def red():print('嘿嘿我请求来了')await asyncio.sleep(3)await redis.execute_command('set', 'aa', 'bb')val = await redis.execute_command('get', 'aa')return valif __name__ == '__main__':uvicorn.run('bubu:app', host='127.0.0.1', port=8888, log_level='info')

十九、爬虫

import aiohttp
import asyncioasync def fetch(url):print('发送请求')async with aiohttp.ClientSession() as session:async with session.get(url, verify_ssl=False) as response:result = await response.text()print('result: ', result)async def main():url_list = ['https://python.org','https://www.baidu.com','https://www.taobao.com']tasks = [fetch(url) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
import aiohttp
import asyncioasync def fetch(session, url):print('发送请求')async with session.get(url, verify_ssl=False) as response:result = await response.text()print('result: ', result)return resultasync def main():async with aiohttp.ClientSession() as session:url_list = ['https://python.org','https://www.baidu.com','https://www.taobao.com']tasks = [fetch(session, url) for url in url_list]done, pending = await asyncio.wait(tasks)print('执行结果: ', done)if __name__ == '__main__':asyncio.run(main())

相关内容

热门资讯

9、Cascaded Diff... 简介 主页:https://cascaded-diffusion.github.io/...
央视首推情感教育纪录片《镜子》... (4月19日)晚,央视纪录片《镜子》首播,给了中国家庭教育当头一棒。  之所以取名“镜子”,是因为“...
朱泾二小:满足孩子与家长的幸福... 教育工作要以孩子和家长“幸福”为追求,满足了学生个性化发展的需求,满足了家长自我提高的需求,家校共同...
争做模范好家长 共育家教新篇章... 为发现和宣传在家庭教育方面有创新有实效的好家长,以及关心教育,支持学校、班级工作的“好家长”先进典型...
比起富养孩子,培养孩子的抗挫商... 1去年十一月,安徽电视台记者段丹峰为情跳楼自杀。自杀前曾连发五小时微博,可见那段时间内她内心遭受多么...
蒙山中学:爆棚的“创城力” 蒙... 家校携手共创城4月27日上午,“我为创城 文明家校”金山区中学家校工作座谈会在蒙山中学举行,来自全区...
Notion汉化 市面上笔记软件五花八门,都各有特色。wolai、语雀、飞书、印象笔记、石墨、幕布、为知...
最新或2023(历届)5月20... 我们用全部的爱关爱家人,我们期待孩子健康快乐的长大,我们竭尽所有把最好的给孩子!每一个小天使的降临,...
最新或2023(历届).5.9... 亲爱的家人朋友们:欢迎您来到美丽的杭州,参加最新或2023(历届)5月9日—5月11日(周二至周四)...
孩子写作业总是拖拖拉拉?家庭教... 成就孩子美好人生!上城区学生成长支持中心第三讲“督促孩子完成作业的窍门 ”,欢迎家长朋友通过微信报名...
反腐正剧《人民的名义》,是一部... 《人民的名义》自最新或2023(历届)3月28日开播以来,已接近一个月,热度不减,而据CSM52城市...
Spring Boot 自定义... 概述 因为最近一直在为公司搭建底层框架, 好久没有更新博客了,本次搭建的框架结构...
想提升宝宝情商,家庭教育很关键... 爸爸妈妈们,你们是否有过对于如何培养宝宝情商的问题的犯难?可能我们可以给宝宝上最好的学校,或者根据宝...
Redis(九):并发问题 前言 上一篇介绍了 Redis 的内存管理。这节开始介绍 Redis 并发方面的问题。 Redis ...
python多线程 文章目录一、简介1.1 多线程的特性1.2 GIL二、线程1.2 单线程1.3 多线程三、线程池3....
綦江区成为家校共育区域性研究实... 4月26日—27日,中国教育学会“十三五”教育科研重点课题“基础教育阶段家校共育的理论与实践研究”现...
曲阜《教子有方》&lt... 一个孩子的教育成功,是全家人的成功!一个孩子的教育失败,是全家人的失败!——————赵国彦任何事业的...
《家庭教育》杂志创刊三十年贺词... 中国家庭教育学会副会长、北京师范大学教授赵忠心(杭州《家庭教育》杂志最新或2023(历届)第一二期)...
美国男人为什么不包二奶? 外国... 1、信仰美国的社会,大部分人们对宗教很虔诚。而对于教徒对婚姻、家庭的忠诚,宗教有很明确的要求,有些宗...
DirectX12(D3D12... 目录1、前言1.1、一些感慨1.2、运行效果展示1.3、示例简介1.4、示例操作说明1.5、本章内容...