5

你的Python程序实际上可以并行使用多少个 CPU 内核?

 5 months ago
source link: https://www.techug.com/post/cpu-thread-pool-size/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在运行 CPU 密集型并行程序时,通常需要根据机器上 CPU 内核的数量来确定线程或进程池的大小。线程数量少了,就无法充分利用所有内核;线程数量多了,程序运行速度就会开始变慢,因为多个线程会争夺同一个内核。反正理论上是这样。

那么,如何检查计算机有多少个内核?这个建议真的好吗?

事实证明,要确定运行多少个线程非常棘手:

  • Python 标准库提供了多个 API 来获取这些信息,但都不够充分。
  • 更糟的是,由于指令级并行和同步线程(又称 Intel CPU 上的超线程)等 CPU 特性,您能有效使用的内核数量取决于您编写的代码!

让我们来看看为什么计算程序可以使用多少个 CPU 内核如此困难,然后再考虑可能的解决方案。

用 Python 获取 CPU 内核数

如果阅读 Python 标准库文档,它有一个 os.cpu_count() 函数,可以返回 “系统中逻辑 CPU 的数量”。逻辑是什么意思?我们稍后再讨论。

文档还告诉你,”len(os.sched_getaffinity(0)) 可以获取当前进程的调用线程被限制使用的逻辑 CPU 数量”。调度器亲和性是一种限制进程使用特定内核的方法。

遗憾的是,这种应用程序接口也不够充分。例如,在 Linux 上,用于实现 Docker 和其他容器系统的 cgroups API 有多种限制 CPU 使用的方法。在这里,我们将 CPU 限制为 2.25 个内核:

$ docker run -i -t --cpus=2.25 python:3.12-slim
Python 3.12.1 (main, Dec  9 2023, 00:21:37) [GCC 12.2.0] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>> import os
>>> os.cpu_count()
20
>>> len(os.sched_getaffinity(0))
20

实际上,我们只能使用 2.25 个 CPU,但两个应用程序接口都不知道这一点。

什么是逻辑 CPU?

操作系统选项只是问题的开端,在举例说明之前,我们需要了解什么是物理 CPU 内核和逻辑 CPU 内核。我的电脑使用的是英特尔 i7-12700K 处理器

  • 12 个物理内核(8 个高性能内核和 4 个性能较弱的内核)。
  • 20 个逻辑内核。

现代 CPU 内核可以并行执行多条指令。但是,如果 CPU 在等待从 RAM 中加载某些数据时卡住了,会发生什么情况呢?在此之前,它可能无法执行任何工作。

为了利用这些可能被浪费的资源,CPU 物理内核会在操作系统面前假装成多个内核。在我的 CPU 上,8 个速度更快的内核可以假装成两个内核,总共有 16 个逻辑内核。成对的逻辑内核将共享同一个物理内核。如果一个逻辑内核没有充分利用所有内部算术逻辑单元,例如因为它在等待内存加载,那么通过配对逻辑内核运行的代码仍可使用这些闲置资源。

这种技术被称为同步多线程技术,英特尔公司称之为超线程技术。如果你有一台个人电脑,通常可以在 BIOS 中禁用它。

现在我们又有了一个新问题。抛开调度器亲和性等因素不谈,我们应该使用物理内核数还是逻辑内核数作为线程池大小?

一个令人尴尬的并行例子

让我们来看看用 Numba 编译成机器代码的两个函数。我们确保释放 GIL 以实现并行性。

这两个函数做同样的事情,但其中一个比另一个快得多。我们可以在多个线程上并行运行这些函数,理论上可以线性提高吞吐量,直到内核耗尽为止。

from numba import njit
import numpy as np

@njit(nogil=True)
def slow_threshold(img, noise_threshold):
    noise_threshold = img.dtype.type(noise_threshold)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            result[i, j] = img[i, j] // 256
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            if result[i, j] < noise_threshold // 256:
                result[i, j] = 0
    return result

@njit(nogil=True)
def fast_threshold(img, noise_threshold):
    noise_threshold = np.uint8(noise_threshold // 256)
    result = np.empty(img.shape, dtype=np.uint8)
    for i in range(result.shape[0]):
        for j in range(result.shape[1]):
            value = img[i, j] >> 8
            value = (
                0 if value < noise_threshold else value
            )
            result[i, j] = value
    return result

我们将运行该函数处理图像,并测量其运行所需的时间:

rng = np.random.default_rng(12345)

def make_image(size=256):
    noise = rng.integers(0, high=1000, size=(size, size), dtype=np.uint16)
    signal = rng.integers(0, high=5000, size=(size, size), dtype=np.uint16)
    # A noisy, hard to predict image:
    return noise | signal

NOISY_IMAGE = make_image()
assert np.array_equal(
    slow_threshold(NOISY_IMAGE, 1000),
    fast_threshold(NOISY_IMAGE, 1000)
)

下面是在单核心上运行每个功能所需的时间:

%timeit slow_threshold(NOISY_IMAGE, 1000)
90.6 µs ± 77.7 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)
%timeit fast_threshold(NOISY_IMAGE, 1000)
24.6 µs ± 10.8 ns per loop (mean ± std. dev. of 7 runs, 10,000 loops each)

扩展到多个线程

现在我们有了几个函数,我们将设置一种方法,利用线程池处理给定的图像列表:

from multiprocessing.dummy import Pool as ThreadPool

def apply_in_thread_pool(
    num_threads, function, images
):
    with ThreadPool(num_threads) as pool:
        for image in images:
            result = pool.map(
                lambda img: function(img, 1000),
                images
            )
            assert len(result) == len(images)

接下来,我们将使用 benchit 库(也可以使用 perfplot,但要注意它是 GPL 许可的)绘制不同线程数运行不同函数所需的时间图:

import benchit
benchit.setparams(rep=1)

# 400 images to run through the pool:
IMAGES = [make_image() for _ in range(400)]

def slow_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, slow_threshold, IMAGES)

def fast_threshold_in_pool(num_threads):
    apply_in_thread_pool(num_threads, fast_threshold, IMAGES)

# Measure the two functions with 1 to 24 threads:
timings = benchit.timings(
    [slow_threshold_in_pool, fast_threshold_in_pool],
    range(1, 25),
    input_name="Number of threads"
)
timings.plot(logy=True, logx=False)



请注意运行时间是如何随着线程数的增加而缩短的……直到一个点。之后运行时间又开始变短。到目前为止,这正是我们所期望的。但也有出乎意料的地方:每个函数的最佳线程数都不同。

timings.to_dataframe().idxmin(axis="rows")
函数 最佳线程数
slow_threshold 19
fast_threshold 8

并行性的最佳效果也取决于你的代码

我们的慢速函数基本上可以利用所有逻辑内核。可能它没有充分利用特定物理内核的所有可用处理能力,因此逻辑内核允许更多并行性。

相比之下,我们速度更快的函数只能利用不超过 8 个内核;超过 8 个内核后,速度开始减慢。也许它开始遇到计算以外的瓶颈,比如内存带宽。

对于这两个函数来说,线程池的大小都不是最佳的。

另辟蹊径:实证测量

在获得最佳线程数方面,我们遇到了很多问题:

  1. 考虑到操作系统限制 CPU 使用的所有不同方式,很难获得准确的内核数量。
  2. 最佳并行程度(如线程数)取决于工作量。更优化的代码可能无法利用额外的逻辑内核。
  3. 内核数量并不是唯一的瓶颈。
  4. 额外的问题:如果你在云中运行,你使用的是 “vCPU”,不管这意味着什么。不同的实例可能有不同的 CPU 型号。

因此,这里有另一种方法:在运行时根据经验发现最佳线程数。在上面的例子中,我们测量了特定代码的最佳线程数。如果你有一个长期运行的数据处理任务,需要在多个线程中运行相同的代码一段时间,你也可以这样做。也就是说,你可以在开始时花一点时间,根据经验测算出最佳线程数,或许还可以使用一些启发式方法来补偿工作量。

就运行时而言,如果您使用的是经验测量法,那么您就不需要关心为什么特定数量的线程是最佳的。无论硬件、操作系统配置或云环境如何,您都将使用最佳并行效果。

本文文字及图片出自 How many CPU cores can you actually use in parallel?


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK