5

一种有界队列(Bounded Buffer)的实现

 1 year ago
source link: http://blog.tubumu.com/2022/12/04/data-structure-an-implement-of-bounded-queue/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

一种有界队列(Bounded Buffer)的实现

2022-12-042022-12-05Data Structure

在有 CPUGPU 参与的一种运算中,比如深度学习推理,CPU 需要预处理数据,然后交给 GPU 处理,最后 CPU 对 GPU 的运算结果进行后处理
在整个过程中都是 FIFO,即数据 ABC 按顺序输入,也需要按 A’B’C’ 顺序输出。
如果采用同步阻塞的方式,在 CPU 预处理时 GPU 处于空闲状态,GPU 运算时 CPU 后处理处于空闲状态并且也不能进行后续数据的预处理。这样影响整体的吞吐。
期望是 GPU 运算时,CPU 可以同时进行数据预处理和后处理。这是典型的单生产者单消费者模式。

在两个线程之间传递数据时,为确保线程安全,可以在一个线程每次 mallocnew 申请内存,在另一个线程 freedelete。为了避免频繁的内存分配和释放,需要使用到内存池。

本文描述采用有界队列实现内存池,适用场景和限制:

  1. 需要把内存使用控制在一定范围内;
  2. 整个过程不允许丢弃数据;
  3. 生产和消费之间线程安全;
  4. 不会(也不允许)同时生产,不会(也不允许)同时消费。如果确实要多线程生产或多线程消费,调用代码自行确保线程安全。
// File: bounded_buffer.h
#pragma once
#include <cstddef>
#include <functional>
#include <mutex>
#include <string>
#include <thread>

/*
* @Description: BoundedBuffer。Produce 和 Consume 方法不是线程安全的。使用不同线程或确保线程安全地调用 Produce 和 Consume 方法。
*/
class BoundedBuffer
{
public:
BoundedBuffer(const std::string& name, size_t buffers_capacity_, size_t buffer_size_max);
~BoundedBuffer();
BoundedBuffer(const BoundedBuffer& rhs) = delete;
BoundedBuffer& operator=(const BoundedBuffer& rhs) = delete;

public:
/**
* @description: 生产。非线程安全,两个及以上线程调用 Produce 可能会导致脏写。
* @param {function<void(void*)>} func
* @return {void}
*/
void Produce(std::function<void(void*)> func);

/**
* @description: 消费。非线程安全,两个及以上线程调用 Consume 可能会导致读取到同一份数据。
* @param {function<void(void*)>} func
* @return {void}
*/
void Consume(std::function<void(void*)> func);

private:
std::string _name;

// 内存池
void** _buffers;
// 内存池容量
size_t _buffers_capacity;
// 内存块最大长度
size_t _buffer_size_max;
// 保护内存池
std::mutex _buffers_mtx;
// 内存池是否有可用的 slot (非满则可以写数据)
std::condition_variable _buffers_not_full_cond;
// 内存池是否非空 (非空则可以读数据)
std::condition_variable _buffers_not_empty_cond;
// 内存池将会读取的位置
size_t _buffers_read_position;
// 内存池当前可写入的位置
size_t _buffers_write_position;
};
// File: bounded_buffer.cpp
#include "bounded_buffer.h"
#include <assert.h>

BoundedBuffer::BoundedBuffer(const std::string& name, size_t buffers_capacity, size_t buffer_size_max)
: _name(name), _buffers_capacity(buffers_capacity), _buffer_size_max(buffer_size_max)
{
assert(buffers_capacity > 1);
assert(buffer_size_max > 0);
_buffers = static_cast<void**>(std::malloc(sizeof(void*) * buffers_capacity));
std::memset(_buffers, 0, sizeof(void*) * buffers_capacity);
}

BoundedBuffer::~BoundedBuffer()
{
for (auto i = 0; i < _buffers_capacity; i++)
{
if (_buffers[i])
{
std::free(_buffers[i]);
_buffers[i] = nullptr;
}
}
std::free(_buffers);
_buffers = nullptr;
}

void BoundedBuffer::Produce(std::function<void(void*)> func)
{
std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
// 等待可写 slot。要确保本次写入后,下次有写入位置,所以 +1。
_buffers_not_full_cond.wait(buffers_lock, [&] { return ((_buffers_write_position + 1) % _buffers_capacity) != _buffers_read_position; });
// 有可写 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Consume 造成有可读 slot 而无法读。
buffers_lock.unlock();
if (!_buffers[_buffers_write_position])
{
_buffers[_buffers_write_position] = std::malloc(_buffer_size_max);
}
auto buffer = _buffers[_buffers_write_position];
func(buffer);
// 更改写 slot
_buffers_write_position = (_buffers_write_position + 1) % _buffers_capacity;
_buffers_not_empty_cond.notify_one();
}

void BoundedBuffer::Consume(std::function<void(void*)> func)
{
std::unique_lock<std::mutex> buffers_lock(_buffers_mtx);
// 等待读 slot
_buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });
// 有可读 slot 马上释放。因为 func 可能是耗时操作,防止过久阻塞 Produce 造成有可写 slot 而无法写。
buffers_lock.unlock();
auto buffer = _buffers[_buffers_read_position];
func(buffer);
// 更改读 slot
_buffers_read_position = (_buffers_read_position + 1) % _buffers_capacity;
_buffers_not_full_cond.notify_one();
}

1、的确是需要 mutex 和 condition_variable 吗?

是的。比如在生产时,发现“无法获取到”可写的 slot,又不允许丢弃数据,为了不让生产者线程轮询则只能等待。

2、为什么 Produce 和 Consume 里 wait 返回后马上解锁?

比如生产时,生产的过程可能耗时。确保“能获取到”生产 slot 后立即解锁,以便消费者线程调用 Consume 时如果阻塞在 std::unique_lock<std::mutex> buffers_lock(_buffers_mtx); 能够取得锁,从而得以消费在本次生产之前已经生产好的 slot ——如果队列完全没有可读数据当然就“转为”阻塞在 _buffers_not_empty_cond.wait(buffers_lock, [&] { return _buffers_write_position != _buffers_read_position; });。如果是阻塞在 wait,则会在本次生产好后通过 _buffers_not_empty_cond.notify_one(); 唤醒消费者线程。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK