领先的免费Web技术教程,涵盖HTML到ASP.NET

网站首页 > 知识剖析 正文

Python多进程:释放多核CPU的洪荒之力

nixiaole 2025-05-25 14:44:03 知识剖析 2 ℃

一、多进程 vs 多线程

在python编程领域,多进程多线程都是实现并发编程的重要手段,但它们有着本质区别。

多线程受限于 Python的全局解释器锁(GIL),同一时间只有一个线程能执行Python代码,在CPU密集型任务中难以发挥多核优势;

多进程则每个进程拥有独立的Python解释器和内存空间,能真正利用多核CPU并行执行任务 ,尤其适合处理 CPU密集型工作,如数据计算、图像处理,机器学习训练等。

二、Python多进程核心函数及案例

1. multiprocessing.Process()

功能:用于创建进程对象,是多进程编程的基础。和threading.Thread()类似,通过target参数指定进程要执行的函数,args参数传递函数所需的参数。每个进程都有独立的内存空间和执行环境。

案例:使用多进程同时计算两个数的平方。

import multiprocessing


def square(x):
    result = x * x
    print(f"{x} 的平方是 {result}")


if __name__ == '__main__':
    p1 = multiprocessing.Process(target=square, args=(5,))
    p2 = multiprocessing.Process(target=square, args=(7,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

    print(f"程序结束")

#结果
7 的平方是 49
5 的平方是 25
程序结束

解析:定义square函数用于计算平方。在if __name__ == '__main__':语句块中创建两个进程p1和p2,分别计算5和7的平方。使用start()方法启动进程,join()方法等待进程执行完毕再执行主进程。(跟threading模块中的功能一样)

还有其他函数如is_alive()检查进程是否存活,terminate()强制终止进程,name属性获取或设置进程的名称。

2. multiprocessing.Pool()

功能:multiprocessing.Pool(processes=n)进程池可以创建n个进程,用于处理多个任务,避免频繁创建和销毁进程带来的开销。multiprocessing.Pool提供了多个函数来方便地管理进程池和分配任务,以下是一些常用的函数:

map(func, iterable[, chunksize])

  • 功能:将可迭代对象iterable中的每个元素依次应用到函数func上,并返回结果列表。它会自动将任务分配给进程池中的进程并行执行。chunksize参数用于指定每个进程处理的元素数量,默认为None,此时系统会根据情况自动调整。
  • 案例:使用进程池计算多个数的平方。
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        pool.map(square, numbers)

解析:定义square函数用于计算平方。通过multiprocessing.Pool(processes=3)创建包含3个进程的进程池,pool.map(square, numbers)将numbers列表中的每个元素依次传递给square函数进行计算

apply(func[, args[, kwds]])

  • 功能:在进程池中异步地执行函数func,并传递参数args和关键字参数kwds。它会阻塞直到函数执行完毕并返回结果。
  • 案例:使用进程池异步计算两个数的和。
import multiprocessing

def add(a, b):
    return a + b

if __name__ == '__main__':
    with multiprocessing.Pool(processes=2) as pool:
        pool.apply(add, args=(3, 5))

解析:定义add函数用于计算两数之和。通过multiprocessing.Pool(processes=2)创建包含 2 个进程的进程池,pool.apply(add, args=(3, 5))异步执行add函数,传入参数3和5,最后阻塞等待函数执行完毕并返回结果。

apply_async(func[, args[, kwds[, callback]]])

  • 功能:与apply类似,但它是异步执行的,不会阻塞当前进程。函数执行完毕后,会调用callback函数(如果提供),并将函数的返回值作为参数传递给callback。
  • 案例:使用进程池异步计算多个数的平方,并在计算完成后打印结果。
import multiprocessing

def square(x):
    return x * x

def print_result(result):
    print(f"计算结果: {result}")

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        for num in numbers:
            pool.apply_async(square, args=(num,), callback=print_result)
        pool.close()
        pool.join()

#结果
计算结果: 1
计算结果: 4
计算结果: 9
计算结果: 16
计算结果: 25

解析:定义square函数用于计算平方,print_result函数用于打印计算结果。通过multiprocessing.Pool(processes=3)创建包含3个进程的进程池,使用pool.apply_async异步执行square函数,传入每个数字作为参数,并指定print_result为回调函数。最后调用pool.close()关闭进程池,防止新的任务提交,pool.join()等待所有进程执行完毕。

starmap(func, iterable[, chunksize])

  • 功能:与map类似,但它接受的可迭代对象iterable中的每个元素是一个元组,元组中的元素会被解包后作为参数传递给函数func。
  • 案例:使用进程池计算多个矩形的面积。
import multiprocessing

def area(length, width):
		print(f"计算结果: {length * width}")

if __name__ == '__main__':
    rectangles = [(2, 3), (4, 5), (6, 7)]
    with multiprocessing.Pool(processes=2) as pool:
       pool.starmap(area, rectangles)

解析:定义area函数用于计算矩形面积。通过multiprocessing.Pool(processes=2)创建包含2个进程的进程池,pool.starmap(area, rectangles)将rectangles列表中的每个元组解包后作为参数传递给area函数进行计算,最后返回结果列表。

imap(func, iterable[, chunksize])

  • 功能:与map类似,但它返回一个迭代器,而不是立即返回结果列表。这在处理大量数据时可以节省内存,因为结果是按需生成的。
  • 案例:使用进程池计算多个数的立方,并通过迭代器逐步获取结果。
import multiprocessing

def cube(x):
    return x * x * x

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        result_iterator = pool.imap(cube, numbers)
        for result in result_iterator:
            print(result)

解析:定义cube函数用于计算立方。通过multiprocessing.Pool(processes=3)创建包含 3 个进程的进程池,pool.imap(cube, numbers)返回一个迭代器,通过迭代该迭代器逐步获取每个数字的立方结果并打印。

imap_unordered(func, iterable[, chunksize])

  • 功能:与imap类似,但它返回的迭代器中的结果顺序是不确定的,即结果可能以任意顺序返回。这在结果顺序不重要的情况下可以提高效率。
  • 案例:使用进程池计算多个数的平方,并以任意顺序获取结果。
import multiprocessing

def square(x):
    return x * x

if __name__ == '__main__':
    numbers = [1, 2, 3, 4, 5]
    with multiprocessing.Pool(processes=3) as pool:
        result_iterator = pool.imap_unordered(square, numbers)
        for result in result_iterator:
            print(result)

解析:定义square函数用于计算平方。通过multiprocessing.Pool(processes=3)创建包含 3 个进程的进程池,pool.imap_unordered(square, numbers)返回一个无序的迭代器,通过迭代该迭代器以任意顺序获取每个数字的平方结果并打印。

3. multiprocessing.Queue()

功能:用于在进程间安全地传递数据,是进程间通信的常用方式之一。它是线程和进程安全的,避免了数据竞争问题。

案例:一个进程向队列中放入数据,另一个进程从队列中取出数据。

import multiprocessing

def producer(queue):
    data = [10, 20, 30]
    for item in data:
        queue.put(item)
        print(f"发送数据: {item}")
    queue.put(None)  # 发送结束信号

def consumer(queue):
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费数据: {item}")

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    p1 = multiprocessing.Process(target=producer, args=(queue,))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))

    p1.start()
    p2.start()

    p1.join()
    p2.join()

解析:producer函数向队列中放入数据,并在最后放入None作为结束信号;consumer函数从队列中取出数据,当取出None时停止消费。通过multiprocessing.Queue()创建队列实现进程间数据传递。

4. multiprocessing.Manager()

功能:创建一个管理对象,用于在不同进程间共享数据,如共享列表、字典等。Manager 会创建一个服务器进程来管理共享对象,其他进程通过代理访问这些对象。

案例:多个进程共同修改共享字典。

import multiprocessing

def update_dict(shared_dict, key, value):
    shared_dict[key] = value

if __name__ == '__main__':
    manager = multiprocessing.Manager()
    shared_dict = manager.dict()
    processes = []
    data = [("key1", "value1"), ("key2", "value2"), ("key3", "value3")]
    for key, value in data:
        p = multiprocessing.Process(target=update_dict, args=(shared_dict, key, value))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(shared_dict)

解析:update_dict函数用于更新共享字典。通过multiprocessing.Manager()创建管理对象,manager.dict()创建共享字典。多个进程分别执行update_dict函数修改共享字典,最后打印共享字典的内容。

三、实用案例详解

1. 多进程处理 CPU 密集型任务 - 计算斐波那契数列

场景:计算大量数字的斐波那契数列,这是典型的 CPU 密集型任务,多进程能充分利用多核 CPU 加速计算。以下对比使用多进程和不使用多进程时花费的时间,来看下多进程到底是否提效

import multiprocessing
import time

def fibonacci(n):
    if n <= 0:
        return []
    elif n == 1:
        return [0]
    elif n == 2:
        return [0, 1]
    result = [0, 1]
    while len(result) < n:
        result.append(result[-1] + result[-2])
    return result


if __name__ == '__main__':
    numbers = [10, 20, 30, 40, 50]

    #使用多进程
    starttime = time.time()
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.map(fibonacci, numbers)
    for num, fib_seq in zip(numbers, results):
        print(f"斐波那契数列(长度 {num}): {fib_seq}")
    print(f"多进程共花费{time.time() - starttime}s")


    #不使用多进程
    starttime = time.time()
    for  i   in  numbers:
        print(f"斐波那契数列(长度 {fibonacci(i)}")
    print(f"非多进程共花费{time.time() - starttime}s")

#执行结果: 显示多进程提升了10倍多
多进程共花费0.48859405517578125s
非多进程共花费5.2928924560546875e-05s

其他一些场景:比如需要对大量文件进行读取、分析和处理,如批量压缩文件、提取文件特定信息等。比如配合pandas处理大规模数据集,同时进行数据清洗(如去除缺失值、异常值)和统计计算(如求和、平均值)。

四、闭坑指南

忘记if __name__ == '__main__':保护语句

务必使用if __name__ == '__main__':语句,防止多进程启动时会递归创建进程,导致程序出错或无限循环。

进程间数据共享混乱

避免直接在多个进程中修改共享数据,使用合适的共享机制(如multiprocessing.Manager())或者使用multiprocessing.Queue()进行进程间安全的通信传递数据。

进程池大小设置不合理

根据任务类型、数据量和系统的 CPU 核心数合理设置进程池大小。一般可设置为CPU核心数的 1-2倍,通过测试找到最优值。避免进程数量设置过多,超过系统资源承载能力,导致系统性能下降甚至崩溃,而设置过少则无法充分利用多核优势。

忽略进程间通信开销

不要频繁在进程间传递大量数据,忽略通信过程中的序列化和反序列化开销,影响程序性能。

五、总结

Python 多进程是释放多核 CPU 性能的强大武器,尤其适用于 CPU 密集型任务。通过掌握multiprocessing模块的核心函数,结合实际案例和闭坑指南,我们可以轻松驾驭多进程编程,让程序在多核环境下高效运行。无论是数据计算、文件处理还是复杂的数据分析,多进程都能为你开辟一条快速通道!

最近发表
标签列表