说起python线程,说少也少,比如线程怎么启动,获取结果,阻塞等;还有线程池的两种运行方式以及使用的一些案例。说多的话,又会涉及到Lock,Rlock,Queue,Condition,Event等很多东西。
这边博客先留着慢慢写,主要介绍线程和线程池的使用,后面吧所有线程内部的东西都深入说一下,为什么要用到这些。
Python多线程是Python的一个重要特性,它允许程序同时执行多个线程。Python中的线程是轻量级的,它们共享内存空间,因此创建和销毁线程的开销很小。
多线程切换的效率是以时间为指标的,因为线程切换需要保存当前线程的状态并加载下一个线程的状态,这个过程需要花费一定的时间。在多线程切换的过程中,如果线程的数量过多,那么线程切换的时间就会占用大量的CPU时间,从而导致程序的执行效率降低。因此,在编写多线程程序时,需要合理的控制线程的数量,避免线程切换的时间过长。
Python GIL (Global Interpreter Lock)是Python解释器的一个特性,它是一种互斥锁,用于保护Python解释器的内部数据结构。在任何时刻,只有一个线程可以执行Python字节码。这意味着,即使在多核CPU上运行Python程序,也只能使用一个核。
这个特性对于CPU密集型任务来说是一个瓶颈,因为它不能充分利用多核CPU的优势。但是对于I/O密集型任务来说,Python GIL并不是一个问题,因为在I/O操作期间,Python解释器会释放GIL,以便其他线程可以执行Python字节码。如果你想充分利用多核CPU,可以使用多进程或者使用其他语言编写CPU密集型任务的代码。
import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(len(resp))thread = threading.Thread(target=task, args=('https://www.baidu.com', ))
thread.start() # 启动这个线程
thread.join() # 阻塞线程print('end')
这里有个点记一下,join是会阻塞任务的,只有当线程全部都跑完,才会向下执行;如果你需要在执行线程的时候响应外部请求,那么只start即可。
import threading
import requestsdef task(url):resp = requests.get(url=url).textprint(f'url length: ', len(resp))urls = [f'https://www.raycloud.com/r/cms/www/default/images/clientele_logo/logo_{num}.png' for num in range(1, 6)]
threads = []
for url in urls:thread = threading.Thread(target=task, args=(url, ))thread.start()threads.append(thread)for thread in threads:thread.join()
线程池主要是解决多任务并发的实效性问题,简单的说就是让任务跑的很快,系统资源利用率更高。线程池对于运行多个线程的优点主要就是线程的启动和关闭有资源的开销,而线程池则可以复用线程。你就当作TCP中客户端TIME-WAIT状态的连接重新用于新的TCP连接,一个道理。本质都是为了减少资源开销。
那为啥要用线程池,而不用进程池,区别就是
python中线程池的库是concurrent.futures,线程池有两种使用方式,submit和map
下面是我业务场景中要通过线程池启动两个线程,并且要将返回的值拿过来对用户进行响应
一个是获取阿里云SLB的QPS
一个是获取阿里云SLB的RT
这个案例很好说明为什么我要用线程池中的 pool.submit 方法而不是 pool.map,结论是都能实现,submit更方便
# submit获取结果
qps = resp_qps.result()
rt = resp_rt.result()# map获取结果
results = pool.map(func, data)
for result in results:print(result)
from concurrent.futures import ThreadPoolExecutor
pool = ThreadPoolExecutor()def get_slb_metric(ali_obj, metric_name, delay, dimensions):""" 获取阿里云指标数据 """period = 60namespace = 'acs_slb_dashboard'timestamp = int(time.time())start_time = timestamp_to_str(timestamp=timestamp - delay - period)end_time = timestamp_to_str(timestamp=timestamp - delay)response = ali_obj.describe_metric_data_request(namespace=namespace,metric_name=metric_name,start_time=start_time,end_time=end_time,period=period,dimensions=dimensions,)return json.loads(response['Datapoints'])def get_slb_metric_data(ali_obj, delay, dimensions):""" 获取阿里云指标数据公共方法 """resp_metric = {'success': False,'msg': None,'retry': 0,'data': {'qps': None,'rt': None,}}while resp_metric['retry'] < 5:resp_metric['retry'] += 1resp_qps = pool.submit(get_slb_metric, ali_obj, 'Qps', delay, dimensions)resp_rt = pool.submit(get_slb_metric, ali_obj, 'Rt', delay, dimensions)qps = resp_qps.result()rt = resp_rt.result()if qps and rt:resp_metric['success'] = Trueresp_metric['data']['qps'] = qps[0]['Average']resp_metric['data']['rt'] = rt[0]['Average']return resp_metricdelay += 10resp_metric['msg'] = f'请求异常: \n\n qps或rt数据为空,请延长周期,最后一次delay为: {delay}s, 递增10s请求5次,未成功获取到数据'return resp_metric
那什么时候用pool.map,以我的习惯是当这个任务是固定的,在批量处理数据的情况下,pool.map特别方便。
那具体用在什么场景下呢?下面是我一个业务场景,ECI配置中心,里面记录了一条条项目的配置信息,且ECI项目的POD数量是动态的,里面记录了获取POD的URL,需要实时请求。eci_list的方法需要读取配置中心,
这个时候我的任务数是不固定的,有多少个业务需要ECI,那就有多少个URL需要去请求,我总不能一个一个手动去创建线程,并且,所有业务URL的接口给我返回的数据格式是固定的,我也只会取固定的值,那么pool.map就非常方便了。批量请求接口,批量处理结果。
def get_pod_info(data):# 获取项目信息resp_data = {'success': False,'msg': None,'data': data}try:url = data.get('url')headers = {'contene-type': 'application/json'}project = data.get('project')profile = data.get('profile')sign = data.get('sign')req_data = {'sign': sign,'project': project,'profile': profile}resp = requests.post(url=url, headers=headers, data=json.dumps(req_data))if resp.status_code == 200:resp_data['data']['pod'] = 100resp_data['data']['open'] = Trueresp_data['success'] = Trueelse:resp_data['msg'] = f'ECI业务接口异常: {resp.text}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'except Exception as err:resp_data['msg'] = f'ECI运维接口异常: {err}\n\n地址: {url}\n\n项目: {project}\n\n标签: {profile}'finally:return resp_datadef eci_list():if request.method == 'GET':resp_data = {'success': False,'msg': None,'data': None}try:# 获取项目列表queryset = OpsEci.query.all()data = list()for obj in queryset:_obj_host = CloudHost.query.filter_by(resource_type='kubernetes', instance_id=obj.kubernetes).first()_kube_host = re.search('\d+\.\d+\.\d+\.\d+', _obj_host.private_ip[0]).group()_data = {'project': obj.project,'profile': obj.profile,'kubeconfig': f"/home/tomcat/.kube/kubeconfig/{_kube_host}",'token': obj.token,'address': obj.address,'url': obj.url,'sign': obj.sign}data.append(_data)# 处理项目列表with ThreadPoolExecutor() as pool:results = pool.map(get_pod_info, data)results_list = list()for result in results:project = result['data']['project']profile = result['data']['profile']app = project if profile == 'jst' else f'{project}-{profile}'if result.get('success'):results_list.append({'status': True,'open': result['data']['open'],'pod': result['data']['pod'],'ding_token': result['data']['token'],'app': app,'kubeconfig_path': result['data']['kubeconfig']})else:results_list.append({'status': False,'app': app,})resp_data['success'] = Trueresp_data['data'] = results_listexcept Exception as err:resp_data['msg'] = f'获取ECI项目信息异常: {err}'finally:return resp_data
当线程没有锁以后,不同的线程使用共享资源会出现不可预估的后果
我们期望的情况
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱失败: 余额: 200
可能会出现的情况
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱成功, 剩余: -600
无锁代码,这里为了能稳定复现,特别加了sleep
from concurrent.futures import ThreadPoolExecutor
import threading
import timepool = ThreadPoolExecutor()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()
如果在使用线程共享资源的时候,给资源加上锁,那么我们每次运行的结果都是一致的
王二狗第1次取钱
王二狗第2次取钱
取钱成功, 剩余: 200
取钱失败: 余额: 200
有锁代码,在操作线程共享资源的时候,给资源上锁
import threading
import timepool = ThreadPoolExecutor()
lock = threading.Lock()def bank(amount):global balanceprint(f'王二狗第{threading.current_thread().name}次取钱')with lock:if balance > amount:time.sleep(0.1)balance = balance - amountprint('取钱成功, 剩余: ', balance)else:print('取钱失败: 余额: ', balance)if __name__ == '__main__':balance = 1000t1 = threading.Thread(target=bank, args=(800,), name='1')t2 = threading.Thread(target=bank, args=(800,), name='2')t1.start()t2.start()
Event是python中的一个同步原语,用于线程之间的通信。event有两种状态,分别是set和clear。当event处于set状态时,调用wait方法的线程会立即返回,否则会一直阻塞,直到event被set。
下面例子中,创建了一个event事件,两个人worker。worker_b调用了event.wait()方法,这会使worker线程阻塞,直到event被set。当worker_a开始运行后,将event置为set后,worker_b结束阻塞,开始运行。
import threading
import timeevent = threading.Event()def worker_a():print(f'{time.time()}: worker_a 等待运行')print(f'{time.time()}: worker_a 开始运行')event.set()def worker_b():print(f'{time.time()}: worker_b 等待运行')event.wait()print(f'{time.time()}: worker_b 开始运行')if __name__ == '__main__':t_a = threading.Thread(target=worker_a)t_b = threading.Thread(target=worker_b)t_b.start()time.sleep(1)t_a.start()
执行结果
1679301511.736171: worker_b 等待运行
1679301512.741683: worker_a 等待运行
1679301512.741767: worker_a 开始运行
1679301512.741956: worker_b 开始运行
Python中的Queue模块提供了同步的、线程安全的队列类,包括FIFO(先进先出)队列Queue,LIFO(后进先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。
下面是一个使用Queue模块实现多线程的示例,其中包括了生产者和消费者两个线程,生产者向队列中添加元素,消费者从队列中取出元素,而他们都是用队列queue.Queue();除此之外,还有queue.LifoQueue,queue.PriorityQueue。
import threading
import queue
import timeclass Producer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):for i in range(100):self.queue.put(i)time.sleep(0.3)print('producer put end')class Consumer(threading.Thread):def __init__(self, queue):threading.Thread.__init__(self)self.queue = queuedef run(self):while True:if self.queue.empty():time.sleep(1)print('queue is empty, waiting ...')else:print(f'consumer get {self.queue.get()}')time.sleep(0.1)if __name__ == '__main__':q = queue.Queue()producer = Producer(q)consumer = Consumer(q)producer.start()consumer.start()
在 Python 中,可以使用 threading.Condition 实现条件锁。Condition 对象提供了 acquire() 和 release() 方法,与 Lock 对象的方法类似。此外,Condition 对象还提供了 wait()、notify() 和 notify_all() 方法,用于线程间的协调。具体使用方法可以参考 Python 官方文档中的 threading.Condition 部分。
在 threading.Condition 中,wait() 方法会释放锁并挂起当前线程,直到另一个线程调用 notify() 或 notify_all() 方法唤醒它。notify() 方法会随机唤醒一个挂起的线程,而 notify_all() 方法会唤醒所有挂起的线程。需要注意的是,wait() 方法只能在已经获得锁的情况下调用,否则会抛出 RuntimeError 异常。
在使用 threading.Condition 时,通常需要先获得一个 Lock 对象,然后使用这个 Lock 对象创建一个 Condition 对象。在需要等待某个条件时,调用 Condition 对象的 wait() 方法;在满足条件时,调用 notify() 或 notify_all() 方法唤醒等待的线程
import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '-----任务开始-----')print(self.name, '事件A处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件B处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件C处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '事件D处理完毕, 等待worker响应...')self.cond.notify()self.cond.wait()print(self.name, '-----任务结束-----')class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:self.cond.wait()print(self.name, '事件A已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件B已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件C已响应, 请继续')self.cond.notify()self.cond.wait()print(self.name, '事件D已响应, 请继续')self.cond.notify()if __name__ == '__main__':master = Master('master', condition)worker = Worker('worker', condition)worker.start()time.sleep(1)master.start()
执行结果
master -----任务开始-----
master 事件A处理完毕, 等待worker响应...
worker 事件A已响应, 请继续
master 事件B处理完毕, 等待worker响应...
worker 事件B已响应, 请继续
master 事件C处理完毕, 等待worker响应...
worker 事件C已响应, 请继续
master 事件D处理完毕, 等待worker响应...
worker 事件D已响应, 请继续
master -----任务结束-----
import threading
import timecondition = threading.Condition()class Master(threading.Thread):"""主任务类,执行过后等待子任务响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '前置准备工作结束, 通知子任务开始任务...')time.sleep(1)self.cond.notify_all()class Worker(threading.Thread):"""子任务类,等待主任务通知并响应"""def __init__(self, name, condition):super().__init__(name=name)self.name = nameself.cond = conditiondef run(self):with self.cond:print(self.name, '准备就绪, 等待调度...')self.cond.wait()print(self.name, '接收到主任务通知, 开始执行任务')print(self.name, '任务A执行完成')print(self.name, '任务B执行完成')print(self.name, '任务C执行完成')if __name__ == '__main__':master = Master('master', condition)worker_a = Worker('worker-a', condition)worker_b = Worker('worker-b', condition)worker_c = Worker('worker-c', condition)worker_a.start()worker_b.start()worker_c.start()time.sleep(0.3)master.start()
worker-a 准备就绪, 等待调度...
worker-b 准备就绪, 等待调度...
worker-c 准备就绪, 等待调度...
master 前置准备工作结束, 通知子任务开始任务...
worker-a 接收到主任务通知, 开始执行任务
worker-a 任务A执行完成
worker-a 任务B执行完成
worker-a 任务C执行完成
worker-b 接收到主任务通知, 开始执行任务
worker-b 任务A执行完成
worker-b 任务B执行完成
worker-b 任务C执行完成
worker-c 接收到主任务通知, 开始执行任务
worker-c 任务A执行完成
worker-c 任务B执行完成
worker-c 任务C执行完成
Semaphore用于控制对共享资源的访问。semaphore维护一个内部计数器。该计数器可以通过 acquire() 和 release() 两个方法来增加和减少。当计数器为0时,acquire() 方法将会被阻塞,直到其他线程调用 release() 方法位置。
semaphore.acquire() 将会使计数器-1,当计数器为0则会阻塞当前线程
semaphore.release() 将会时计数器+1,以便有更多的资源去使用计数器
下面模拟10个任务运行的情况,同时运行三个线程,通过Semaphore进行控制线程数。可以发现,通过semaphore即可控制线程的worker
import threading
import timesemaphore = threading.Semaphore(3)def task():"""任务机"""with semaphore:print(time.strftime('%H:%M:%S'), threading.current_thread().name, '开始执行...')time.sleep(2)if __name__ == '__main__':for i in range(10):t = threading.Thread(target=task)t.start()
执行结果
18:12:05 Thread-1 开始执行...
18:12:05 Thread-2 开始执行...
18:12:05 Thread-3 开始执行...
18:12:07 Thread-4 开始执行...
18:12:07 Thread-5 开始执行...
18:12:07 Thread-6 开始执行...
18:12:09 Thread-7 开始执行...
18:12:09 Thread-9 开始执行...
18:12:09 Thread-8 开始执行...
18:12:11 Thread-10 开始执行...