用ThreadPoolExecutor创建线程池

2020年9月24日 / 14次阅读 / Last Modified 2020年9月24日
多线程

线程池的概念不难,即保持一个程序创建的最大线程数量,给这些线程分配任务,不管有多少任务,线程最大数量不变。我们自己通过threading模块设计线程池是可以的,不过python标准库中有一个ThreadPoolExecutor模块,已经给我们设计好了一个线程池。

直接举例说明:

import concurrent.futures as cuf
import subprocess
import time
import re


def shell(cmd):
    """Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
    proc = subprocess.run(cmd, shell=True, capture_output=True)
    #if proc.returncode != 0:
    #    raise ChildProcessError(proc.stderr.decode())
    return proc.stdout.decode()


def ping_one_ip(ip):
        stdout = shell('ping -c 1 -W 1 %s' % ip)
        if re.search('\s0% packet loss', stdout):
                return ip
        else:
                return None


exe = cuf.ThreadPoolExecutor(max_workers=300)
all_task = [exe.submit(ping_one_ip, ('192.168.2.%d'%i)) for i in range(1,255)]

num = 0
for it in cuf.as_completed(all_task):
        if rt:=it.result():
                print(rt)
                num += 1

print('Done, %d'%num)

我们需要的ThreadPoolExecutor模块在concurrent.futures中,as cuf。

这段代码首先创建了一个max_workers=300的线程池(线程池执行对象exe),即最大线程数为300。如果不显示提供这个参数,ThreadPoolExecutor会自动根据当前cpu的核数来计算一个数字数来。

然后通过submit函数,向这个线程池提交任务,任务为ping_one_ip,即ping一个ip地址,一共254个任务。submit函数是非阻塞的。ping_one_ip函数成功返回ip地址,失败返回None。

submit会一下子完成,其返回的future对象,在all_task这个list中。对于future对象,future.done() 判断是否执行结束;future.cancel() 如果任务还未进入线程池执行,就cancel掉。

然后代码用 for it in cuf.as_completed(all_task) 来获取执行结束的线程。as_complete是一个生产器,all_task中每一个任务执行结束,它就将其yield出来。通过调用 it.result() 函数,可以获得线程的返回值(线程的执行体就是一个函数)。

以上代码运行效果如下:

$ python tpool.py
192.168.2.114
192.168.2.109
192.168.2.117
192.168.2.1
192.168.2.100
192.168.2.107
192.168.2.103
192.168.2.102
Done, 8

轻松就拿到了整个局域网所有在线的ip地址。

如果max_workers的数量比较小,比如30,那么以上这段代码,就会同时启动30个线程,相当于同时ping 30个ip地址,结束一个后,在自动启动其它ip的ping任务,直到254个ping任务全部执行完毕。这个过程就会慢一些,因为workers的数量变小了。

如果不使用线程池技术,用遍历的方式,一个ip一个ip地ping,会非常的缓慢!线程池非常适合用来设计IO密集型的并行任务。

代码还可以这样:

import concurrent.futures as cuf
import subprocess
import time
import re


def shell(cmd):
    """Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
    proc = subprocess.run(cmd, shell=True, capture_output=True)
    #if proc.returncode != 0:
    #    raise ChildProcessError(proc.stderr.decode())
    return proc.stdout.decode()


def ping_one_ip(ip):
        stdout = shell('ping -c 1 -W 1 %s' % ip)
        if re.search('\s0% packet loss', stdout):
                return ip
        else:
                return None


exe = cuf.ThreadPoolExecutor(max_workers=30000)

num = 0
for it in exe.map(ping_one_ip, ['192.168.2.%d'%i for i in range(1,255)]):
        if it:
                print(it)
                num += 1

print('Done, %d'%num)

这段代码,没有使用submit向线程池提交任务,而是直接用一个map成员函数(python内置的map函数)。此时map函数也是一个生成器,不过返回的直接是ping_one_ip的返回值(不是furture对象),因此直接将ip地址打印出来即可。此时还有个特点,map吐出来的值是有顺序的(as_completed没有顺序),执行结果如下:

$ python tpool.py
192.168.2.1
192.168.2.102
192.168.2.103
192.168.2.107
192.168.2.109
192.168.2.114
192.168.2.117
Done, 7

还有一个wait函数可以考虑使用:

import concurrent.futures as cuf
import subprocess
import time
import re


def shell(cmd):
    """Run a shell cmd, return stdout, or raise wth stderr if sth wrong."""
    proc = subprocess.run(cmd, shell=True, capture_output=True)
    #if proc.returncode != 0:
    #    raise ChildProcessError(proc.stderr.decode())
    return proc.stdout.decode()


def ping_one_ip(ip):
        stdout = shell('ping -c 1 -W 1 %s' % ip)
        if re.search('\s0% packet loss', stdout):
                return ip
        else:
                return None


exe = cuf.ThreadPoolExecutor(max_workers=30000)
all_task = [exe.submit(ping_one_ip, ('192.168.2.%d'%i)) for i in range(1,255)]
cuf.wait(all_task)

num = 0
for it in all_task:
        if rt := it.result():
                print(rt)
                num += 1

print('Done, %d'%num)

默认情况下,cuf.wait会等待线程池中所有线程都执行完成,然后才返回。这段代码的显示效果不太理想,很生硬,程序会在最后突然显示一大片。

wait函数可以等待三个条件,上例使用了默认条件 ALL_COMPLETED, 还有 FIRST_COMPLETED 和 FIRST_EXCEPTION。

关于ThreadPoolExecutor的更新信息,请参考python官方文档:https://docs.python.org/3/library/concurrent.futures.html

-- EOF --

本文链接:https://www.pynote.net/archives/2500

留言区

《用ThreadPoolExecutor创建线程池》有1条留言

电子邮件地址不会被公开。 必填项已用*标注

  • 麦新杰

    shell function is right, i should use try to detect failure. [回复]


前一篇:
后一篇:

More


©Copyright 麦新杰 Since 2019 Python笔记

go to top