9

C++生产者与消费者多线程样例

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzUxMTk4MzY3MA%3D%3D&%3Bmid=2247484665&%3Bidx=1&%3Bsn=ee52ed3f5284d89862d2f2c7879303a2
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

点击上方蓝字可直接关注!方便下次阅读。如果对你有帮助,麻烦点个在看或点个赞,感谢~        文章首发   公众号—— Pou光明

先了解问题背景:

生产者消费者问题 (英语: Producer-consumer problem ),也称有限缓冲问题( Bounded-buffer problem ),是一个 多线程同步问题的经典案例。该问题描述了共享固定大小缓冲区 的两个线 —— 即所谓的 生产者 消费者 ”—— 在实际运行时会发生的问题。生产者的主要作用是生成一定量的数据放到缓冲区中,然后重复此过程。与此同时,消费者也在缓冲区消耗这些数据。该问题的关键就是要保证生产者不会在缓冲区满时加入数据,消费者也不会在缓冲区中空时消耗数据

要解决该问题,就必须让生产者在缓冲区满时休眠(要么干脆就放弃数据),等到下次消费者消耗缓冲区中的数据的时候,生产者才能被唤醒,开始往缓冲区添加数据。同样,也可以让消费者在缓冲区空时进入休眠,等到生产者往缓冲区添加数据之后,再唤醒消费者。通常采用线程间通信的方法解决该问题。如果解决方法不够完善,则容易出现死锁的情况。出现死锁时,两个线程都会陷入休眠,等待对方唤醒自己。该问题也能被推广到多个生产者和消费者的情形。

问题描述:生产者不断生产数据,每包数据有优先级及时间戳等属性,当队列满时,移除时间最迟的数据,并将新数据放置队列头。

通过锁与条件变量进行线程同步,下面通过代码进行说明。

一、数据结构

typedef struct stc_RcvInfo
{
int32_t priority; //优先级属性
int64_t u64InTime; //进入队列时间(us)
int32_t nBuffLen; //数据长度
int8_t acBuff[DATA_LEN]; //数据缓存
} tRcvInfo;

二、队列函数设计——入队、出队、锁、条件变量

1、  数据结构

typedef int PRIORITY;   //0 is highest level
typedef std::vector<std::shared_ptr<tRcvInfo>> MSG_QUEUE; //存放数据的容器


static std::mutex mtx; //本队列使用的锁
static std::condition_variable cv; //本队列与锁对应的条件变
//数据优先级与Vector关联的MAP
static std::map<PRIORITY, MSG_QUEUE> PriorityQueue{
{0, std::vector<std::shared_ptr<tRcvInfo>>{}},
{1, std::vector<std::shared_ptr<tRcvInfo>>{}},
{2, std::vector<std::shared_ptr<tRcvInfo>>{}},
{3, std::vector<std::shared_ptr<tRcvInfo>>{}},
{4, std::vector<std::shared_ptr<tRcvInfo>>{}},
{5, std::vector<std::shared_ptr<tRcvInfo>>{}},
{6, std::vector<std::shared_ptr<tRcvInfo>>{}},
{7, std::vector<std::shared_ptr<tRcvInfo>>{}}};


static int indexArray[] = {0, 1, 2, 3, 4, 5, 6, 7};
static int ArraySize = sizeof(indexArray) / sizeof(indexArray[0]);

2、 向队列中以一定规则插入数据

int PushDataIntoCommunicateQueue(std::shared_ptr<tRcvInfo> pdata)
{
{
std::unique_lock<std::mutex> lk(mtx); //上锁
try
{
//判断所在优先级对应的队列是否已满,队列大小通过宏来控制
if (PriorityQueue.at(pdata->priority).size() < COMMUNICATE_QUEUE_SIZE) //未满
{
PriorityQueue.at(pdata->priority).push_back(pdata); //按顺序入队
}
else //已满,删除第一个元素,即时间戳最晚元素 ,将数据放入第一个位置
{
PriorityQueue.at(pdata->priority).erase(PriorityQueue.at(pdata->priority).begin());
PriorityQueue.at(pdata->priority).insert(PriorityQueue.at(pdata->priority).begin(), pdata);
}


int nSta, nSize;
nSize = PriorityQueue.at(pdata->priority).size();
nSta = nSize >= COMMUNICATE_QUEUE_SIZE ? 1 : 0;
// if (nSta == 1) //输出当前队列大小
// {
// printf("Queue nSta = %d nSize = %d\n", nSta, nSize);
// std::cout << "thread id=" << std::this_thread::get_id() << std::endl;
//}
}
catch (const std::exception &e)
{
std::cerr << e.what() << std::endl;
return -1;
}
}
cv.notify_all(); //完成操作,进行唤醒


return 0;
}

3、 从队列中取数据

std::shared_ptr<tRcvInfo> GetDataFromCommunicateQueue()
{
std::unique_lock<std::mutex> lk(mtx); //上锁
//条件变量等待固定时间或谓词为真
cv.wait_for(lk, std::chrono::microseconds(5), []() {
for (auto &it : PriorityQueue)
{
if (!it.second.empty())
{
return true;
}
}
return false;
});


std::shared_ptr<tRcvInfo> pdata = NULL;


for (int i = 0; i < ArraySize; i++)
{
if (!PriorityQueue.at(indexArray[i]).empty())
{
pdata = PriorityQueue.at(indexArray[i]).front(); //从对应优先级中取出数据
PriorityQueue.at(indexArray[i]).erase(PriorityQueue.at(indexArray[i]).begin()); //
break;
}
}


return pdata;
}

三、函数调用

两个生产者,一个消费者。

1、生产者

static void *ProducerOne(void *pArg)
{
uint8_t u8testData[512] = {0};
memset(u8testData, 0x66, sizeof(u8testData));


int mark = *((int *)pArg);
printf("I am is %d thread child \n ", mark);


while (1)
{
std::shared_ptr<tRcvInfo> pRcvInfo = std::make_shared<tRcvInfo>();
if (NULL == pRcvInfo)
{
printf("pRcvInfo calloc error!\n");
break;
}


pRcvInfo->priority = 0; //构造数据
memcpy(pRcvInfo->acBuff, u8testData, sizeof(u8testData));
pRcvInfo->nBuffLen = sizeof(u8testData);
pRcvInfo->u64InTime = current_time_us_get();


PushDataIntoCommunicateQueue(pRcvInfo); //将数据入队,且无需延时
}


(void)pthread_exit(NULL);
}

2、消费者

static void *Consumer(void *pArg)
{
int mark = *((int *)pArg);
printf("I am is %d thread child \n ", mark);


while (1)
{
std::shared_ptr<tRcvInfo> pRcvInfo = GetDataFromCommunicateQueue(); //获取数据,无需延时
if (pRcvInfo == NULL)
{
continue;
}


printf("priority : %d \n", pRcvInfo->priority);
}


(void)pthread_exit(NULL);
}

四、小结

问题点一个是线程间同步问题——使用锁和条件变量

还有就是对谓词判断上——延时时间到,或者队列不为空谓词为真,程序皆可向下执行,跳出阻塞。

yIryquf.png!mobile

cpu 占用率

MZzeIjR.png!mobile

欢迎关注公众号—— Pou光明

完整程序可在公众号后台留言获取。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK