python多线程
创始人
2025-05-31 13:32:57
0

文章目录

  • 一、简介
    • 1.1 多线程的特性
    • 1.2 GIL
  • 二、线程
    • 1.2 单线程
    • 1.3 多线程
  • 三、线程池
    • 3.1 pool.submit
    • 3.2 pool.map
  • 四、Lock(线程锁)
    • 4.1 无锁导致的线程资源异常
    • 4.2 有锁
  • 五、Event(事件)
    • 5.1 简介
    • 5.2 示例
  • 六、Queue(队列)
    • 6.1 简介
    • 6.2 生产者 & 消费者
  • 七、Condition(条件锁)
    • 7.1 简介
    • 7.2 notify 单任务通信
    • 7.3 notify_all 多任务通信
  • 八、Semaphore(信号量)
    • 8.1 简介
    • 8.2 示例

一、简介

说起python线程,说少也少,比如线程怎么启动,获取结果,阻塞等;还有线程池的两种运行方式以及使用的一些案例。说多的话,又会涉及到Lock,Rlock,Queue,Condition,Event等很多东西。

这边博客先留着慢慢写,主要介绍线程和线程池的使用,后面吧所有线程内部的东西都深入说一下,为什么要用到这些。

1.1 多线程的特性

Python多线程是Python的一个重要特性,它允许程序同时执行多个线程。Python中的线程是轻量级的,它们共享内存空间,因此创建和销毁线程的开销很小。

多线程切换的效率是以时间为指标的,因为线程切换需要保存当前线程的状态并加载下一个线程的状态,这个过程需要花费一定的时间。在多线程切换的过程中,如果线程的数量过多,那么线程切换的时间就会占用大量的CPU时间,从而导致程序的执行效率降低。因此,在编写多线程程序时,需要合理的控制线程的数量,避免线程切换的时间过长。

1.2 GIL

Python GIL (Global Interpreter Lock)是Python解释器的一个特性,它是一种互斥锁,用于保护Python解释器的内部数据结构。在任何时刻,只有一个线程可以执行Python字节码。这意味着,即使在多核CPU上运行Python程序,也只能使用一个核。

这个特性对于CPU密集型任务来说是一个瓶颈,因为它不能充分利用多核CPU的优势。但是对于I/O密集型任务来说,Python GIL并不是一个问题,因为在I/O操作期间,Python解释器会释放GIL,以便其他线程可以执行Python字节码。如果你想充分利用多核CPU,可以使用多进程或者使用其他语言编写CPU密集型任务的代码。

二、线程

1.2 单线程

import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(len(resp))thread = threading.Thread(target=task, args=('https://www.baidu.com', ))
thread.start()    # 启动这个线程
thread.join()     # 阻塞线程print('end')

1.3 多线程

这里有个点记一下,join是会阻塞任务的,只有当线程全部都跑完,才会向下执行;如果你需要在执行线程的时候响应外部请求,那么只start即可。

import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(f'url length: ', len(resp))urls = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{num}.png' for num in range(1, 6)]
threads = []
for url in urls:thread = threading.Thread(target=task, args=(url, ))thread.start()threads.append(thread)for thread in threads:thread.join()

三、线程池

线程池主要是解决多任务并发的实效性问题,简单的说就是让任务跑的很快,系统资源利用率更高。线程池对于运行多个线程的优点主要就是线程的启动和关闭有资源的开销,而线程池则可以复用线程。你就当作TCP中客户端TIME-WAIT状态的连接重新用于新的TCP连接,一个道理。本质都是为了减少资源开销。

那为啥要用线程池,而不用进程池,区别就是

  1. 线程池中的线程启动开销更低,切换也更快
  2. 线程池主要是为了解决IO的问题(文件IO,网络IO等)
  3. 进程池主要是为了解决CPU密集的问题(数据运算,数据处理等)

python中线程池的库是concurrent.futures,线程池有两种使用方式,submit和map

3.1 pool.submit

下面是我业务场景中要通过线程池启动两个线程,并且要将返回的值拿过来对用户进行响应

一个是获取阿里云SLB的QPS
一个是获取阿里云SLB的RT

这个案例很好说明为什么我要用线程池中的 pool.submit 方法而不是 pool.map,结论是都能实现,submit更方便

  1. 我的任务很少,不去太考虑线程的重用和开销问题
  2. 明确知道需要两个指标,一个是QPS,一个是RT
  3. pool.submit的结果可以通过result()直接拿到
  4. pool.map 返回的是一个generator对象,用起来会更麻烦
# submit获取结果
qps = resp_qps.result()
rt = resp_rt.result()# map获取结果
results = pool.map(func, data)
for result in results:print(result)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor()def get_slb_metric(ali_obj, metric_name, delay, dimensions):""" 获取阿里云指标数据 """period = 60namespace = 'acs_slb_dashboard'timestamp = int(time.time())start_time = timestamp_to_str(timestamp=timestamp - delay - period)end_time = timestamp_to_str(timestamp=timestamp - delay)response = ali_obj.describe_metric_data_request(namespace=namespace,metric_name=metric_name,start_time=start_time,end_time=end_time,period=period,dimensions=dimensions,)return json.loads(response['Datapoints'])def get_slb_metric_data(ali_obj, delay, dimensions):""" 获取阿里云指标数据公共方法 """resp_metric = {'success': False,'msg': None,'retry': 0,'data': {'qps': None,'rt': None,}}while resp_metric['retry'] < 5:resp_metric['retry'] += 1resp_qps = pool.submit(get_slb_metric, ali_obj, 'Qps', delay, dimensions)resp_rt = pool.submit(get_slb_metric, ali_obj, 'Rt', delay, dimensions)qps = resp_qps.result()rt = resp_rt.result()if qps and rt:resp_metric['success'] = Trueresp_metric['data']['qps'] = qps[0]['Average']resp_metric['data']['rt'] = rt[0]['Average']return resp_metricdelay += 10resp_metric['msg'] = f'请求异常: \n\n qps或rt数据为空,请延长周期,最后一次delay为: {delay}s, 递增10s请求5次,未成功获取到数据'return resp_metric

3.2 pool.map

那什么时候用pool.map,以我的习惯是当这个任务是固定的,在批量处理数据的情况下,pool.map特别方便。

  1. 需要处理的任务非常多,需要考虑线程开销且复用
  2. 对于任务的执行结果不需要按类型来区分,批量获取

那具体用在什么场景下呢?下面是我一个业务场景,ECI配置中心,里面记录了一条条项目的配置信息,且ECI项目的POD数量是动态的,里面记录了获取POD的URL,需要实时请求。eci_list的方法需要读取配置中心,

这个时候我的任务数是不固定的,有多少个业务需要ECI,那就有多少个URL需要去请求,我总不能一个一个手动去创建线程,并且,所有业务URL的接口给我返回的数据格式是固定的,我也只会取固定的值,那么pool.map就非常方便了。批量请求接口,批量处理结果。

def get_pod_info(data):# 获取项目信息resp_data = {'success': False,'msg': None,'data': data}try:url = data.get('url')headers = {'contene-type': 'application/json'}project = data.get('project')profile = data.get('profile')sign = data.get('sign')req_data = {'sign': sign,'project': project,'profile': profile}resp = requests.post(url=url, headers=headers, data=json.dumps(req_data))if resp.status_code == 200:resp_data['data']['pod'] = 100resp_data['data']['open'] = Trueresp_data['success'] = Trueelse:resp_data['msg'] = f'ECI业务接口异常: {resp.text}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'except Exception as err:resp_data['msg'] = f'ECI运维接口异常: {err}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'finally:return resp_datadef eci_list():if request.method == 'GET':resp_data = {'success': False,'msg': None,'data': None}try:# 获取项目列表queryset = OpsEci.query.all()data = list()for obj in queryset:_obj_host = CloudHost.query.filter_by(resource_type='kubernetes', instance_id=obj.kubernetes).first()_kube_host = re.search('\d+\.\d+\.\d+\.\d+', _obj_host.private_ip[0]).group()_data = {'project': obj.project,'profile': obj.profile,'kubeconfig': f"/home/tomcat/.kube/kubeconfig/{_kube_host}",'token': obj.token,'address': obj.address,'url': obj.url,'sign': obj.sign}data.append(_data)# 处理项目列表with ThreadPoolExecutor() as pool:results = pool.map(get_pod_info, data)results_list = list()for result in results:project = result['data']['project']profile = result['data']['profile']app = project if profile == 'jst' else f'{project}-{profile}'if result.get('success'):results_list.append({'status': True,'open': result['data']['open'],'pod': result['data']['pod'],'ding_token': result['data']['token'],'app': app,'kubeconfig_path': result['data']['kubeconfig']})else:results_list.append({'status': False,'app': app,})resp_data['success'] = Trueresp_data['data'] = results_listexcept Exception as err:resp_data['msg'] = f'获取ECI项目信息异常: {err}'finally:return resp_data

四、Lock(线程锁)

4.1 无锁导致的线程资源异常

当线程没有锁以后,不同的线程使用共享资源会出现不可预估的后果

我们期望的情况

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱失败: 余额:  200

可能会出现的情况

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱成功, 剩余:  -600

无锁代码,这里为了能稳定复现,特别加了sleep

from concurrent.futures import ThreadPoolExecutor
import threading
import timepool = ThreadPoolExecutor()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()

4.2 有锁

如果在使用线程共享资源的时候,给资源加上锁,那么我们每次运行的结果都是一致的

王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余:  200
取钱失败: 余额:  200

有锁代码,在操作线程共享资源的时候,给资源上锁

import threading
import timepool = ThreadPoolExecutor()
lock = threading.Lock()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')with lock:if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()

五、Event(事件)

5.1 简介

Event是python中的一个同步原语,用于线程之间的通信。event有两种状态,分别是set和clear。当event处于set状态时,调用wait方法的线程会立即返回,否则会一直阻塞,直到event被set。

5.2 示例

下面例子中,创建了一个event事件,两个人worker。worker_b调用了event.wait()方法,这会使worker线程阻塞,直到event被set。当worker_a开始运行后,将event置为set后,worker_b结束阻塞,开始运行。

import threading
import timeevent = threading.Event()def worker_a():print(f'{time.time()}: worker_a 等待运行')print(f'{time.time()}: worker_a 开始运行')event.set()def worker_b():print(f'{time.time()}: worker_b 等待运行')event.wait()print(f'{time.time()}: worker_b 开始运行')if __name__ == '__main__':t_a = threading.Thread(target=worker_a)t_b = threading.Thread(target=worker_b)t_b.start()time.sleep(1)t_a.start()

执行结果

1679301511.736171: worker_b 等待运行
1679301512.741683: worker_a 等待运行
1679301512.741767: worker_a 开始运行
1679301512.741956: worker_b 开始运行

六、Queue(队列)

6.1 简介

Python中的Queue模块提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。

6.2 生产者 & 消费者

下面是一个使用Queue模块实现多线程的示例,其中包括了生产者和消费者两个线程,生产者向队列中添加元素,消费者从队列中取出元素,而他们都是用队列queue.Queue();除此之外,还有queue.LifoQueue,queue.PriorityQueue。

import threading
import queue
import timeclass Producer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):for i in range(100):self.queue.put(i)time.sleep(0.3)print('producer put end')class Consumer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):while True:if self.queue.empty():time.sleep(1)print('queue is empty, waiting ...')else:print(f'consumer get {self.queue.get()}')time.sleep(0.1)if __name__ == '__main__':q = queue.Queue()producer = Producer(q)consumer = Consumer(q)producer.start()consumer.start()

七、Condition(条件锁)

7.1 简介

在 Python 中,可以使用 threading.Condition 实现条件锁。Condition 对象提供了 acquire() 和 release() 方法,与 Lock 对象的方法类似。此外,Condition 对象还提供了 wait()、notify() 和 notify_all() 方法,用于线程间的协调。具体使用方法可以参考 Python 官方文档中的 threading.Condition 部分。

在 threading.Condition 中,wait() 方法会释放锁并挂起当前线程,直到另一个线程调用 notify() 或 notify_all() 方法唤醒它。notify() 方法会随机唤醒一个挂起的线程,而 notify_all() 方法会唤醒所有挂起的线程。需要注意的是,wait() 方法只能在已经获得锁的情况下调用,否则会抛出 RuntimeError 异常。

在使用 threading.Condition 时,通常需要先获得一个 Lock 对象,然后使用这个 Lock 对象创建一个 Condition 对象。在需要等待某个条件时,调用 Condition 对象的 wait() 方法;在满足条件时,调用 notify() 或 notify_all() 方法唤醒等待的线程

7.2 notify 单任务通信

import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '-----任务开始-----')print(self.name, '事件A处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件B处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件C处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件D处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '-----任务结束-----')class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:self.cond.wait()print(self.name, '事件A已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件B已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件C已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件D已响应, 请继续')self.cond.notify()if __name__ == '__main__':master = Master('master', condition)worker = Worker('worker', condition)worker.start()time.sleep(1)master.start()

执行结果

master -----任务开始-----
master 事件A处理完毕, 等待worker响应...
worker 事件A已响应, 请继续
master 事件B处理完毕, 等待worker响应...
worker 事件B已响应, 请继续
master 事件C处理完毕, 等待worker响应...
worker 事件C已响应, 请继续
master 事件D处理完毕, 等待worker响应...
worker 事件D已响应, 请继续
master -----任务结束-----

7.3 notify_all 多任务通信

import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '前置准备工作结束, 通知子任务开始任务...')time.sleep(1)self.cond.notify_all()class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '准备就绪, 等待调度...')self.cond.wait()print(self.name, '接收到主任务通知, 开始执行任务')print(self.name, '任务A执行完成')print(self.name, '任务B执行完成')print(self.name, '任务C执行完成')if __name__ == '__main__':master = Master('master', condition)worker_a = Worker('worker-a', condition)worker_b = Worker('worker-b', condition)worker_c = Worker('worker-c', condition)worker_a.start()worker_b.start()worker_c.start()time.sleep(0.3)master.start()
worker-a 准备就绪, 等待调度...
worker-b 准备就绪, 等待调度...
worker-c 准备就绪, 等待调度...
master 前置准备工作结束, 通知子任务开始任务...
worker-a 接收到主任务通知, 开始执行任务
worker-a 任务A执行完成
worker-a 任务B执行完成
worker-a 任务C执行完成
worker-b 接收到主任务通知, 开始执行任务
worker-b 任务A执行完成
worker-b 任务B执行完成
worker-b 任务C执行完成
worker-c 接收到主任务通知, 开始执行任务
worker-c 任务A执行完成
worker-c 任务B执行完成
worker-c 任务C执行完成

八、Semaphore(信号量)

8.1 简介

Semaphore用于控制对共享资源的访问。semaphore维护一个内部计数器。该计数器可以通过 acquire() 和 release() 两个方法来增加和减少。当计数器为0时,acquire() 方法将会被阻塞,直到其他线程调用 release() 方法位置。

semaphore.acquire() 将会使计数器-1,当计数器为0则会阻塞当前线程
semaphore.release() 将会时计数器+1,以便有更多的资源去使用计数器

8.2 示例

下面模拟10个任务运行的情况,同时运行三个线程,通过Semaphore进行控制线程数。可以发现,通过semaphore即可控制线程的worker

import threading
import timesemaphore = threading.Semaphore(3)def task():"""任务机"""with semaphore:print(time.strftime('%H:%M:%S'), threading.current_thread().name, '开始执行...')time.sleep(2)if __name__ == '__main__':for i in range(10):t = threading.Thread(target=task)t.start()

执行结果

18:12:05 Thread-1 开始执行...
18:12:05 Thread-2 开始执行...
18:12:05 Thread-3 开始执行...
18:12:07 Thread-4 开始执行...
18:12:07 Thread-5 开始执行...
18:12:07 Thread-6 开始执行...
18:12:09 Thread-7 开始执行...
18:12:09 Thread-9 开始执行...
18:12:09 Thread-8 开始执行...
18:12:11 Thread-10 开始执行...

相关内容

热门资讯

人生励志签名100句,qq人生... 1、知识愈浅,自信愈深。——苏联   2、勇敢乃是自信与害怕中间之道。——(希腊)亚里士多德  3、...
安全生产标语二十条,安全生产标... 1、关爱生命 安全发展 2、安全第一 预防为主 综合治理3、广泛深入开展“安全生产月...
关于青春的励志签名,创业励志签... 1、世上最重要的事,不在于我们在何处,而在于我们朝着什么方向走。   2、世上没有绝望的处境,只有对...
拼搏的励志签名,最新或2023... 1、如果你认为学校里的老师过于严厉,那么等你有了老板再回头想一想。   2、世界青睐有雄心壮志的人。...
春运安全标语,春运安全注意事项... 1、文明交通你我他,平安出行迎新春。 2、春运安全连万家,出行平安你我他。3、驾车莫贪杯中酒,平安幸...
访问网络服务-Go对IPC的支... 访问网络服务-Go对IPC的支持 文章目录访问网络服务-Go对IPC的支持一、socket与IPC1...
自己主持婚礼主持词精选 婚礼开...  自己主持婚礼主持词精选(一)  1、新人入场  新人踏着舞步走到一半,庄重登常  新郎:繁花似锦,...
浪婚礼主持词串词顺口溜 濠氬簡... 今天####年#月#日,天空因为今天的喜庆而变得格外晴朗,而大地也因为年青人的无限活力而充满了生机。...
最新或2023(历届)新年跨年... 最新或2023(历届)新年说说祝福大全一:过年要玩好,必须的;早餐别忘了,注意的;烟酒要减少,提醒的...
搞笑婚礼主持词精选 搞笑婚礼主... 开场音乐中 司仪登场 (秋天)  嘘,,,,多安静啊,一点掌声都没有,感谢大家无声的支持。  凉风有...
最新或2023(历届)鼠年qq...  最新或2023(历届)qq空间新年说说大全  一、 新年到来,百无禁忌。东行吉祥,南走顺利,西出平...
大模型落地比趋势更重要,NLP... 全球很多人都开始相信,以ChatGPT为代表的大模型,将带来一场NLP领...
Android开发-Andro... 01  Android UI 1.1  UI 用户界面(User Interface,...
有关于致父母的一封感谢信 给父... 父母的教育素质影响着学前儿童家庭教育的效果和质量,也直接决定着家庭教育的优劣成败。那么有关于致父母的...
有关于写给老师的一封感谢信 给... 感谢信作为一种礼貌性言语行为的载体,其中谢意表达的处理成为写好感谢信的核心环节。下面小编整理了有关于...
有关于致干部家属的感谢信 致干... 感谢信是写信人对给予自己帮助、支持、关心、祝贺、慰问、馈赠的施惠人表示答谢而写的社交礼仪书信。下面小...
婚礼主持词开场白,浪漫婚礼主持... 1、开场白:(神秘花园)  我们每个人都生长在同一个蔚蓝的星球;我们每个人都是从平坦或不平坦的一端开...
有关于致公交司机的感谢信 给公... 公交司机作为影响城市公共交通供给质量和服务满意度的双重介入主体,其个体行为在很大程度上代表了城市综合...
A.机器学习入门算法(三):基... 机器学习算法(三):K近邻(k-nearest neigh...
有关于写给公安机关的感谢信 写... 公安机关内部管理制度是否完善,对于确保公安队伍建设和管理,保证公安机关顺利地履行职责和任务,具有重大...