先决条件– Python中的多处理|
本文讨论了与Python中的多处理相关的两个重要概念:
- 进程之间的同步
- 进程合并
进程之间的同步
进程同步被定义为一种机制, 该机制可确保两个或多个并发进程不会同时执行某些特定的程序段, 即关键部分.
关键部分是指程序中访问共享资源的部分。
例如, 在下图中, 3个进程尝试同时访问共享资源或关键部分。
对共享资源的并发访问可能导致竞争条件。
当两个或多个进程可以访问共享数据并且它们试图同时更改它们时, 就会发生争用情况。结果, 变量的值可能是不可预测的, 并且取决于进程的上下文切换的时间而变化。
考虑下面的程序以了解竞争条件的概念:
# Python program to illustrate
# the concept of race condition
# in multiprocessing
import multiprocessing
# function to withdraw from account
def withdraw(balance):
for _ in range ( 10000 ):
balance.value = balance.value - 1
# function to deposit to account
def deposit(balance):
for _ in range ( 10000 ):
balance.value = balance.value + 1
def perform_transactions():
# initial balance (in shared memory)
balance = multiprocessing.Value( 'i' , 100 )
# creating new processes
p1 = multiprocessing.Process(target = withdraw, args = (balance, ))
p2 = multiprocessing.Process(target = deposit, args = (balance, ))
# starting processes
p1.start()
p2.start()
# wait until processes are finished
p1.join()
p2.join()
# print final balance
print ( "Final balance = {}" . format (balance.value))
if __name__ = = "__main__" :
for _ in range ( 10 ):
# perform same transaction process 10 times
perform_transactions()
如果你在上述程序上运行, 你将获得一些非预期的值, 例如:
Final balance = 1311
Final balance = 199
Final balance = 558
Final balance = -2265
Final balance = 1371
Final balance = 1158
Final balance = -577
Final balance = -1300
Final balance = -341
Final balance = 157
在上述程序中, 以初始余额为100进行了10000次提款和10000次存款交易。预期的最终余额为100, 但我们在10次迭代中得到了perform_transactions函数是一些不同的值。
发生这种情况是由于进程同时访问共享数据平衡。余额的这种不可预测性不过是比赛条件.
让我们尝试使用下面给出的序列图更好地理解它。这些是在以上示例中针对单个撤回和存款操作可能产生的不同顺序。
这是一个可能的序列, 由于两个进程读取相同的值并相应地将其写回, 因此会给出错误的答案。
p1 | p2 | balance |
---|---|---|
read(balance) current= 100 |
100 | |
read(balance) current= 100 |
100 | |
balance=current-1 = 99 write(balance) |
99 | |
balance=current+ 1 = 101 write(balance) |
101 |
这些是以上场景中需要的2个可能的序列。
p1 | p2 | balance |
---|---|---|
read(balance) current= 100 |
100 | |
balance=current-1 = 99 write(balance) |
99 | |
read(balance) current= 99 |
99 | |
balance=current+ 1 = 100 write(balance) |
100 |
p1 | p2 | balance |
---|---|---|
read(balance) current= 100 |
100 | |
balance=current+ 1 = 101 write(balance) |
101 | |
read(balance) current= 101 |
101 | |
balance=current-1 = 100 write(balance) |
100 |
使用锁
多处理模块提供了锁上课以应对比赛条件。锁是使用信号操作系统提供的对象。
信号量是一个同步对象, 它控制多个进程在并行编程环境中对公共资源的访问。它只是操作系统(或内核)存储中指定位置的值, 每个进程可以检查然后更改。根据找到的值, 该进程可以使用该资源, 或者会发现该资源已在使用中, 并且必须等待一段时间才能再次尝试。信号量可以是二进制的(0或1), 也可以具有其他值。通常, 使用信号量的进程会检查该值, 然后(如果使用资源)则更改该值以反映该值, 以便后续的信号量用户将知道等待。
考虑下面给出的示例:
# Python program to illustrate
# the concept of locks
# in multiprocessing
import multiprocessing
# function to withdraw from account
def withdraw(balance, lock):
for _ in range ( 10000 ):
lock.acquire()
balance.value = balance.value - 1
lock.release()
# function to deposit to account
def deposit(balance, lock):
for _ in range ( 10000 ):
lock.acquire()
balance.value = balance.value + 1
lock.release()
def perform_transactions():
# initial balance (in shared memory)
balance = multiprocessing.Value( 'i' , 100 )
# creating a lock object
lock = multiprocessing.Lock()
# creating new processes
p1 = multiprocessing.Process(target = withdraw, args = (balance, lock))
p2 = multiprocessing.Process(target = deposit, args = (balance, lock))
# starting processes
p1.start()
p2.start()
# wait until processes are finished
p1.join()
p2.join()
# print final balance
print ( "Final balance = {}" . format (balance.value))
if __name__ = = "__main__" :
for _ in range ( 10 ):
# perform same transaction process 10 times
perform_transactions()
输出如下:
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
Final balance = 100
让我们尝试逐步理解上面的代码:
首先, 锁对象使用以下对象创建:
lock = multiprocessing.Lock()
然后,将lock作为目标函数参数传递:
p1 = multiprocessing.Process(target=withdraw, args=(balance, lock))
p2 = multiprocessing.Process(target=deposit, args=(balance, lock))
在目标函数的临界区,我们使用lock.acquire()方法应用锁。一旦获得了锁,其他进程就不能访问它的临界区,直到使用lock.release()方法释放锁。
lock.acquire()
balance.value = balance.value - 1
lock.release()
正如你在结果中看到的, 最终余额每次都是100(这是预期的最终结果)。
进程之间的池化
让我们考虑一个简单的程序来查找给定列表中的数字平方。
# Python program to find
# squares of numbers in a given list
def square(n):
return (n * n)
if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 , 5 ]
# empty list to store result
result = []
for num in mylist:
result.append(square(num))
print (result)
输出如下:
[1, 4, 9, 16, 25]
这是一个用于计算给定列表元素平方的简单程序。在多核/多处理器系统中, 请考虑下图以了解上述程序的工作方式:
只有一个内核用于程序执行, 其他内核很可能保持空闲状态。
为了利用所有核心, 多处理模块提供了泳池类。的泳池类表示工作进程池。它具有允许以几种不同方式将任务卸载到工作进程的方法。考虑下图:
在此, 任务由以下服务器自动在核心/进程之间卸载/分配:泳池目的。用户无需担心明确创建进程。
考虑下面给出的程序:
# Python program to understand
# the concept of pool
import multiprocessing
import os
def square(n):
print ( "Worker process id for {0}: {1}" . format (n, os.getpid()))
return (n * n)
if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 , 5 ]
# creating a pool object
p = multiprocessing.Pool()
# map list to target function
result = p. map (square, mylist)
print (result)
输出如下:
Worker process id for 2: 4152
Worker process id for 1: 4151
Worker process id for 4: 4151
Worker process id for 3: 4153
Worker process id for 5: 4152
[1, 4, 9, 16, 25]
让我们尝试逐步理解以上代码:
我们使用以下方法创建一个Pool对象:
p = multiprocessing.Pool()
对于获得更多的任务卸载控制,有一些参数。这些都是:
- processes:指定工作进程数。
- maxtasksperchild:指定每个孩子要分配的最大任务数。
可以使用以下参数使池中的所有进程执行一些初始化:
- initializer:指定工作进程的初始化函数。
- initargs:要传递给初始化程序的参数。
现在,为了完成某个任务,我们必须将它映射到某个函数。在上面的例子中,我们将mylist映射到square函数。这样mylist的内容和square的定义就会分布在核之间。
result = p.map(square, mylist)
所有工作进程完成任务后, 将返回带有最终结果的列表。
如果发现任何不正确的地方, 或者想分享有关上述主题的更多信息, 请写评论。
首先, 你的面试准备可通过以下方式增强你的数据结构概念:Python DS课程。