Skip to main content

多进程

创建进程

与创建线程类似,在 Python 中,可以使用 multiprocessing 模块来编写多进程程序。这个模块提供了丰富的 API,使得创建多进程应用变得相对简单。

创建一个新进程示例如下:

from multiprocessing import Process

def worker_function(number):
print(f"线程 {number} 正在工作。")

if __name__ == "__main__":
p = Process(target=worker_function, args=(1,))
p.start()
p.join()

在上面的示例中,我们定义了一个名为 worker_function 的函数,该函数接受一个参数 number 并打印一个关于工作的消息。这个函数将在子进程中运行。 p = Process(target=worker_function, args=(1,)) 这一行创建了一个新的进程对象。通过 target 参数,我们告诉它要运行的函数是 worker_function,并为这个函数提供了参数(1,)。 p.start() 告诉进程开始执行。此时,一个新的子进程将被创建并开始运行。join()方法会阻塞主进程,直到子进程执行完毕。这确保主进程不会在子进程完成之前结束。

进程池

创建进程,在进程间切换也是有额外开销的,而且这个额外开销比线程的开销要大。一般来说,电脑的 CPU 核心数量有限,创建 CPU 数量若多余 CPU 核心数量,必然无法做到每个进程都有自己独占的 CPU 核心。这样一来,再多的多的进程也不会对充分利用 CPU 再有任何帮助了,反而徒增进程切换的负担。所以,我们应该控制进程的总数量,以提高并行运算的效率。我们可以利用进程池(Process Pool)来控制控制并行执行的进程总数量,而不是为每一个任务都开启一个单独的进程。

创建

创建进程池时,可以指定要使用的进程数量。如果不指定,那么进程池默认使用系统上可用的所有核心。

from multiprocessing import Pool
pool = Pool(processes=4) # 创建一个包含4个进程的进程池

分发任务

为进程池分发任务,最常用的方法是使用进程池的 map 和 map_async 方法。这两个方法类似于 Python 的内置 map 函数,但它们支持并行执行函数调用。

def square(x):
return x * x

results = pool.map(square, [1, 2, 3, 4]) # 输出 [1, 4, 9, 16]

上面程序中的 pool.map() 方法会在多个进程中并行执行 square() 函数。

对于单个任务,可以使用 apply 和 apply_async 方法。apply 方法是阻塞的,而 apply_async 是非阻塞的并返回一个 AsyncResult 对象。

result = pool.apply(func, args=(arg1, arg2))  # 阻塞调用
async_result = pool.apply_async(func, args=(arg1, arg2)) # 非阻塞调用
result = async_result.get() # 从 AsyncResult 对象中获取结果

关闭

在完成所有任务后,需要关闭进程池,以防止提交更多的任务。可以使用 close 方法进行关闭。

pool.close()
pool.join() # 等待所有进程完成

如果需要立即停止进程池,可以使用 terminate 方法。这将立即终止所有进程,可能会导致未完成的任务丢失。

使用进程池中的进程时,每个进程都运行在其自己的内存空间中。这意味着它们之间不会共享状态或变量,这可以避免许多多线程编程中的并发问题。如果进程池中的任何任务引发异常,该异常将被传播到主进程,因此建议在使用进程池时进行适当的异常处理。

数据通信

每个进程都有自己独立的内存空间,所以一个进程不能直接去另一个进程里的变量中读取数据。在不同的进程间传递数据和通信,需要利用 multiprocessing 提供的一些工具。

管道

管道(Pipe)提供了一种双向或单向通信通道,两个进程可以通过它发送或接收数据。

from multiprocessing import Process, Pipe

def worker(conn):
conn.send("工作进程发送的数据")
conn.close()

parent_conn, child_conn = Pipe()
p = Process(target=worker, args=(child_conn,))
p.start()
print(parent_conn.recv()) # 输出: "工作进程发送的数据"
p.join()

Pipe()函数返回一个包含两个连接对象的元组:parent_conn 和 child_conn。这两个连接对象代表管道的两端。在这个例子中,主进程将使用 parent_conn 来接收消息,而工作进程将使用 child_conn 来发送消息。

队列

管道虽然使用简单,但它只能用于最简单的一对一通信方式,如果有更多的进程参与通信,或者数据产生量较大,需要一个结构来保存数据的,那么需要使用队列。

队列(Queue)是多进程安全的,可以用于进程之间的通信和数据交换。它允许多个进程放入数据,并从中取出数据。

from multiprocessing import Process, Queue

def worker(q):
q.put("工作进程发送的数据")

q = Queue()
p = Process(target=worker, args=(q,))
p.start()
print(q.get()) # 输出: "工作进程发送的数据"
p.join()

队列非常适合应用与多生产者和多消费者的场景。比如,一些进程负责生产数据:读取一个被测试设备的电压,信号波形等数据;另外几个进程负责消费数据:把数据显示到用户界面,存盘等。

Value 和 Array

当需要在进程之间共享一个单一的值或数组时,可以使用 Value 或 Array。这些对象使用共享内存来存储数据,因此可以由多个进程访问。

from multiprocessing import Process, Value, Array

def worker(val, arr):
val.value = 100
for i in range(len(arr)):
arr[i] = -arr[i]

val = Value('i', 0) # 'i' 表示整数
arr = Array('d', [1.0, 2.0, 3.0]) # 'd' 表示双精度浮点数
p = Process(target=worker, args=(val, arr))
p.start()
p.join()
print(val.value) # 输出: 100
print(arr[:]) # 输出: [-1.0, -2.0, -3.0]

管理器

管理器(Manager)提供了一种在进程之间共享更复杂的 Python 对象的方法,如列表、字典等。管理器启动了一个新的进程,其他进程通过代理与它通信。

from multiprocessing import Process, Manager

def worker(l, d):
l[1] = 100
d["b"] = "Updated"

with Manager() as manager:
l = manager.list([1, 2, 3])
d = manager.dict({"a": "Initial", "b": "Original"})

p = Process(target=worker, args=(l, d))
p.start()
p.join()

print(l) # 输出: [1, 100, 3]
print(d) # 输出: {'a': 'Initial', 'b': 'Updated'}

尽管使用共享对象(如Value或Array)进行进程间通信可能更直接,但它也可能带来竞态条件和数据不一致性的问题。为确保数据的一致性,可能需要使用同步机制,如互斥锁。

同步机制

我们可以使用与线程间同步机制非常类似的工具进行进程间的同步。因为两者极其相似,我们在这里就只做最简单的演示,详细解释可以参考在线程一节中的介绍。

互斥锁

互斥锁是最基本的同步机制,可以确保同一时刻只有一个进程可以访问共享的资源或代码段。

from multiprocessing import Process, Lock

def printer(item, lock):
with lock:
print(item)

lock = Lock()
for item in range(10):
Process(target=printer, args=(item, lock)).start()

信号量

与线程同步中的信号量概念相同。它允许维护一个计数,从而允许有限数量的进程访问资源。它通常用于限制对资源的并发访问数量。

from multiprocessing import Process, Semaphore

def worker(sem):
with sem:
# critical section of code
pass

sem = Semaphore(2) # Only 2 processes allowed at a time

条件变量

条件变量结合了锁和事件的功能。它允许一个或多个进程等待直到被另一个进程明确地通知继续执行。

from multiprocessing import Process, Condition

def worker(cond):
with cond:
# wait for notification
cond.wait()
# continue processing

cond = Condition()

事件

进程可以等待事件变为true,或者设置或清除事件标志。

from multiprocessing import Process, Event

def wait_for_event(e):
e.wait()
# code to execute after event is set

event = Event()

演示

与多线程编程相比,多进程编程的一个主要优点是每个进程都运行在自己的内存空间中,这避免了许多与并发相关的常见问题。但同时,数据的交换和共享也变得更为复杂。

为了利用上多 CPU,我们把在多线程中使用的示例改为多进程。

import time
from multiprocessing import Pool


def prime_factors(n):
"""Return a list of the prime factors for a natural number."""
factors = []

for i in range(2, int(n**0.5) + 1):
while n % i == 0:
factors.append(i)
n //= i

if n > 2:
factors.append(n)

return factors


def process_range(start, end):
for i in range(start, end):
prime_factors(i)


def main():
num_processes = 72
start_num = 1000000000
end_num = 1000010000
step = (end_num - start_num) // num_processes

chunks = [
(start_num + i * step, start_num + (i + 1) * step) for i in range(num_processes)
]

start_time = time.time()

with Pool(processes=num_processes) as pool:
pool.starmap(process_range, chunks)

end_time = time.time()
print(f"程序运行时间: {end_time - start_time:.6f} 秒")


if __name__ == "__main__":
main()

运行上面的程序,耗时 0.8 秒,速度提高了 25 倍。考虑到任务分配不均匀,额外的开销,测量的误差等因素,提高 25 倍已经非常好了。