其实这块东西有点多,从iterator,iterable,yield,yield from,再到最终的async,await,asyncio
经历了python2.x -> 3.x的版本迭代,一篇文章也很难讲清楚,本文章主要说明在实际开发中怎么使用协程
支持协程所有指令的python版本:python >= 3.7
支持协程的python web框架:Django 3.x,Flask 2.x,fastAPI
支持协程的python 库:想实现什么功能建议直接去Google搜索
线程遇到IO阻塞是由系统调度的;协程是由用户调度的,更快更省资源
线程启动的时候需要定义线程数;协程则不需要,它能运行更多的任务
协程是在线程内部运行,所以也受到GIL全局解释器锁的限制,无法同时利用多核CPU
协程需要有对应的第三方异步库的支持,否则得自己去使用协程的方法实现
协程是为了运行多任务的存在,如果使用协程只是执行单个任务,那它其实并不能提升效率
协程相比线程池,实现更复杂一点
通过一个线程利用其IO等待时间去做更多的事情
这里为什么不用requests,因为python的requests库不支持协程,就像上面讲的要使用协程,也得需要web框架支持。
import asyncio
import aiohttpasync def fetch(session, url):print("发送请求")async with session.get(url, verify_ssl=False) as response:content = await response.content.read()file_name = url.split('_')[-1]with open(file_name, 'wb') as f:f.write(content)async def main():async with aiohttp.ClientSession() as session:url_list = ['https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_65.png','https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_66.png','https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_67.png']tasks = [asyncio.create_task(fetch(session, url)) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
简单点说,事件循环可以理解为统一调度协程中一批任务的队列,将这协程任务丢到这个队列,进行调度;至于什么时候调度,取决于await语句;而async语句声明这是一个协程任务
import asynciotasks = list()
# 去生成或获取一个事件循环
loop = asyncio.get_event_loop()
# 将任务放到列表
loop.run_until_complete(tasks)
协程函数: async def foo()
协程对象: 执行协程函数得到的对象,如: foo()
特别注意: 当协程函数func()后,内部代码是不会执行的,只是得到了一个协程对象(这是和普通函数的区别)
如果想要运行协程对象,则必须将其交给事件循环来处理
python3.7 + 的写法这里为什么没有定义事件循环? 因为asyncio.run(result) 在执行的时候,会默认创建一个事件循环;简化了之前版本的写法,所以协程使用是很简单的
async def func(): # func为协程函数print('开始执行')result = func() # result为协程对象# python 3.7之前的写法
# loop = asyncio.get_event_loop()
# loop.run_until_complete(result) # 让事件循环去执行协程对象# python 3.7+写法
asyncio.run(result)
await 后面定义可等待的东西 (可以是:协程对象,Future,Task对象);这里的可等待东西就是IO等待
await 就是等待可等待的对象的值返回以后再放下走,在await的同时,从当前的协程任务切换到另一个就绪的协程任务
result = await 则表示result是await后面的指令返回结果,也就是一个return的对象
import asyncio
async def func():print('过来呀')response = await asyncio.sleep(2)print('已结束, ', response)asyncio.run(func())
import asyncioasync def others():print('start')await asyncio.sleep(2)print('end')return '返回'async def func():print('开始执行协程函数内部代码')# 当遇到IO操作挂起当前协程(任务),等IO操作完成之后再继续往下执行。当前协程挂起时,事件循环可以去执行其他协程(任务)response = await others()print('结束执行协程函数内部代码 ', response)asyncio.run(func())
前面的示例都是将一个协程函数添加到事件循环中,这其实没什么意义,因为使用协程本质上来讲是运行多个批量的任务。Task对象就是将一组协程对象;Tasks用于并发调度协程,通过asyncio.create_task(协程对象) 的方式创建Task对象,这样可以让协程加入到事件循环中等待被调度执行。
除了使用async.create()函数以外,还可以用低层级的 loop.create_task() 或ensure_future() 函数。不建议手动实例化Task对象
注意:asyncio.create_task() 函数在python3.7中被加入。在python3.7之前,可以改用低层级的 asyncio.ensure_future()函数,python官方建议用户使用高层级的方法
下面是使用task对象的简单方法,将一个一个任务添加到事件循环的。
import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'async def main():print('main开始')# 创建协程对象,将协程对象封装到一个Task对象中并立即添加到事件循环的任务列表中,等待事件循环去执行(默认是就绪状态)task1 = asyncio.create_task(func())task2 = asyncio.create_task(func())print('main结束')# 当执行某协程遇到IO操作时,会自动化切换执行其他任务# 此处的await是等待相对应的协程全部都执行完毕并获取结果ret1 = await task1ret2 = await task2print(ret1, ret2)asyncio.run(main())
下面是列出了一组tasks任务,一次性添加到携程事件循环中
import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'async def main():print('main开始')task_list = [asyncio.create_task(func(), name='t1'),asyncio.create_task(func(), name='t2')]print('main结束')done, pending = await asyncio.wait(task_list, timeout=3)print(done, pending)asyncio.run(main())
下面是上面方法的简写,使代码更简洁
import asyncio
async def func():print(1)await asyncio.sleep(2)print(2)return '返回值'task_list = [func() for _ in range(5)]
done, pending = asyncio.run(asyncio.wait(task_list))
print('done: ', done)
print('pending: ', pending)
Future是一个更低层的接口,用来等待异步的执行结果
Task继承Future对象,Task对象内部await结果的处理基于Future对象来的。
import asyncioasync def main():# 获取当前运行的事件循环loop = asyncio.get_running_loop()# 创建一个任务(Future对象),这个任务什么都不干future = loop.create_future()# 等待任务最终结果(Future),没有结果则会一直等待下去await futureasyncio.run(main())
import asyncio
import timeasync def set_after(future):await asyncio.sleep(2)future.set_result('666')async def main():start_time = time.time()# 获取当前事件循环loop = asyncio.get_running_loop()# 创建一个任务(Future对象),没绑定任何行为,则这个任务永远不知道什么时候结束future = loop.create_future()# 创建一个任务(Task对象),绑定了set_after协程对象,内部函数在2s之后,会给future赋值# 即手动设置future任务的最终结果,那么future就可以结束了await loop.create_task(set_after(future))# 等待Future对象获取最终结果,否则一直等下去data = await futureprint(data)print(time.time() - start_time)asyncio.run(main())
使用线程池、进程池实现异步操作时用到的对象。
线程池:from concurrent.futures import ThreadPoolExecutor
进程池:from concurrent.futures import ProcessPoolExecutor
import time
from concurrent.futures.thread import ThreadPoolExecutor
from concurrent.futures.process import ProcessPoolExecutordef func(value):time.sleep(1)print(value)return valueif __name__ == '__main__':# 创建线程池with ThreadPoolExecutor() as pool:for i in range(12):fut = pool.submit(func, i)print(fut)# 创建进程池with ProcessPoolExecutor() as pool:for i in range(12):fut = pool.submit(func, i)print(fut)
以后写代码可能会存在交叉使用
例如:crm项目80%都是基于协程异步编程+MySQL(不支持)【线程、进程做异步编程】
当有的第三方库不支持协程,那么就需要两种结合
import time
import asyncio
import concurrent.futuresdef func1():# 某个耗时操作time.sleep(2)return 'ok'async def main():loop = asyncio.get_running_loop()# 运行默认的loop执行器(默认为ThreadPoolExecutor)# 1. 内部会先调用 ThreadPoolExecutor 的submit方法去线程池中申请一个线程去执行func1函数,并返回一个concurrent.futures.Future对象# 2. 调用asyncio.wrap_future将concurrent.futures.Future对象包装为asyncio.Future对象# 因为concurrent.futures.Future对象不支持await方法,所以需要包装为 asyncio.Future 对象才能使用fut = loop.run_in_executor(None, func1)result = await futprint('default thread pool', result)# 运行一个线程池with concurrent.futures.ThreadPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom thread pool: ', result)# 运行一个进程池with concurrent.futures.ProcessPoolExecutor() as pool:result = await loop.run_in_executor(pool, func1)print('custom process pool: ', result)if __name__ == '__main__':asyncio.run(main())
由于requests是不支持协程的,所以为了使用协程提升效率,则可以使用loop.run_in_executor()方法
下面的场景其实在多线程和协程分别执行的时候,效率其实提升不了多少,因为文件很小本身就很快,没有遇到多少阻塞;那在什么情况下会有很大的区别呢,就是文件很大需要时间或者你会在进行sleep等待浏览器程序加载前端页面的时候
下面使用loop.run_in_executor()执行
import asyncio
import requestsasync def download_image(url):# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)print('开始下载: ', url)loop = asyncio.get_event_loop()future = loop.run_in_executor(None, requests.get, url)response = await futureprint('下载完成')# 图片保存到本地file_name = f"/Users/yuehua/Downloads/images/{url.split('_')[-1]}"with open(file_name, 'wb') as f_obj:f_obj.write(response.content)if __name__ == '__main__':url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]tasks = [download_image(url) for url in url_list]loop = asyncio.get_event_loop()loop.run_until_complete(asyncio.wait(tasks))
import asyncio
import requests
import timeasync def download_image(url):start_time = time.time()# 发送网络请求,下载图片(遇到网络下载图片的IO请求,自动化切换到其他任务)loop = asyncio.get_event_loop()response = await loop.run_in_executor(None, requests.get, url)# 模拟等待浏览器前端页面加载await asyncio.sleep(5)# 图片保存到本地file_name = f"/Users/yuehua/Downloads/images/{url.split('/')[-1]}"with open(file_name, 'wb') as f_obj:f_obj.write(response.content)print(time.time() - start_time)# 用该方式获取result,不过python3.11中将删除该方式
# async def main():
# url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]
# tasks = [download_image(url) for url in url_list]
# done, pending = await asyncio.wait(tasks)
# print(done)if __name__ == '__main__':url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]tasks = [download_image(url) for url in url_list]asyncio.run(asyncio.wait(tasks))# asyncio.run(main())
from concurrent.futures import ThreadPoolExecutor
import requests
import timedef url_save(url):start_time = time.time()response = requests.get(url)file_name = f"/Users/yuehua/Downloads/images/{url.split('/')[-1]}"# 模拟等待浏览器前端页面加载time.sleep(5)with open(file_name, 'wb') as f_obj:f_obj.write(response.content)return (time.time() - start_time)if __name__ == '__main__':start_time = time.time()with ThreadPoolExecutor() as pool:url_list = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{i}.png' for i in range(60)]results = pool.map(url_save, tuple(url_list))for i in results:print('运行时间: ', i)print('总运行时间: ', time.time() - start_time)
如果你手动执行过上面线程和协程的代码,你会发现,当任务执行过程中需要等待IO的时候,协程与线程的效率有多不同
只要任务队列不多,基本上忽略线程和协程上下文切换的时间(协程切换<线程切换),例如我们爬虫的总任务数=sum(需爬虫的账户*每个账户的资源标签)
顺便提一下python中的并行与并发,有时间单独拿出来介绍
多进程:并行运行。当CPU核心数>1,任务是会并行运行的,且可并行运行的任务数量等于核心数
多线程:并发运行。因为全局解释器锁GIL,只有当前正在运行的线程才持有锁,其他线程都被挂起
协程:并发运行。协程是在线程内部运行的,且上下文的切换是由用户控制,且同一时刻只能有一个任务在运行
什么是异步迭代器
实现了 aiter()和 anext()方法的对象。anext 必须返回一个 awaitable对象。async_for会处理异步迭代器的
anext()方法所返回的可迭代对象,直到其引发一个 StopAsyncIteration 异常。
什么是异步可迭代对象
可在 async_for 语句中被使用的对象。必须通过它的 aiter() 方法返回一个 asynchronousiterator
import asyncioclass Reader:""" 自定义异步迭代器(同时也是异步可迭代对象)"""def __init__(self):self.count = 0async def readline(self):# await asyncio.sleep(1)self.count += 1if self.cont == 100:return Nonereturn self.countdef __aiter__(self):return selfasync def __anext__(self):val = await self.readline()if val == None:raise StopAsyncIterationreturn val# 异步迭代器必须由协程函数中执行
async def func():obj = Reader()async for i in obj:print(i)# asyncio.run执行协程函数
asyncio.run(func())
此种对象通过定义 aenter() 和 aexit()方法来对 async_with 语句中的环境进行控制
import asyncio
import timeclass AsyncContextManager:def __init__(self):self.conn = Noneasync def do_somethins(self):# 异步操作数据库return '嘿嘿操作好了!'async def __aenter__(self):# 异步连接数据库self.conn = await asyncio.sleep(1)return selfasync def __aexit__(self, exc_type, exc, tb):# 异步关闭数据库连接await asyncio.sleep(1)async def func():async with AsyncContextManager() as f:result = await f.do_somethins()print(result)if __name__ == '__main__':asyncio.run(func())
uvloop是asyncio事件循环的替代方案。它事件循环效率>默认asyncio的事件循环,性能更高
安装:pip3 install uvloop
fastapi是很火的一个支持协程编程的python web框架,它使用率asgi unicorn,而unicorn内部使用的就是uvloop
import asyncio
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())# 编写 asyncio 的代码,与之前的代码一致# 内部的事件循环自动转化为uvlooop
asyncio.run()
在使用python代码操作redis时,连接/操作/断开都是网络IO
安装:pip3 install aioredis
import asyncio
import aioredisasync def execute(url):print('建立连接: ', url)# 网络IO操作:创建redis连接redis = await aioredis.from_url(url=url,max_connections=10,decode_responses=True,db=4)await redis.set('my-key', 'my-value')await redis.mset({'key:1': 'value1', 'key:2': 'value2'})val1 = await redis.get('my-key')val2 = await redis.get('key:1')await redis.close()print(val1, val2)url = 'redis://127.0.0.1:6379'
asyncio.run(execute(url))
import asyncio
import aioredisasync def execute(url):print('建立连接: ', url)# 网络IO操作:创建redis连接redis = await aioredis.from_url(url=url,max_connections=10,decode_responses=True,db=4)await redis.set('my-key', 'my-value')await redis.mset({'key:1': 'value1', 'key:2': 'value2'})val1 = await redis.get('my-key')val2 = await redis.get('key:1')await redis.close() print(val1, val2)task_list = [execute('redis://127.0.0.1:6379'),execute('redis://127.0.0.1:6379'),
]
asyncio.run(asyncio.wait(task_list))
安装:pip3 install aiomysql
import asyncio
import aiomysqlasync def test_example():conn = await aiomysql.connect(host='127.0.0.1', port=3306,user='root', password='heihei',db='devops_dev')cursor = await conn.cursor()await cursor.execute("SELECT instance_id, name FROM ops_host limit 10")r = await cursor.fetchall()await cursor.close()conn.close()print(r)asyncio.run(test_example())
import asyncio
import aiomysqlasync def test_example():conn = await aiomysql.connect(host='127.0.0.1', port=3306,user='root', password='heihei',db='devops_dev')cursor = await conn.cursor()await cursor.execute("SELECT instance_id, name FROM ops_host limit 10")r = await cursor.fetchall()await cursor.close()conn.close()print(r)task_list = [test_example(),test_example()
]asyncio.run(asyncio.wait(task_list))
pip3 install fastapi
pip3 install uvicorn (asgi 支持异步的wsgi,内部基于uvloop)
uvicorn bubu:app --reload
from fastapi import FastAPI
import asyncio
import aioredis
import uvicorn
import timeredis_pool = aioredis.ConnectionPool.from_url(url='redis://127.0.0.1:6379',max_connections=10,decode_responses=True,db=4
)
redis = aioredis.Redis(connection_pool=redis_pool)app = FastAPI()@app.get("/")
async def root():await asyncio.sleep(3)return {"message": "Hello World"}@app.get('/red')
async def red():print('嘿嘿我请求来了')await asyncio.sleep(3)await redis.execute_command('set', 'aa', 'bb')val = await redis.execute_command('get', 'aa')return valif __name__ == '__main__':uvicorn.run('bubu:app', host='127.0.0.1', port=8888, log_level='info')
import aiohttp
import asyncioasync def fetch(url):print('发送请求')async with aiohttp.ClientSession() as session:async with session.get(url, verify_ssl=False) as response:result = await response.text()print('result: ', result)async def main():url_list = ['https://python.org','https://www.baidu.com','https://www.taobao.com']tasks = [fetch(url) for url in url_list]await asyncio.wait(tasks)if __name__ == '__main__':asyncio.run(main())
import aiohttp
import asyncioasync def fetch(session, url):print('发送请求')async with session.get(url, verify_ssl=False) as response:result = await response.text()print('result: ', result)return resultasync def main():async with aiohttp.ClientSession() as session:url_list = ['https://python.org','https://www.baidu.com','https://www.taobao.com']tasks = [fetch(session, url) for url in url_list]done, pending = await asyncio.wait(tasks)print('执行结果: ', done)if __name__ == '__main__':asyncio.run(main())