Coder Social home page Coder Social logo

codinghanya / workspace Goto Github PK

View Code? Open in Web Editor NEW
862.0 7.0 127.0 3.86 MB

workspace是基于C++11的轻量级异步执行框架,支持:通用任务异步并发执行、优先级任务调度、自适应动态线程池、高效静态线程池、异常处理机制等。

License: Apache License 2.0

C++ 93.77% CMake 6.23%
concurrency cpp11 easy-to-use high-performance stable threadpool framework sfinae concise

workspace's Introduction

workspace

workspace是基于C++11的轻量级异步执行框架,支持:通用任务异步执行、优先级任务调度、自适应动态线程池、高效静态线程池、异常处理机制等。

目录

特点

  • 轻量的:Header-Only & 代码量 <= 1000行 & 接口简单。
  • 高效的:超轻量级任务支持异步顺序执行,提高了框架的并发性能。
  • 灵活的:支持多种任务类型、动态线程调整、可通过workspace构建不同的池模型。
  • 稳定的:利用std::function的小任务优化减少内存碎片、拥有良好的异步线程异常处理机制。
  • 兼容性:纯C++11实现,跨平台,且兼容C++11以上版本。

主要模块

workbranch

workbranch(工作分支)是动态线程池的抽象,内置了一条线程安全的任务队列用于同步任务。其管理的每一条异步工作线程被称为worker,负责从任务队列不断获取任务并执行。(以下示例按顺序置于workspace/example/

让我们先简单地提交一点任务,当你的任务带有返回值时,workbranch会返回一个std::future,否则返回void。

#include <workspace/workspace.h>

int main() {
    // 2 threads
    wsp::workbranch br(2);
    // return void
    br.submit([]{ std::cout<<"hello world"<<std::endl; });  
    // return std::future<int>
    auto result = br.submit([]{ return 2023; });  
    std::cout<<"Got "<<result.get()<<std::endl;   
    // wait for tasks done (timeout: 1000 milliseconds)
    br.wait_tasks(1000); 
}

由于返回一个std::future会带来一定的开销,如果你不需要返回值并且希望程序跑得更快,那么你的任务应该是void()类型的。

当你有一个任务并且你希望它能尽快被执行时,你可以指定该任务的类型为urgent,如下:

#include <workspace/workspace.h>

int main() {
    // 1 threads
    wsp::workbranch br;
    br.submit<wsp::task::nor>([]{ std::cout<<"task B done\n";}); // normal task 
    br.submit<wsp::task::urg>([]{ std::cout<<"task A done\n";}); // urgent task
    br.wait_tasks(); // wait for tasks done (timeout: no limit)
}

在这里我们通过指定任务类型为wsp::task::urg,来提高任务的优先级。最终 在我的机器上:

jack@xxx:~/workspace/example/build$ ./e2
task A done
task B done

在这里我们不能保证task A一定会被先执行,因为当我们提交task A的时候,task B可能已经在执行中了。urgent标签可以让任务被插入到队列头部,但无法改变已经在执行的任务。

假如你有几个轻量异步任务,执行他们只需要非常短暂的时间。同时,按照顺序执行它们对你来说没有影响,甚至正中你下怀。那么你可以把任务类型指定为sequence,以便提交一个任务序列。这个任务序列会被单个线程顺序执行:

#include <workspace/workspace.h>

int main() {
    wsp::workbranch br;
    // sequence tasks
    br.submit<wsp::task::seq>([]{std::cout<<"task 1 done\n";},
                              []{std::cout<<"task 2 done\n";},
                              []{std::cout<<"task 3 done\n";},
                              []{std::cout<<"task 4 done\n";});
    // wait for tasks done (timeout: no limit)
    br.wait_tasks();
}

任务序列会被打包成一个较大的任务,以此来减轻框架同步任务的负担,提高整体的并发性能。

当任务中抛出了一个异常,workbranch有两种处理方式:A-将其捕获并输出到终端 B-将其捕获并通过std::future传递到主线程。第二种需要你提交一个带返回值的任务。

#include <workspace/workspace.h>
// self-defined
class excep: public std::exception {
    const char* err;
public:
    excep(const char* err): err(err) {}
    const char* what() const noexcept override {
        return err;
    }
}; 
int main() {
    wsp::workbranch wbr;
    wbr.submit([]{ throw std::logic_error("A logic error"); });     // log error
    wbr.submit([]{ throw std::runtime_error("A runtime error"); }); // log error
    wbr.submit([]{ throw excep("XXXX");});                          // log error

    auto future1 =  wbr.submit([]{ throw std::bad_alloc(); return 1; }); // catch error
    auto future2 =  wbr.submit([]{ throw excep("YYYY"); return 2; });    // catch error
    try {
        future1.get();
    } catch (std::exception& e) {
        std::cerr<<"Caught error: "<<e.what()<<std::endl;
    }
    try {
        future2.get();
    } catch (std::exception& e) {
        std::cerr<<"Caught error: "<<e.what()<<std::endl;
    }
}

在我的机器上:

jack@xxx:~/workspace/test/build$ ./test_exception 
workspace: worker[140509071521536] caught exception:
  what(): A logic error
workspace: worker[140509071521536] caught exception:
  what(): A runtime error
workspace: worker[140509071521536] caught exception:
  what(): XXXX
Caught error: std::bad_alloc
Caught error: YYYY

supervisor

supervisor是异步管理者线程的抽象,负责监控workbranch的负载情况并进行动态调整。它允许你在每一次调控workbranch之后执行一个小任务,你可以用来写日志或者做一些其它调控等。

每一个supervisor可以管理多个workbranch。此时workbranch之间共享supervisor的所有设定。

#include <workspace/workspace.h>

int main() {
    wsp::workbranch br1(2);
    wsp::workbranch br2(2);

    // 2 <= thread number <= 4 
    // time interval: 1000 ms 
    wsp::supervisor sp(2, 4, 1000);

    sp.set_tick_cb([&br1, &br2]{
        auto now = std::chrono::system_clock::now();
        std::time_t timestamp = std::chrono::system_clock::to_time_t(now);
        std::tm local_time = *std::localtime(&timestamp);
        static char buffer[40];
        std::strftime(buffer, sizeof(buffer), "%Y-%m-%d %H:%M:%S", &local_time);
        std::cout<<"["<<buffer<<"] "<<"br1: [workers] "<<br1.num_workers()<<" | [blocking-tasks] "<<br1.num_tasks()<<'\n';
        std::cout<<"["<<buffer<<"] "<<"br2: [workers] "<<br2.num_workers()<<" | [blocking-tasks] "<<br2.num_tasks()<<'\n';
    });

    sp.supervise(br1);  // start supervising
    sp.supervise(br2);  // start supervising

    for (int i = 0; i < 1000; ++i) {
        br1.submit([]{std::this_thread::sleep_for(std::chrono::milliseconds(10));});
        br2.submit([]{std::this_thread::sleep_for(std::chrono::milliseconds(20));});
    }

    br1.wait_tasks();
    br2.wait_tasks();
}

在我的机器上,输出如下:

jack@xxx:~/workspace/example/build$ ./e4
[2023-06-13 12:24:31] br1: [workers] 4 | [blocking-tasks] 606
[2023-06-13 12:24:31] br2: [workers] 4 | [blocking-tasks] 800
[2023-06-13 12:24:32] br1: [workers] 4 | [blocking-tasks] 213
[2023-06-13 12:24:32] br2: [workers] 4 | [blocking-tasks] 600
[2023-06-13 12:24:33] br1: [workers] 4 | [blocking-tasks] 0
[2023-06-13 12:24:33] br2: [workers] 4 | [blocking-tasks] 404
[2023-06-13 12:24:34] br1: [workers] 3 | [blocking-tasks] 0
[2023-06-13 12:24:34] br2: [workers] 4 | [blocking-tasks] 204
[2023-06-13 12:24:35] br1: [workers] 2 | [blocking-tasks] 0
[2023-06-13 12:24:35] br2: [workers] 4 | [blocking-tasks] 4
[2023-06-13 12:24:35] br1: [workers] 2 | [blocking-tasks] 0
[2023-06-13 12:24:35] br2: [workers] 4 | [blocking-tasks] 0

workspace

workspace是一个托管器/任务分发器,你可以将workbranch和supervisor托管给它,并用workspace分配的组件专属ID来访问它们。将组件托管至workspace至少有以下几点好处:

  • 堆内存正确释放:workspace在内部用unique指针来管理组件,确保没有内存泄漏
  • 分支间任务负载均衡:workspace支持任务分发,在workbranch之间实现了简单高效的负载均衡
  • 避免空悬指针问题:当workbranch先于supervisor析构会造成空悬指针的问题,使用workspace可以避免这种情况
  • 更低的框架开销:workspace的任务分发机制能减少与工作线程的竞争,提高性能(见下Benchmark)。

我们可以通过workspace自带的任务分发机制来异步执行任务(调用submit)。

#include <workspace/workspace.h>

int main() {
    wsp::workspace spc;
    auto bid1 = spc.attach(new wsp::workbranch);
    auto bid2 = spc.attach(new wsp::workbranch);
    auto sid1 = spc.attach(new wsp::supervisor(2, 4));
    auto sid2 = spc.attach(new wsp::supervisor(2, 4));
    spc[sid1].supervise(spc[bid1]);  // start supervising
    spc[sid2].supervise(spc[bid2]);  // start supervising

    // Automatic assignment
    spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});
    spc.submit([]{std::cout<<std::this_thread::get_id()<<" executed task"<<std::endl;});

    spc.for_each([](wsp::workbranch& each){each.wait_tasks();});
}

当我们需要等待任务执行完毕的时候,我们可以调用for_each+wait_tasks,并为每一个workbranch指定等待时间,单位是毫秒。

(更多详细接口见workspace/test/

辅助模块

futures

wsp::futures是一个std::future收集器(collector),可以缓存同类型的std::future,并进行批量操作。一个简单的操作如下:

#include <workspace/workspace.h>

int main() {
    wsp::futures<int> futures;
    wsp::workspace spc;
    spc.attach(new wsp::workbranch("br", 2));
    
    futures.add_back(spc.submit([]{return 1;}));
    futures.add_back(spc.submit([]{return 2;}));

    futures.wait();
    auto res = futures.get();
    for (auto& each: res) {
        std::cout<<"got "<<each<<std::endl;
    }
}

这里futures.get()返回的是一个std::vector<int>,里面保存了所有任务的返回值。

benchmark

空跑测试

测试原理:通过快速提交大量的空任务以考察框架同步任务的开销。
测试环境:Ubuntu20.04 : 16核 : AMD Ryzen 7 5800H with Radeon Graphics 3.20 GHz

<测试1>
在测试1中我们调用了submit<wsp::task::seq>,每次打包10个空任务并提交到workbranch中执行。结果如下:(代码见workspace/benchmark/bench1.cc

threads: 1  |  tasks: 100000000  |  time-cost: 2.68801 (s)
threads: 2  |  tasks: 100000000  |  time-cost: 3.53964 (s)
threads: 3  |  tasks: 100000000  |  time-cost: 3.99903 (s)
threads: 4  |  tasks: 100000000  |  time-cost: 5.26045 (s)
threads: 5  |  tasks: 100000000  |  time-cost: 6.65157 (s)
threads: 6  |  tasks: 100000000  |  time-cost: 8.40907 (s)
threads: 7  |  tasks: 100000000  |  time-cost: 10.5967 (s)
threads: 8  |  tasks: 100000000  |  time-cost: 13.2523 (s)

<测试2>
在测试2中我们同样将10个任务打成一包,但是是将任务提交到workspace中,利用workspace进行任务分发,且在workspace托管的workbranch只拥有 1条 线程。结果如下:(代码见workspace/benchmark/bench2.cc

threads: 1  |  tasks: 100000000  |  time-cost: 4.38221 (s)
threads: 2  |  tasks: 100000000  |  time-cost: 4.01103 (s)
threads: 3  |  tasks: 100000000  |  time-cost: 3.6797 (s)
threads: 4  |  tasks: 100000000  |  time-cost: 3.39314 (s)
threads: 5  |  tasks: 100000000  |  time-cost: 3.03324 (s)
threads: 6  |  tasks: 100000000  |  time-cost: 3.16079 (s)
threads: 7  |  tasks: 100000000  |  time-cost: 3.04612 (s)
threads: 8  |  tasks: 100000000  |  time-cost: 3.11893 (s)

<测试3>
在测试3中我们同样将10个任务打成一包,并且将任务提交到workspace中,但是workspace管理的每个workbranch中都拥有 2条 线程。结果如下:(代码见workspace/benchmark/bench3.cc

threads: 2  |  tasks: 100000000  |  time-cost: 4.53911 (s)
threads: 4  |  tasks: 100000000  |  time-cost: 7.0178 (s)
threads: 6  |  tasks: 100000000  |  time-cost: 6.00101 (s)
threads: 8  |  tasks: 100000000  |  time-cost: 5.97501 (s)
threads: 10 |  tasks: 100000000  |  time-cost: 5.63834 (s)
threads: 12 |  tasks: 100000000  |  time-cost: 5.17316 (s)

总结:利用workspace进行任务分发,且workbranch线程数为1的情况下,整个任务同步框架是静态的,任务同步开销最小。当workbranch内的线程数越多,面对大量空任务时对任务队列的竞争越激烈,框架开销越大。(更加详尽的测试结果见bench.md,测试代码于workspace/bench

如何使用

生成doxygen文档

请提前安装doxygen

# 与workspace同级目录中(Linux)
doxygen ./doxygen.conf

生成的文档在workspace/docs/中,可以在浏览器中打开workspace/docs/html/index.html并查看接口。

简单使用

# 项目代码与workspace同级(Linux)
g++ -I workspace/include xxx.cc -lpthread && ./a.out

其它平台可能需要链接不同的线程库,且可执行文件后缀不同。

运行已有实例(以example为例)

# 在"workspace/example"中
cmake -B build 
cd build
make
./e1

安装到系统(支持Win/Linux/Mac)

# 在"workspace/"中
cmake -B build 
cd build
sudo make install

注意事项

雷区

  1. 不要在任务中操纵组件,如:submit([&br]{br.wait_tasks();}); 会阻塞线程
  2. 不要在回调中操纵组件,如:set_tick_cb([&sp]{sp.suspend();});
  3. 不要让workbranch先于supervisor析构(空悬指针问题)。

接口安全性

组件接口 是否线程安全
workspace
workbranch
supervisor
futures

时间单位

workspace有关时间的接口单位都是 -> 毫秒(ms)

其它

参考书目

《C++并发编程》

旧版信息

由于历史原因,Hipe源码已不复存在,诸位请向前看吧。

联系我

邮箱: [email protected]

workspace's People

Contributors

codesire-deng avatar codinghanya avatar firma2021 avatar leon-rein avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar

workspace's Issues

关于任务窃取机制的疑问

github

up你好,我想问一下这里,就是任务窃取时候,这个代码里没有把成功时候的public_task_queue里边的任务转移到t里面呀?只更改了t的task_num,请问是我理解错了么?

线程返回超时问题

workbranch br(2);
auto result = br.submit([](){
    return xx;
});
if(result.wait_for(xx) == std::future::time_out){
    // 请问一下 如过超时,如何杀掉执行当前超时任务的线程。
}else{
    result.get();
}

请问一下 如过超时,如何杀掉执行当前超时任务的线程。

关于加速比

您好,我看到你的benchmark中提到了加速比,我主要是没感觉到这个指标的意义是在哪呢?
单线程4个task,200s
然后线程池4个task50s,这是很正常的。。

但是当任务达到8、16的时候,总耗时为啥会减少呢?我没有想到原因,能否解释一下啊

关于数据竞争的一些问题

用户的业务逻辑代码可能是千奇百怪、十分复杂的。有可能出现以下两种情况:

两个外部线程调用同一个线程池的waitForTasks()函数,此时会造成数据竞争,程序崩溃。
用户业务逻辑复杂的时候,我认为有可能需要这种用法。

用户向线程池提交了一个函数,这个函数中又去调用线程池的waitForTasks(),此时程序会永远阻塞。
这种用法或许有点儿愚蠢,,但至少您应该在文档中写明waitForTasks()不能提交给线程池执行。
用户代码量大的时候可能难以发现这个错误。线程池应该能检测出这种错误并提示用户。

#include "../hipe.h"

using namespace hipe;

void multithread_wait_task()
{
	DynamicThreadPond pond (8);
	
	pond.submit([](){std::cout << "hello world!" << std::endl;});
	
	std::thread trd { [&pond](){ pond.waitForTasks(); } };
	pond.waitForTasks();
}

void poolthread_wait_task()
{
	DynamicThreadPond pond (8);
	pond.submit([&pond](){pond.waitForTasks(); });
	pond.waitForTasks();
}

int main()
{
	multithread_wait_task();
	poolthread_wait_task();
}

if (enable_steal_tasks) 的意义

如果能执行到worker中的 if (enable_steal_tasks) 的代码块,意味着waiting=false,但这又意味着全部任务执行完了,这个时候从其他线程的工作队列拿任务的意义何在?不是全部任务已经没了吗?求懂哥解释

关于性能分析的咨询

您好,追随您在b站的视频找到这里。

想咨询一下,是否有测试结果,来对比使用 a batch of 的方式,将 task 从写入队列,转移到执行队列中,
比直接写入一个线程安全的队列,然后消费的效果会好。

如果好的话,会好多少?

关于submitForReturn函数是否能执行带参数的函数的问题

截屏2023-04-07 00 47 09

如题,可以看出这个函数是不是只实现了 执行不带参数的函数功能啊?因为在调用std::result_of和std::packaged_task的时候,模板里面都没加参数的类型。如果我理解错误的话,up方不方便给一个普通函数的例子啊?

请求一些意见

  1. 在workbranch可以自动调节至低cpu占用的情况下,supervisor还有没有存在的必要?
  2. 若supervisor被删去,那么wsp::workspace还有没有存在的必要?

关于跨平台编译的一些问题

我的编译器是msvc,使用cmake链接你所有头文件重新编译,其中发现了一些问题,其中uint数据类型会报错,size_t数据转换会出现警告;我想说一下,这些数据类型可以换成基本数据类型。

动态线程调整相关

线程池支持自动扩容和收缩吗?

设置一个最小线程数量和最大线程数量,请求多的时候自动扩容(但小于最大线程数量),请求少的时候可以释放掉多余资源(但不能过度优化,至少保留最少线程数量)

不知道这个框架支不支持扩展这个功能?

建议启用以下cmake功能

编译时打开clang-tidy和-fsanitize=检测选项
安装库到系统路径,用户可以直接include
整合doxygen,自动生成文档,打包和安装文档,之后用户可以通过man命令查看这个库的文档

[bug?] 可能不需要条件变量的互斥锁保护

重构问题

请问一下,这个仓库为什么要从hipe重构为workspace,以前的是有什么问题,现在的有什么优势吗?谢谢

请问线程池可以处理高递归的任务吗

假设原来有类似这样一个会产生很高递归栈的函数:

int fib(int n) {
  if (n <= 1) {
    return 1;
  }
  return fib(n-1) + fib(n-2);
}

把它改写用threadpool后,有类似这样的:

int fib(int n) {
  if (n <= 1) {
    return 1;
  }
  int a = pool.submit(fib(n-1));
  int b = pool.submit(fib(n-2));
  return a + b;
}

但是我有一个困惑是, 在计算fib(n-1)和fib(n-2)时 原来的fib(n)还是挂在线程池里的某一个worker身上没法被销毁, 也不能被销毁. 这样的话递归不了几层, 所有的线程worker都卡住了.

当然如果超出了现有的线程数量可以去新开数量, 但是面对高递归任务时, 开出来大几百个线程worker会很快使得系统性能下降.

请问有什么能保存当前的context但是又不卡住一个线程资源的使用方法吗

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.