Python如何为IO任务使用线程?使用线程池执行器或线程模块等不同的方法来创建和处理线程以加速 Python 中的 I/O 绑定任务,包含一些常见的Python IO任务使用线程示例。
在计算中,线程是要在程序中执行的一系列编程指令,在单个程序中运行的两个线程意味着它们是并发运行的(即,不是并行运行)。与进程不同,Python 中的线程不在单独的 CPU 内核上运行,它们共享内存空间并高效地读写相同的变量。
Python IO任务如何使用线程?线程就像迷你进程,实际上有人称它们为轻量级进程,因为线程可以存在于进程中并完成进程的工作。但实际上,它们有很大的不同,以下是 Python 中线程和进程的主要区别:
进程
- 一个进程可以包含一个或多个线程
- 进程有独立的内存空间
- 两个进程可以运行在不同的 CPU 内核上(这会导致通信问题,但 CPU 性能更好)
- 进程比线程有更多的开销(创建和销毁进程需要更多的时间)
- 运行多个进程只对 CPU 密集型任务有效
线程
- 线程共享相同的内存空间并且可以读写共享变量(当然需要同步)
- 单个 Python 程序中的两个线程不能同时执行
- 运行多线程只对I/O密集型任务有效
现在你可能想知道,如果线程不能同时运行,为什么我们需要使用它们?在我们回答这个问题之前,让我们看看为什么线程不能同时运行。
Python 的 GIL
Python IO任务如何使用线程?Python 开发人员中最具争议的话题,GIL 代表Global Interpreter Lock,它基本上是一种锁,可以防止两个线程在同一个 Python 解释器中同时执行,有些人不喜欢这样,而其他人则声称它不是'这是一个问题,因为有像Numpy这样的库通过在外部 C 代码中运行程序来绕过这个限制。
现在,为什么我们需要使用线程呢?好吧,Python 在等待 I/O 块解析时释放锁,因此如果你的 Python 代码向 API、磁盘中的数据库或从 Internet 下载请求,Python 不会给它机会甚至获取锁,因为这些类型的操作发生在 GIL 之外。简而言之,我们只能从 I/O 范围内的线程中受益。
Python IO任务使用线程示例:单线程
Python如何为IO任务使用线程?为了演示,下面的代码试图从网上下载一些文件(这是一个完美的I / O任务)相继进行而不使用线程(它需要请求要安装,只需运行PIP3安装要求):
import requests
from time import perf_counter
# read 1024 bytes every time
buffer_size = 1024
def download(url):
# download the body of response by chunk, not immediately
response = requests.get(url, stream=True)
# get the file name
filename = url.split("/")[-1]
with open(filename, "wb") as f:
for data in response.iter_content(buffer_size):
# write data read to the file
f.write(data)
if __name__ == "__main__":
urls = [
"https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
"https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
"https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
"https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
"https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
] * 5
t = perf_counter()
for url in urls:
download(url)
print(f"Time took: {perf_counter() - t:.2f}s")
执行后,你会注意到当前目录中出现了新图像,并且你将获得如下输出:
Time took: 13.76s
所以上面的代码非常简单,它遍历这些图像并一张一张地下载它们,大约需要 13.8 秒(取决于你的互联网连接),但无论如何,我们在这里浪费了很多时间,如果你需要性能,请考虑使用线程。
相关: 在 Python 中使用 Celery 的异步任务。
多线程
import requests
from concurrent.futures import ThreadPoolExecutor
from time import perf_counter
# number of threads to spawn
n_threads = 5
# read 1024 bytes every time
buffer_size = 1024
def download(url):
# download the body of response by chunk, not immediately
response = requests.get(url, stream=True)
# get the file name
filename = url.split("/")[-1]
with open(filename, "wb") as f:
for data in response.iter_content(buffer_size):
# write data read to the file
f.write(data)
if __name__ == "__main__":
urls = [
"https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
"https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
"https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
"https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
"https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
] * 5
t = perf_counter()
with ThreadPoolExecutor(max_workers=n_threads) as pool:
pool.map(download, urls)
print(f"Time took: {perf_counter() - t:.2f}s")
Python如何为IO任务使用线程?这里的代码稍有改动,我们现在使用来自concurrent.futures包的ThreadPoolExecutor类,它基本上创建了一个包含我们指定的线程数量的池,然后它使用池处理跨线程拆分urls列表.map()方法。
这是我持续了多长时间:
Time took: 3.85s
使用5 个线程大约快x3.6(至少对我而言),尝试调整在你的机器上生成的线程数,看看你是否可以进一步优化它。
现在这不是创建线程的唯一方法,你也可以使用带有队列的便捷线程模块,这是另一个等效代码:
import requests
from threading import Thread
from queue import Queue
# thread-safe queue initialization
q = Queue()
# number of threads to spawn
n_threads = 5
# read 1024 bytes every time
buffer_size = 1024
def download():
global q
while True:
# get the url from the queue
url = q.get()
# download the body of response by chunk, not immediately
response = requests.get(url, stream=True)
# get the file name
filename = url.split("/")[-1]
with open(filename, "wb") as f:
for data in response.iter_content(buffer_size):
# write data read to the file
f.write(data)
# we're done downloading the file
q.task_done()
if __name__ == "__main__":
urls = [
"https://cdn.pixabay.com/photo/2018/01/14/23/12/nature-3082832__340.jpg",
"https://cdn.pixabay.com/photo/2013/10/02/23/03/dawn-190055__340.jpg",
"https://cdn.pixabay.com/photo/2016/10/21/14/50/plouzane-1758197__340.jpg",
"https://cdn.pixabay.com/photo/2016/11/29/05/45/astronomy-1867616__340.jpg",
"https://cdn.pixabay.com/photo/2014/07/28/20/39/landscape-404072__340.jpg",
] * 5
# fill the queue with all the urls
for url in urls:
q.put(url)
# start the threads
for t in range(n_threads):
worker = Thread(target=download)
# daemon thread means a thread that will end when the main thread ends
worker.daemon = True
worker.start()
# wait until the queue is empty
q.join()
在上面的Python IO任务使用线程示例中,这也是一个不错的选择,我们在这里使用了一个同步队列,我们用我们想要下载的所有图像 URL 填充它,然后手动生成线程,每个线程都执行download()函数。
Python IO任务如何使用线程?正如你可能已经看到的,download()函数使用了一个永远不会结束的无限循环,我知道这是违反直觉的,但是当我们知道执行此函数的线程是守护线程时,这是有道理的意味着只要主线程结束它就会结束。
所以我们使用q.put()方法放置 item,q.get()获取 item 并消费它(在这种情况下,下载它),这是广泛讨论的生产者-消费者问题在计算机科学领域。
现在,如果两个线程同时执行q.get()方法(或q.put()),会发生什么?嗯,我之前说过这个队列是线程安全的(同步的),这意味着它在引擎盖下使用了一个锁来防止两个线程同时获取项目。
当我们完成下载该文件时,我们调用q.task_done()方法,该方法告诉队列对任务(该项目)的处理已完成。
回到主线程,我们创建线程并使用start()方法启动它们,之后,我们需要一种方法来阻塞主线程,直到所有线程完成,这正是q.join()所做的,它阻塞直到队列中的所有项目都被获取和处理。
结论
Python如何为IO任务使用线程?总而言之,首先,如果你不需要加速代码,则不应使用线程,也许你每月执行一次,因此你只会增加代码复杂性,这可能会导致调试阶段出现困难。
其次,如果你的代码是大量 CPU 任务,那么你也不应该使用线程,这是因为 GIL。如果你希望在多核上运行你的代码,那么你应该使用multiprocessing 模块,它提供类似的功能,但使用进程而不是线程。
第三,你应该只在 I/O 任务上使用线程,例如写入磁盘、等待网络资源等。
最后,当你有要处理的项目甚至在开始使用它们之前,你应该使用ThreadPoolExecutor。但是,如果你的项目不是预定义的并且在代码执行时被抓取(就像在Web Scraping 中通常的情况一样),那么请考虑使用带有线程模块的同步队列。