2020年9月24日 / 783次阅读 / Last Modified 2020年9月24日
多线程
线程池的概念不难,即保持一个程序创建的最大线程数量,给这些线程分配任务,不管有多少任务,线程最大数量不变。我们自己通过threading模块设计线程池是可以的,不过python标准库中有一个ThreadPoolExecutor模块,已经给我们设计好了一个线程池。
直接举例说明:
import concurrent.futures as cuf
import subprocess
import time
import re
def shell(cmd):
"""Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
proc = subprocess.run(cmd, shell=True, capture_output=True)
#if proc.returncode != 0:
# raise ChildProcessError(proc.stderr.decode())
return proc.stdout.decode()
def ping_one_ip(ip):
stdout = shell('ping -c 1 -W 1 %s' % ip)
if re.search('\s0% packet loss', stdout):
return ip
else:
return None
exe = cuf.ThreadPoolExecutor(max_workers=300)
all_task = [exe.submit(ping_one_ip, ('192.168.2.%d'%i)) for i in range(1,255)]
num = 0
for it in cuf.as_completed(all_task):
if rt:=it.result():
print(rt)
num += 1
print('Done, %d'%num)
我们需要的ThreadPoolExecutor模块在concurrent.futures中,as cuf。
这段代码首先创建了一个max_workers=300的线程池(线程池执行对象exe),即最大线程数为300。如果不显示提供这个参数,ThreadPoolExecutor会自动根据当前cpu的核数来计算一个数字数来。
然后通过submit函数,向这个线程池提交任务,任务为ping_one_ip,即ping一个ip地址,一共254个任务。submit函数是非阻塞的。ping_one_ip函数成功返回ip地址,失败返回None。
submit会一下子完成,其返回的future对象,在all_task这个list中。对于future对象,future.done() 判断是否执行结束;future.cancel() 如果任务还未进入线程池执行,就cancel掉。
然后代码用 for it in cuf.as_completed(all_task) 来获取执行结束的线程。as_complete是一个生产器,all_task中每一个任务执行结束,它就将其yield出来。通过调用 it.result() 函数,可以获得线程的返回值(线程的执行体就是一个函数)。
以上代码运行效果如下:
$ python tpool.py 192.168.2.114 192.168.2.109 192.168.2.117 192.168.2.1 192.168.2.100 192.168.2.107 192.168.2.103 192.168.2.102 Done, 8
轻松就拿到了整个局域网所有在线的ip地址。
如果max_workers的数量比较小,比如30,那么以上这段代码,就会同时启动30个线程,相当于同时ping 30个ip地址,结束一个后,在自动启动其它ip的ping任务,直到254个ping任务全部执行完毕。这个过程就会慢一些,因为workers的数量变小了。
如果不使用线程池技术,用遍历的方式,一个ip一个ip地ping,会非常的缓慢!线程池非常适合用来设计IO密集型的并行任务。
代码还可以这样:
import concurrent.futures as cuf
import subprocess
import time
import re
def shell(cmd):
"""Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
proc = subprocess.run(cmd, shell=True, capture_output=True)
#if proc.returncode != 0:
# raise ChildProcessError(proc.stderr.decode())
return proc.stdout.decode()
def ping_one_ip(ip):
stdout = shell('ping -c 1 -W 1 %s' % ip)
if re.search('\s0% packet loss', stdout):
return ip
else:
return None
exe = cuf.ThreadPoolExecutor(max_workers=30000)
num = 0
for it in exe.map(ping_one_ip, ['192.168.2.%d'%i for i in range(1,255)]):
if it:
print(it)
num += 1
print('Done, %d'%num)
这段代码,没有使用submit向线程池提交任务,而是直接用一个map成员函数(python内置的map函数)。此时map函数也是一个生成器,不过返回的直接是ping_one_ip的返回值(不是furture对象),因此直接将ip地址打印出来即可。此时还有个特点,map吐出来的值是有顺序的(as_completed没有顺序),执行结果如下:
$ python tpool.py 192.168.2.1 192.168.2.102 192.168.2.103 192.168.2.107 192.168.2.109 192.168.2.114 192.168.2.117 Done, 7
还有一个wait函数可以考虑使用:
import concurrent.futures as cuf
import subprocess
import time
import re
def shell(cmd):
"""Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
proc = subprocess.run(cmd, shell=True, capture_output=True)
#if proc.returncode != 0:
# raise ChildProcessError(proc.stderr.decode())
return proc.stdout.decode()
def ping_one_ip(ip):
stdout = shell('ping -c 1 -W 1 %s' % ip)
if re.search('\s0% packet loss', stdout):
return ip
else:
return None
exe = cuf.ThreadPoolExecutor(max_workers=30000)
all_task = [exe.submit(ping_one_ip, ('192.168.2.%d'%i)) for i in range(1,255)]
cuf.wait(all_task)
num = 0
for it in all_task:
if rt := it.result():
print(rt)
num += 1
print('Done, %d'%num)
默认情况下,cuf.wait会等待线程池中所有线程都执行完成,然后才返回。这段代码的显示效果不太理想,很生硬,程序会在最后突然显示一大片。
wait函数可以等待三个条件,上例使用了默认条件 ALL_COMPLETED, 还有 FIRST_COMPLETED 和 FIRST_EXCEPTION。
关于ThreadPoolExecutor的更新信息,请参考python官方文档:https://docs.python.org/3/library/concurrent.futures.html
-- EOF --
本文链接:https://www.pynote.net/archives/2500
《用ThreadPoolExecutor创建线程池》有1条留言
前一篇:python动态导入模块
后一篇:给tkinter程序添加左上角图标
©Copyright 麦新杰 Since 2019 Python笔记
shell function is right, i should use try to detect failure. [ ]