本文讨论了当使用Python中的multiprocessing模块时,数据共享和进程间消息传递/通信的概念。
在多处理中, 任何新创建的进程都将执行以下操作:
- 独立运行
- 有自己的记忆空间。
考虑下面的程序以了解此概念:
import multiprocessing
# empty list with global scope
result = []
def square_list(mylist):
"""
function to square a given list
"""
global result
# append squares of mylist to global list result
for num in mylist:
result.append(num * num)
# print global list result
print ( "Result(in process p1): {}" . format (result))
if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]
# creating new process
p1 = multiprocessing.Process(target = square_list, args = (mylist, ))
# starting process
p1.start()
# wait until process is finished
p1.join()
# print global result list
print ( "Result(in main program): {}" . format (result))
Result(in process p1): [1, 4, 9, 16]
Result(in main program): []
在上面的示例中, 我们尝试打印全局列表的内容结果在两个地方:
- 在square_list函数。因为这个函数是由进程p1调用的,所以只在进程p1的内存空间中改变结果列表。
- 在主程序中完成p1进程后。因为主程序是由另一个进程运行的,所以它的内存空间仍然包含空的结果列表。
下图显示了这个概念:
在流程之间共享数据
共享内存:多处理模块提供数组和值对象在进程之间共享数据。
- 数组:从分配的ctypes数组共享内存.
- 值:从分配的ctypes对象共享内存.
下面给出的是一个简单示例, 显示了Array和值用于在流程之间共享数据。
import multiprocessing
def square_list(mylist, result, square_sum):
"""
function to square a given list
"""
# append squares of mylist to result array
for idx, num in enumerate (mylist):
result[idx] = num * num
# square_sum value
square_sum.value = sum (result)
# print result Array
print ( "Result(in process p1): {}" . format (result[:]))
# print square_sum Value
print ( "Sum of squares(in process p1): {}" . format (square_sum.value))
if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]
# creating Array of int data type with space for 4 integers
result = multiprocessing.Array( 'i' , 4 )
# creating Value of int data type
square_sum = multiprocessing.Value( 'i' )
# creating new process
p1 = multiprocessing.Process(target = square_list, args = (mylist, result, square_sum))
# starting process
p1.start()
# wait until process is finished
p1.join()
# print result array
print ( "Result(in main program): {}" . format (result[:]))
# print square_sum Value
print ( "Sum of squares(in main program): {}" . format (square_sum.value))
Result(in process p1): [1, 4, 9, 16]
Sum of squares(in process p1): 30
Result(in main program): [1, 4, 9, 16]
Sum of squares(in main program): 30
让我们尝试逐行理解上面的代码:
首先,我们像这样创建一个数组result:
result = multiprocessing.Array('i', 4)
- 第一个参数是数据类型。 " i"代表整数, 而" d"代表浮点数据类型。
- 第二个参数是数组大小。在这里, 我们创建一个包含4个元素的数组。
同样, 我们创建一个值square_sum像这样:
square_sum = multiprocessing.Value('i')
在这里, 我们只需要指定数据类型。可以给该值一个初始值(例如10), 如下所示:
square_sum = multiprocessing.Value('i', 10)
其次,在创建Process对象时传递result和square_sum作为参数。
p1 = multiprocessing.Process(target=square_list, args=(mylist, result, square_sum))
通过指定数组元素的索引为result数组元素指定一个值。
for idx, num in enumerate(mylist):
result[idx] = num * num
square_sum通过使用它的值值属性:
square_sum.value = sum(result)
为了打印result数组元素,我们使用result[:]来打印完整的数组。
print("Result(in process p1): {}".format(result[:]))
square_sum的值简单地打印为:
print("Sum of squares(in process p1): {}".format(square_sum.value))
下图描述了进程如何共享数组和值对象:
服务器进程:每当python程序启动时,服务器进程也会启动。从那时起,每当需要一个新进程时,父进程就会连接到服务器,并请求它派生一个新进程。
服务器进程可以保存Python对象,并允许其他进程使用代理来操作它们。
multiprocessing模块提供了一个管理器类来控制服务器进程。因此,管理器提供了一种创建数据的方法,这些数据可以在不同的进程之间共享。
服务器进程管理器比使用共享内存对象更灵活, 因为它们可以支持任意对象类型, 例如列表, 字典, 队列, 值, 数组等。而且, 单个管理器可以由网络上不同计算机上的进程共享。 。但是, 它们比使用共享内存慢。
考虑下面给出的示例:
import multiprocessing
def print_records(records):
"""
function to print record(tuples) in records(list)
"""
for record in records:
print ( "Name: {0}\nScore: {1}\n" . format (record[ 0 ], record[ 1 ]))
def insert_record(record, records):
"""
function to add a new record to records(list)
"""
records.append(record)
print ( "New record added!\n" )
if __name__ = = '__main__' :
with multiprocessing.Manager() as manager:
# creating a list in server process memory
records = manager. list ([( 'Sam' , 10 ), ( 'Adam' , 9 ), ( 'Kevin' , 9 )])
# new record to be inserted in records
new_record = ( 'Jeff' , 8 )
# creating new processes
p1 = multiprocessing.Process(target = insert_record, args = (new_record, records))
p2 = multiprocessing.Process(target = print_records, args = (records, ))
# running process p1 to insert new record
p1.start()
p1.join()
# running process p2 to print records
p2.start()
p2.join()
New record added!
Name: Sam
Score: 10
Name: Adam
Score: 9
Name: Kevin
Score: 9
Name: Jeff
Score: 8
让我们尝试理解以上代码:
首先,我们使用以下方法创建一个管理器对象:
with multiprocessing.Manager() as manager:
with语句块下面的所有行都在管理器对象的范围内。
然后,我们在服务器进程内存中创建一个列表记录:
records = manager.list([('Sam', 10), ('Adam', 9), ('Kevin', 9)])
类似地,您可以创建一个字典管manager.dict方法。
- 最后, 我们创建流程p1(在中插入新记录记录清单)和p2(打印记录)并在通过时运行它们记录作为参数之一。
服务器进程的概念如下图所示:
进程间通信
有效使用多个流程通常需要它们之间进行某种沟通, 以便可以划分工作并可以汇总结果。
multiprocessing支持两种类型的进程之间的通信通道::
- 队列Queue
- 管道Pipe
Queue队列:在进程与多处理之间进行通信的一种简单方法是使用队列来回传递消息。任何Python对象都可以通过Queue。
注意:multiprocessing.Queue类是queue.Queue的近似克隆。
考虑下面给出的示例程序:
import multiprocessing
def square_list(mylist, q):
"""
function to square a given list
"""
# append squares of mylist to queue
for num in mylist:
q.put(num * num)
def print_queue(q):
"""
function to print queue elements
"""
print ( "Queue elements:" )
while not q.empty():
print (q.get())
print ( "Queue is now empty!" )
if __name__ = = "__main__" :
# input list
mylist = [ 1 , 2 , 3 , 4 ]
# creating multiprocessing Queue
q = multiprocessing.Queue()
# creating new processes
p1 = multiprocessing.Process(target = square_list, args = (mylist, q))
p2 = multiprocessing.Process(target = print_queue, args = (q, ))
# running process p1 to square list
p1.start()
p1.join()
# running process p2 to get queue elements
p2.start()
p2.join()
Queue elements:
1
4
9
16
Queue is now empty!
让我们尝试逐步理解上面的代码:
- 首先, 我们创建一个多处理队列使用:
q = multiprocessing.Queue()
- 然后我们传递空队列qtosquare_list通过进程发挥作用p1。使用插入元素以排队放方法。
q.put(num * num)
- 为了打印队列元素, 我们使用得到直到队列不为空的方法。
while not q.empty(): print(q.get())
以下是描述队列中操作的简单图:
管道:一个管道只能有两个端点。因此,当只需要双向通信时,它比队列更可取。
multiprocessing模块提供了Pipe()函数,该函数返回由管道连接的一对连接对象。Pipe()返回的两个连接对象表示管道的两端。每个连接对象都有send()和recv()方法。
考虑下面给出的程序:
import multiprocessing
def sender(conn, msgs):
"""
function to send messages to other end of pipe
"""
for msg in msgs:
conn.send(msg)
print ( "Sent the message: {}" . format (msg))
conn.close()
def receiver(conn):
"""
function to print the messages received from other
end of pipe
"""
while 1 :
msg = conn.recv()
if msg = = "END" :
break
print ( "Received the message: {}" . format (msg))
if __name__ = = "__main__" :
# messages to be sent
msgs = [ "hello" , "hey" , "hru?" , "END" ]
# creating a pipe
parent_conn, child_conn = multiprocessing.Pipe()
# creating new processes
p1 = multiprocessing.Process(target = sender, args = (parent_conn, msgs))
p2 = multiprocessing.Process(target = receiver, args = (child_conn, ))
# running processes
p1.start()
p2.start()
# wait until processes finish
p1.join()
p2.join()
Sent the message: hello
Sent the message: hey
Sent the message: hru?
Received the message: hello
Sent the message: END
Received the message: hey
Received the message: hru?
让我们尝试理解上面的代码:
只需使用以下命令即可创建管道:
parent_conn, child_conn = multiprocessing.Pipe()
该函数为管道的两端返回了两个连接对象。
消息使用send方法从管道的一端发送到另一端。
conn.send(msg)
为了接收管道一端的任何消息,我们使用recv方法。
msg = conn.recv()
在上面的程序中, 我们从一端发送消息列表到另一端。在另一端, 我们阅读消息, 直到收到" END"消息。
考虑下面给出的图表, 该图表显示了黑白管和流程之间的关系:
注意:如果两个进程(或线程)试图同时从管道的同一端读取或写入管道的同一端, 则管道中的数据可能会损坏。当然, 不存在同时使用管道不同端的进程造成损坏的风险。还应注意, 队列在进程之间进行适当的同步, 但代价是更加复杂。因此, 据说队列是线程和进程安全的!
下一个:
- Python中进程的同步和池化
如果发现任何不正确的地方, 或者想分享有关上述主题的更多信息, 请写评论。
首先, 你的面试准备可通过以下方式增强你的数据结构概念:Python DS课程。