微软C++并行库 pplx 的基本用法

 5 years ago
并行计算库充分利用多核的优势,通过并行运算提高程序效率,业界有两个知名的 c++ 并行库,一个是 intel 开发的 TBB ,一个是微软开发的 PPL

TBB(Intel® Threading Building Blocks )

TBBintel 用标准 c++ 写的一个开源的并行计算库。它的目的是提升数据并行计算的能力,可以在其 官网 下载最新的库和文档。 TBB 主要功能:并行算法、任务调度、并行容器、同步原语、内存分配器。

PPL(Parallel Patterns Library)

PPL 是微软开发的并行计算库,它的功能和 TBB 是差不多的,主要是在 windows 上使用。二者在并行算法的使用上基本上是一样的, 但还是有些差异: TBBtask 没有 PPLtask 强大, PPLtask 可以链式连续执行还可以组合任务, TBBtask 则不行。

PPL C++ 库与 C# 并行库 TaskParallelLibrary 的设计理念、基本框架以及接口使用上非常类似,熟悉 C# 并行库的朋友上手 C++ 版的 PPL 非常容易。下面我将介绍微软跨平台 PPL 的一个简易实现 pplx ,该库是附在微软的开源项目 cpprestsdk 中的。

pplx 并行库

C++ REST SDK 是 Microsoft 的一个开源跨平台项目, 其使用大量现代异步 C++ API 实现了一个基于 HTTP / HTTPS 协议的 B/S 组件,使用该组件,可以方便地进行高性能 RESTfulHTTP / HTTPS 服务器、客户端开发,且可以在 WindowsLinuxOSXiOSAndroid 各平台下使用。

当然今天我要介绍的主角是该项目中的并行库 PPLX 。下面先介绍如何编译安装 cpprestsdk ,然后介绍如何使用并行库 PPLX 。以下都是在 Ubuntu 系统上进行。


有两种方式可以安装 cpprestsdk ,一种是直接用 apt-get 安装,另一种是从源码安装。

通过 apt-get 安装

sudo apt-get install libcpprest-dev

从 source 编译安装

ref: How to build for Linux

1, 系统要求: Ubuntu 16.04 及之后的版本

2, 安装必要的工具:boost库,ninja 用于编译,

sudo apt-get install g++ git libwebsocketpp-dev openssl libssl-dev ninja-build

sudo apt-get install libboost-atomic-dev libboost-thread-dev libboost-system-dev libboost-date-time-dev libboost-regex-dev libboost-filesystem-dev libboost-random-dev libboost-chrono-dev libboost-serialization-dev

3, 下载代码

git clone https://github.com/Microsoft/cpprestsdk.git casablanca

4, 编译:

cd casablanca
mkdir build.release
cd build.release
cmake -G Ninja .. -DCMAKE_BUILD_TYPE=Release

如果想编译成 debug 版本,把上面代码中的 release/Release 修改为 debug/Debug 即可。

5, 编译完成之后,跑一下 test_runner 测试验证一下:

cd Release/Binaries
./test_runner *_test.so

或者运行 bing 搜索示例:

cd Release/Binaries
./BingRequest kesalin kesalin.html

6, 安装:

sudo ninja install
sudo ldconfig

7, 编译单个文件的参数:

g++ -std=c++11 my_file.cpp -o my_file -lboost_system -lcrypto -lssl -lcpprest

使用 pplx 并行库



auto task = pplx::task<int>([](){
    return 10;
auto task = []()->pplx::task<int>{
    return pplx::task_from_result(10);
auto task = pplx::create_task([](){
    return 10;
//create_task 创建延迟任务
pplx::task_completion_event<int> tce;// task_completion_event 需按值传递
auto task = pplx::create_task(tce);


pplx::task<std::string> create_print_task(const std::string& init_value)
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");

使用 task.get() 或者 task.wait() 执行任务:

  • 阻塞方式 get() : 阻塞直到任务执行完成,并返回任务结果,当任务取消时,抛出 task_canceled 异常,发生其它异常也会被抛出;
  • 非阻塞方式 wait() :等待任务到达终止状态,然后返回任务状态: completedcanceled ,如果发生异常会被抛出。
void test_task_chain()
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        std::cout << "Result value: " << value << std::endl;
        return value;

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;



  • when_all :返回组任务,只有当所有任务都完成时组任务才会返回成功;如果任一任务被取消或者抛出异常,则组任务会完成并处理取消状态,在组任务 get() 或者 wait() 时抛出异常。如果任务类型为 task<T> ,则组任务类型为 task<vector<T>>
  • when_any :返回组任务,当任一任务完成时组任务就会返回成功;如果所有任务都被取消或者抛出异常,则组任务会完成并处理取消状态,并且如果任何任务发生异常,在组任务 get 或者 wait 时抛出异常。如果任务类型为 task<T> ,则组任务类型为 task<T, size_t>size_t 返回完成任务的索引。
void test_group_tasks()
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {

        std::cout << info << std::endl;

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })

        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;

        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;


cancellation_token_source 通过封装一个 cancellation_token 指针来提供取消操作,通过 cancellation_token.is_canceled() 在执行任务的过程中判断任务是否要被取消。


void test_cancellation()
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   return true;


    std::cout << "Canceling task... " << now() << std::endl;

    std::cout << "Waiting for task to complete... " << now() << std::endl;

    std::cout << "Done. " << now() << std::endl;

当要在异步任务链中支持取消时,需要将 cancellation_token 作为构造 task 的参数传递,然后结合 task.wait() 判断是否要取消:

void test_cancellation_async()
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        else {
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;




task<T>.then([](T t){


task<T>.then([](task<T> task){
              task.get(); //使用get或者wait执行任务


void test_task_exception()
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;



// g++ -std=c++11 pplxdemo.cpp -o pplxdemo -lboost_system -lcrypto -lssl -lcpprest

#include <pplx/pplxtasks.h>
#include <iostream>
#include <sstream>
#include <vector>
#include <functional>
#include <iomanip>
#include <ctime>
#include <thread>
#include <chrono>
#include <stdexcept>

std::string now()
    auto t = std::time(nullptr);
    auto tm = *std::localtime(&t);

    std::ostringstream oss;
    oss << std::put_time(&tm, "%Y-%m-%d %H-%M-%S");
    auto str = oss.str();
    return str;

void sleep(int seconds)

pplx::task<std::string> create_print_task(const std::string& init_value)
    return pplx::create_task([init_value]() {
        std::cout <<"Current value:" << init_value << std::endl;
        return std::string("Value 2");

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 3");

    .then([](std::string value) {
        std::cout << "Current value:" << value << std::endl;
        return std::string("Value 4");

void test_task_chain()
    auto task_chain = create_print_task("Value 1");
    task_chain.then([](std::string value) {
        // uncomment this line to throw an exception.
        // throw std::runtime_error("An exception happened!");
        std::cout << "Result value: " << value << std::endl;
        return value;

    // process exception
    .then([](pplx::task<std::string> previousTask) {
        try {
        catch (const std::exception& e) {
            std::cout << "exception: " << e.what() << std::endl;


void test_group_tasks()
    auto sleep_print = [](int seconds, const std::string& info) {
        if (seconds > 0) {

        std::cout << info << std::endl;

    auto/*std::array<pplx::task<int>, 3>*/ tasks = {
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 1"); return 1; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(2, "Task 2"); return 2; }),
        pplx::create_task([sleep_print]() -> int { sleep_print(4, "Task 3"); return 3; })

        std::cout << "=== when_all ===" << std::endl;

        auto joinTask = pplx::when_all(std::begin(tasks), std::end(tasks));
        auto result = joinTask.wait();
        std::cout << "All joined thread. result: " << result << std::endl;

        std::cout << "=== when_any ===" << std::endl;

        auto joinTask = pplx::when_any(std::begin(tasks), std::end(tasks))
        .then([](std::pair<int, size_t> result) {
            std::cout << "First task to finish returns "
                  << result.first
                  << " and has index "
                  << result.second << std::endl;

        auto result = joinTask.wait();
        std::cout << "Any joined thread. result: " << result << std::endl;

void test_cancellation()
    pplx::cancellation_token_source cts;
    std::cout << "Creating task..." << std::endl;

    auto task = pplx::create_task([cts]{
        bool moreToDo = true;
        while (moreToDo) {
           if (cts.get_token().is_canceled()) {
           else {
               moreToDo = []()->bool {
                   std::cout << "Performing work at " << now() << std::endl;
                   return true;


    std::cout << "Canceling task... " << now() << std::endl;

    std::cout << "Waiting for task to complete... " << now() << std::endl;

    std::cout << "Done. " << now() << std::endl;

void test_cancellation_async()
    pplx::cancellation_token_source cts;
    auto task = pplx::task<void>([cts]() {
        std::cout << "Cancel continue_task" << std::endl;

    .then([]() {
        std::cout << "This will not run" << std::endl;
    }, cts.get_token());

    try {
        if (task.wait() == pplx::task_status::canceled) {
            std::cout<<"Taks has been canceled"<<std::endl;
        else {
    catch (const std::exception& e) {
        std::cout << "exception: " << e.what() << std::endl;

int main(int argc, char* args[])
    std::cout << "==== pplx demo ====" << std::endl;
    return 0;

