Coder Social home page Coder Social logo

chunelfeng / cgraph Goto Github PK

View Code? Open in Web Editor NEW
1.4K 23.0 260.0 5.68 MB

【A common used C++ DAG framework】 一个通用的、无三方依赖的、跨平台的、收录于awesome-cpp的、基于流图的并行计算框架。欢迎star & fork

Home Page: http://www.chunel.cn

License: MIT License

CMake 1.08% C++ 92.33% C 5.70% Lua 0.23% Shell 0.66%
dag threadpool workflow taskflow cpp concurrency ai pipeline pytorch tensorflow

cgraph's Introduction

languages os stars forks

awesome-cpp HelloGithub GitHub-Chinese-Top-Charts

中文 | English Readme

CGraph 说明文档

CGraph is a cross-platform Directed Acyclic Graph framework based on pure C++ without any 3rd-party dependencies.

You, with it, can build your own operators simply, and describe any running schedules as you need, such as dependence, parallelling, aggregation and so on. Some useful tools and plugins are also provide to improve your project.

Tutorials and contact information are show as follows. Please get in touch with us for free if you need more about this repository.

一. 简介

CGraph中文名为【色丶图】,是一套无任何第三方依赖的跨平台图流程执行框架。通过GPipeline(流水线)底层调度,实现了依赖元素依次顺序执行、非依赖元素并发执行的调度功能。

使用者只需继承GNode(节点)类,实现子类的run()方法,并根据需要设定依赖关系,即可实现任务的图化执行或流水线执行。还可以通过设定各种包含多节点信息的GGroup(组),自行控制图的条件判断、循环和并发执行逻辑。

项目提供了丰富的Param(参数)类型,用于不同应用场景下的数据互通。此外,还可以通过添加GAspect(切面)的方式,实现以上各种元素功能的横向扩展;通过引入GAdapter(适配器)对单个节点功能进行加强;或者通过添加GEvent(信号),丰富和优化执行逻辑。

CGraph Skeleton

本工程使用纯C++11标准库编写,无任何第三方依赖。兼容MacOSLinuxWindowsAndroid系统,支持通过 CLionVSCodeXcodeVisual StudioCode::BlocksQt Creator等多款IDE进行本地编译和二次开发,具体编译方式请参考 CGraph 编译说明

详细功能介绍和用法,请参考 推荐阅读 中的文章内容。项目相关视频在B站持续更新中,欢迎观看交流和一键三连:

二. 使用Demo

MyNode.h

#include "CGraph.h"

class MyNode1 : public CGraph::GNode {
public:
    CStatus run() override {
        printf("[%s], Sleep for 1 second ...\n", this->getName().c_str());
        CGRAPH_SLEEP_SECOND(1)
        return CStatus();
    }
};

class MyNode2 : public CGraph::GNode {
public:
    CStatus run() override {
        printf("[%s], Sleep for 2 second ...\n", this->getName().c_str());
        CGRAPH_SLEEP_SECOND(2)
        return CStatus();
    }
};

main.cpp

#include "MyNode.h"

using namespace CGraph;

int main() {
    /* 创建一个流水线,用于设定和执行流图信息 */
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr a, b, c, d = nullptr;

    /* 注册节点之间的依赖关系 */
    pipeline->registerGElement<MyNode1>(&a, {}, "nodeA");
    pipeline->registerGElement<MyNode2>(&b, {a}, "nodeB");
    pipeline->registerGElement<MyNode1>(&c, {a}, "nodeC");
    pipeline->registerGElement<MyNode2>(&d, {b, c}, "nodeD");

    /* 执行流图框架 */
    pipeline->process();
    GPipelineFactory::remove(pipeline);

    return 0;
}

CGraph Demo
如上图所示,图结构执行的时候,首先执行a节点。a节点执行完毕后,并行执行bc节点。bc节点全部执行完毕后,再执行d节点。

三. 推荐阅读


四. 关联项目

  • GraphANNS : Graph-based Approximate Nearest Neighbor Search Working off CGraph
  • CThreadPool : 一个简单好用、功能强大、性能优异、跨平台的C++线程池
  • taskflow : A General-purpose Parallel and Heterogeneous Task Programming System
  • awesome-cpp : A curated list of awesome C++ (or C) frameworks, libraries, resources, and shiny things. Inspired by awesome-... stuff.
  • awesome-workflow-engines : A curated list of awesome open source workflow engines
  • nndeploy : nndeploy是一款模型端到端部署框架。以多端推理以及基于有向无环图模型部署为内核,致力为用户提供跨平台、简单易用、高性能的模型部署体验。
  • KuiperInfer : 带你从零实现一个高性能的深度学习推理库,支持大模型 llama2 、Unet、Yolov5、Resnet等模型的推理。Implement a high-performance deep learning inference library step by step

附录-1. 版本信息

[2021.05.04 - v1.0.0 - Chunel]

  • 提供图化执行功能,支持非依赖节点并行计算

[2021.05.09 - v1.1.0 - Chunel]

  • 优化图执行过程中的并发度

[2021.05.18 - v1.1.1 - Chunel]

  • 添加节点namesession信息

[2021.05.23 - v1.2.0 - Chunel]

  • 提供单节点循环执行功能

[2021.05.29 - v1.3.0 - Chunel]

  • 提供cluster(簇)和region(区域)划分和循环执行功能
  • 提供tutorial内容,包含多种使用样例

[2021.06.14 - v1.4.0 - Chunel]

  • 提供param(参数)传递机制
  • 提供group(组)功能,多节点模块统一继承自group模块
  • 添加对Linux系统的的支持

[2021.06.20 - v1.4.1 - Chunel]

  • 提供condition(条件)功能
  • 添加对Windows系统的支持

[2021.06.24 - v1.5.0 - Chunel]

  • 提供pipeline工厂创建方法
  • 更新tutorial内容

[2021.07.07 - v1.5.1 - Chunel]

  • 优化线程池功能。实现任务盗取机制

[2021.07.11 - v1.5.2 - Chunel]

  • 优化线程池功能。实现线程数量自动调节机制

[2021.07.31 - v1.5.3 - Chunel]

  • 优化线程池功能。实现任务批量获取功能,优化任务盗取机制

[2021.08.29 - v1.6.0 - Chunel]

  • 提供多pipeline功能,优化底层逻辑
  • 更新tutorial内容

[2021.09.19 - v1.6.1 - Chunel]

  • 提供Lru算子、Trie算子和模板节点功能,优化底层逻辑
  • 更新tutorial内容

[2021.09.29 - v1.7.0 - Chunel]

  • 提供aspect(切面)功能,用于横向扩展nodegroup功能
  • 更新tutorial内容

[2021.10.07 - v1.7.1 - Chunel]

  • 优化aspect(切面)实现逻辑,提供切面参数功能,提供批量添加切面功能
  • 更新tutorial内容

[2021.11.01 - v1.8.0 - Chunel]

  • 提供adapter(适配器)功能,提供singleton适配器功能
  • 优化pipeline执行逻辑
  • 更新tutorial内容

[2021.12.18 - v1.8.1 - Chunel]

  • 优化了返回值CStatus信息

[2022.01.02 - v1.8.2 - Chunel]

  • 提供节点执行超时自动退出功能,提供task group(任务组)功能
  • 提供线程池配置参数设置方法

[2022.01.23 - v1.8.3 - Chunel]

  • 提供function适配器,实现函数式编程功能
  • 提供线程优先级调度功能,提供线程绑定cpu执行功能
  • 更新tutorial内容

[2022.01.31 - v1.8.4 - Chunel]

  • 提供node(节点)异步执行的功能

[2022.02.03 - v1.8.5 - Chunel]

  • 提供daemon(守护)功能,用于定时执行非流图中任务
  • 更新tutorial内容

[2022.04.03 - v1.8.6 - Chunel]

  • 提供DistanceCalculator算子,用于实现任意数据类型、任意距离类型的计算
  • 更新tutorial内容

[2022.04.05 - v2.0.0 - Chunel]

  • 提供domain(领域)功能,提供Ann领域抽象模型,开始支持个别专业方向
  • 提供hold执行机制
  • 更新tutorial内容

[2022.05.01 - v2.0.1 - Chunel]

  • 优化pipeline注册机制,支持init方法自定义顺序执行
  • 提供一键编译脚本

[2022.05.29 - v2.1.0 - Chunel]

  • 提供element参数写入方法
  • 提供针对C++14版本的支持
  • 更新tutorial内容

[2022.10.03 - v2.1.1 - Chunel]

  • 提供线程池中的任务优先级机制
  • 优化group执行逻辑

[2022.11.03 - v2.2.0 - Chunel]

  • 提供message(消息)功能,主要用于完成不同pipeline之间的数据传递
  • 更新tutorial内容

[2022.12.24 - v2.2.1 - Chunel]

  • 提供TemplateNode(模板节点)功能,用于优化参数传参方式
  • 更新tutorial内容

[2022.12.25 - v2.2.2 - yeshenyong]

  • 优化图执行逻辑

[2022.12.30 - v2.2.3 - Chunel]

  • 提供message发布订阅功能
  • 提供执行引擎切换功能

[2023.01.21 - v2.3.0 - Chunel]

  • 提供event(事件)功能
  • 提供CGraph Intro.xmind文件,通过脑图的方式,介绍了CGraph的整体逻辑

[2023.01.25 - v2.3.1 - Chunel]

  • 提供针对C++11版本的支持。感谢 MirrorYuChen 提供相关解决方案

[2023.02.10 - v2.3.2 - Chunel]

  • 优化调度策略,提供调度参数配置接口
  • 提供英文版本readme.md

[2023.02.12 - v2.3.3 - yeshenyong, Chunel]

  • 提供graphviz可视化图展示功能
  • 提供参数链路追踪功能

[2023.02.22 - v2.3.4 - Chunel]

  • 优化Windows系统下调度机制
  • 优化param机制和event(事件)机制

[2023.03.25 - v2.4.0 - woodx, Chunel]

  • 提供可运行的docker环境,和构建docker环境的dockerfile文件
  • 提供pipeline调度资源管控机制
  • 优化调度性能

[2023.05.05 - v2.4.1 - Chunel]

  • 提供线程绑定执行功能
  • 提供pipeline最大并发度获取方法。感谢 Hanano-Yuuki 提供相关解决方案
  • 提供pipeline异步执行功能和执行时退出功能

[2023.06.17 - v2.4.2 - Chunel]

  • 提供MultiCondition(多条件)功能
  • 提供pipeline暂停执行和恢复执行功能

[2023.07.12 - v2.4.3 - Chunel]

  • 优化CStatus功能,添加了异常定位信息

[2023.09.05 - v2.5.0 - Chunel]

  • 提供perf功能,用于做pipeline的性能分析
  • 提供element的超时机制
  • 提供some(部分)功能,优化pipeline的异步执行方式

[2023.09.15 - v2.5.1 - Chunel]

  • 提供fence(栅栏)功能
  • 提供coordinator(协调)功能

[2023.11.06 - v2.5.2 - Chunel]

  • 优化message(消息)功能,可以设定写入阻塞时的处理方式,减少内存copy次数
  • 添加example相关内容,针对不同行业,提供一些简单实现
  • 优化调度性能

[2023.11.15 - v2.5.3 - Chunel]

  • 提供proto定义文件
  • 添加mutable(异变)功能,提供依赖关系注册语法糖

[2024.01.05 - v2.5.4 - Chunel]

  • 提供test内容,包含性能和功能方面的测试用例
  • 优化event(事件)机制,支持异步等待功能

[2024.01.24 - v2.6.0 - Chunel]

  • 提供pipeline的拓扑执行的方式
  • 提供判定element之间是否有依赖关系的方法

附录-2. 感谢

  • Thanks to the recommendation from awesome-cpp, we all know, it is the most authoritative recommendation list for cpp project in the world
  • Thanks to the recommendation from Taskflow Group: awesome-parallel-computing, and we always treat taskflow as a role model
  • Thanks to the recommendation from awesome-workflow-engines
  • 感谢各位开发者 CONTRIBUTORS 为项目做出的贡献
  • 感谢所有为CGraph项目提出的意见和建议的朋友,在此不一一提及。随时欢迎大家加入,一起共建

附录-3. 联系方式

cgraph's People

Contributors

atx735 avatar baoguo-ma1 avatar boysfight avatar chunel-mmt avatar chunelfeng avatar codesire-deng avatar codinghanya avatar greent2008 avatar logerrors avatar naonao-cola avatar ryanhuang avatar sgssgene avatar shiyiyuedeyu avatar woodx9 avatar yeshenyong avatar zmoth avatar zongxin1993 avatar zpye avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

cgraph's Issues

有关于动态addEParam

在接收webscoket数据时,使用了提前创建好的pipeline

void revcData(std::string data)
{
TestEPararm test;
test.data = data;
Element->addEparam(test)

pipline->init();
pipline->run();

}

大概是这样在接收数据的里面调用

发现这样调用我自己封装的EParam不会更新,只是反复调用而已

于是我把/GraphCtrl/GraphElement/GElement.inl

template<typename T,
c_enable_if_t<std::is_base_of<GElementParam, T>::value, int> >
GElementPtr GElement::addEParam(const std::string& key, T* param) {
//我把这行屏蔽了///CGRAPH_ASSERT_INIT_RETURN_NULL(false)
CGRAPH_ASSERT_NOT_NULL_RETURN_NULL(param)
if (local_params_.end() != local_params_.find(key)) {
// 如果重复同名key信息,则删除原先的内容
CGRAPH_DELETE_PTR(local_params_[key]);
}
T* cur = CGRAPH_SAFE_MALLOC_COBJECT(T);
cur->clone(param);

local_params_[key] = cur;    // 写入其中
return this;

}

屏蔽了之后,反复调用参数内容是正常更新的

我想通过同一个pipeline,更新参数反复调用,有没有好的建议

我屏蔽的那个是不是就破坏了什么?

pipeline->run();这个run起来之后,如果中途想停下来,怎么办?有没stop的接口?

如题,pipeline->run();这个run起来之后,如果中途想停下来,怎么办?有没stop的接口?
我的设想是:
1、想外部给一个全局变量g_bRunning,赋值为假,然后每个节点运行时,发现这个变量为假,那就停止下来。
2、比如,在我的ui界面按下停止按钮,然后赋值全局变量g_bRunning=false。问题来了,那这个全局变量g_bRunning怎么传入pipeline?

能否写个例程T19,专题来应对这个外部触发stop的应用场景?

请教一下线程池的测试

你好,想测一下线程池的性能,想使用一下线程池,是不是 也先把整个CGraph make下来就行啊,或者说 我用您的那个镜像进去测试,直接include线程池相关就可以进行测试了嘛

资源互斥场景

您好,并行计算有时会出现一些需要互斥锁的场景,例如多个并行Node之间共享了一个优先队列。此时朴素的做法是给数据结构加互斥锁。针对这种需求,在 CGraph 中,请问:

  1. 能否通过设计更好的 Graph 来规避加锁;若能,这种设计能否通用?
  2. 如果必须要加互斥锁,CGraph 有没有对应的工具;是否可以使用std::mutex

谢谢!

关于node节点的beforeRun()和afterRun()

先看源代码
1、main.cpp源码如下:
void test_opencv()
{
GPipelinePtr pipeline = GPipelineFactory::create();
CStatus status;
GElementPtr a = nullptr;
GElementPtr b = nullptr;

status += pipeline->registerGElement<CNodeReadImage>(&a, {}, "readNodeA");
// status += pipeline->registerGElement<CNodeReadImage>(&b, {a}, "readNodeB");

if (!status.isOK())
{
    return;
}

status += pipeline->init();
status += pipeline->run();
status += pipeline->destroy();
GPipelineFactory::remove(pipeline);

}

int main(int argc, char *argv[])
{
test_opencv();
return 0;
}

2、nodereadimage.cpp源文件如下
CStatus CNodeReadImage::init()
{
CGraph::CGRAPH_ECHO("[%s], enter init function", this->getName().c_str());
return CStatus();
}

CStatus CNodeReadImage::run()
{
CStatus status;
CGraph::CGRAPH_ECHO("[%s], enter run function", this->getName().c_str());
// CGRAPH_SLEEP_SECOND(1)
return status;
}

CStatus CNodeReadImage::destroy()
{
CGraph::CGRAPH_ECHO("[%s], enter destroy function", this->getName().c_str());
return CStatus();
}

CStatus CNodeReadImage::beforeRun()
{
CGraph::CGRAPH_ECHO("[%s], enter beforeRun function", this->getName().c_str());
return CStatus();
}

CStatus CNodeReadImage::afterRun()
{
CGraph::CGRAPH_ECHO("[%s], enter afterRun function", this->getName().c_str());
return CStatus();
}

再看编译之后的运行结果:
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter beforeRun function
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter afterRun function
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter init function
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter beforeRun function
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter run function
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter afterRun function
[CGraph] [Mon Feb 20 19:06:06 2023] ..\cvGraphDemo\graph\GraphCtrl\GraphPipeline\GPipeline.cpp | CGraph::GPipeline::run | line = [75], errorCode = [-1], errorInfo = [pipeline done status check failed...].
[CGraph] [Mon Feb 20 19:06:06 2023] [readNodeA], enter destroy function

我的疑问如下:
1、node节点在init之前,怎么先执行了beforeRun和afterRun?
2、为什么会报错?errorCode = [-1], errorInfo = [pipeline done status check failed...]
哪里有问题?

关于UAtomicQueue的问题

void waitPop(T& value) { CGRAPH_LOCK_GUARD lk(mutex_); cv_.wait(lk, [this] { return !queue_.empty(); }); value = std::move(*queue_.front()); queue_.pop(); }
您好,我是cpp并发刚入门的同学,关于您代码里的"条件变量等待条件",我在《c++并发编程实战》中看到应该用unique_lock,因为wait条件不满足时需要解锁进入阻塞状态,当wait条件满足时还需要保持锁状态直到函数执行完成。书里讲lock_guard不提供这样的灵活性。请问您这里用法是否有问题,或者是我对书本理解片面了?求指导!

大佬的线程池写的很赞,有一个问题是把线程池拿到Dll执行,VS2017下出问题了

事情是这样的
我把大佬的线程池拿出来工程化了,整成EXE执行程序,运行丝滑无事故。
整成DLL,单例获取线程池直接没LOG输出了。
UThreadPoolPtr tp = UThreadPoolSingleton::get();

目前的解决方式是USingleton.h中,将USingletonType type = USingletonType::HUNGRY 改成
USingletonType type = USingletonType::LAZY,或者将static放到get方法内部执行。

T00-HelloWorld执行时出现run failed...

我在x86 ubuntu 16.04上使用clang++编译得到二进制T00-HelloWorld,很大概率出现执行后出现:
./T00-HelloWorld
hello, world.
[CGraph] [2021-08-20 16:30:13.431] thread [140105641174784] run failed...

看起来是UThreadPrimary调用run()时成员pool_threads_存在空指针的情况。
pipeline->process()内部会调用deinit,从而释放了threadpool;之后threadpool的线程再调度回来的时候执行run发现threadpool中的UThreadPrimary指针已经为空了。

CGraph 是否能用在網路封包處理呢?

您好, 最近突發奇想, 并行计算框架常常用來建構一些AI計算的任務. 那是否能把這類框架用於 server 來快速處理封包呢?

我在想沒有人這樣做是不是因為調度的過程可能比處理封包更耗時間一類的,希望大佬能幫忙解惑一下XD

pub sub 方法的实现

提供了一种可以安全的转 unique ptr的思路

void test() {
    unique_ptr<father> var = make_unique<son>();
    unique_ptr<son> s ( dynamic_cast<son *>( var.release()) ) ;    // 可以直接获取的方式转了
    // var->print();
    s->print();
}

把pipeline定义为全局变量,destroy函数不能注销节点吗?

我的应用场景是把pipeline定义为全局变量,ui主界面每次点击【运行测试】按钮,每次都重新注册节点,然后运行。
源码如下:
#include "MyGNode/MyNode1.h"
#include "MyGNode/MyNode2.h"

using namespace CGraph;

GPipelinePtr pipeline = nullptr;

void init()
{
pipeline = GPipelineFactory::create();
}

void uninit()
{
GPipelineFactory::remove(pipeline);
}

void test()
{
GElementPtr a, b = nullptr;
CStatus status = pipeline->registerGElement(&a, {}, "nodeA");
status += pipeline->registerGElement(&b, {a}, "nodeB");
if (!status.isOK())
{
return;
}

#if 1
status += pipeline->init();
status += pipeline->run();
status += pipeline->destroy();
#else
status += pipeline->process(); //process函数,相当于init(),run()*n,destroy()函数,同时调用
#endif
}

int main()
{
init();
test();//运行测试1
test();//运行测试2
test();//运行测试3
uninit();
return 0;
}

来看打印输出的结果是:
[CGraph] [Sun Mar 5 22:08:35 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:35 2023] [nodeA], enter MyNode1 run function. Sleep for 1 second ...
[CGraph] [Sun Mar 5 22:08:36 2023] [nodeB], enter MyNode2 run function. Sleep for 2 second ...
[CGraph] [Sun Mar 5 22:08:38 2023] [nodeB], enter MyNode2 destroy function.
[CGraph] [Sun Mar 5 22:08:38 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:38 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:38 2023] [nodeA], enter MyNode1 run function. Sleep for 1 second ...
[CGraph] [Sun Mar 5 22:08:39 2023] [nodeB], enter MyNode2 run function. Sleep for 2 second ...
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeB], enter MyNode2 destroy function.
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeB], enter MyNode2 destroy function.
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeB], enter MyNode2 init function.
[CGraph] [Sun Mar 5 22:08:41 2023] [nodeA], enter MyNode1 run function. Sleep for 1 second ...
[CGraph] [Sun Mar 5 22:08:42 2023] [nodeB], enter MyNode2 run function. Sleep for 2 second ...
[CGraph] [Sun Mar 5 22:08:44 2023] [nodeB], enter MyNode2 destroy function.
[CGraph] [Sun Mar 5 22:08:44 2023] [nodeB], enter MyNode2 destroy function.
[CGraph] [Sun Mar 5 22:08:44 2023] [nodeB], enter MyNode2 destroy function.

看调试日志,每执行一次test(),init function和destroy function就会增多一次。
感觉不妙,pipeline->destroy()是不是没有把注册的节点自动删除掉?

对GElementManager::analyse中一些细节的困惑

在GElementManager.cpp Line148有如下逻辑

while (curElement->isLinkable()
                               && 1 == curElement->run_before_.size()
                               && (*curElement->run_before_.begin())->isLinkable()) {
                            curElement = (*curElement->run_before_.begin());
                            curCluster.addElement(curElement);
                            duplications.insert(curElement);
                        }

其中curElement->isLinkable()的逻辑是否有必要存在?
从上下文来看,当curElement是上一轮已经执行结束的cluster中节点的后继,同时又isRunnable时,理论上它并不应该是isLinkable的,因为如果它isLinkable,就意味着它可以被加入到上一轮相应cluster的尾部
具体而言,以如下case为例

    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr a0, a1, a2;
    GElementPtr b0, b1, b2;
    GElementPtr x0, x1, x2;

    pipeline->registerGElement<HelloWorldNode>(&a0, {}, "a0");   
    pipeline->registerGElement<HelloWorldNode>(&a1, {a0}, "a1"); 
    pipeline->registerGElement<HelloWorldNode>(&a2, {a1}, "a2"); 

    pipeline->registerGElement<HelloWorldNode>(&b0, {}, "b0");   
    pipeline->registerGElement<HelloWorldNode>(&b1, {b0}, "b1");
    pipeline->registerGElement<HelloWorldNode>(&b2, {b1}, "b2");

    pipeline->registerGElement<HelloWorldNode>(&x0, {a2, b2}, "x0");   
    pipeline->registerGElement<HelloWorldNode>(&x1, {x0}, "x1");
    pipeline->registerGElement<HelloWorldNode>(&x2, {x1}, "x2");

    pipeline->process();    // 运行pipeline
    GPipelineFactory::destroy(pipeline);

按照现有逻辑会生成
iter0:
a0 -> a1 -> a2
b0 -> b1 -> b2
iter1:
x0
iter2:
x1 -> x2
四组cluster,因为x0节点有两个前驱节点,因而并不满足isLinkable的条件
而如果移除curElement->isLinkable()的判断,会生成更自然的三组cluster
iter0:
a0 -> a1 -> a2
b0 -> b1 -> b2
iter1:
x0 -> x1 -> x2

对GPipeline::registerGElement接口中一些不恰当行为的困惑和建议

阅读了下源码,感觉GPipeline::registerGElement接口在处理外部创建node的情况下逻辑有些奇怪。
例如如下case1中,外部new的node存放于hw,但是会在GPipeline::registerGElement内部被修改,如果不事先备份一下的话,内存泄露就发生了。此时是否应该assert一下*elementRef == nullptr?

void case1() {
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr hw = new HelloWorldNode();
    GElementPtr backup = hw;
    printf("%p\n", hw);
    printf("%p\n", backup);
    pipeline->registerGElement<HelloWorldNode>(&hw);    // 注册一个helloworld节点
    printf("%p\n", hw);
    printf("%p\n", backup);
    pipeline->process();    // 运行pipeline
    GPipelineFactory::destroy(pipeline);
    backup->run();
}
0x7fffc6dca4e0
0x7fffc6dca4e0
0x7fffc6dca9c0
0x7fffc6dca4e0
hello, world.
hello, world.

而在case2中,GPipeline的instance在析构时会delete所有记录下来的node指针,即使这个node是外部创建的(一般来讲生命周期应该由外部调用者来管控),即使这个node不一定是new出来的。此时是否应该记录下node是否为internal对象,以确定要不要在析构的时候delete?

void case2() {
    GPipelinePtr pipeline = GPipelineFactory::create();
    GElementPtr hw = new HelloWorldNode();
    pipeline->registerGElement<HelloWorldNode>(&hw);    // 注册一个helloworld节点
    pipeline->process();    // 运行pipeline
    GPipelineFactory::destroy(pipeline);
    hw->run();
}
hello, world.
[1]    22131 segmentation fault (core dumped)  ./T00-HelloWorld

UAtomicQueue中尝试拷贝unique_ptr

// src/UtilsCtrl/ThreadPool/AtomicQueue/UAtomicQueue.h:line 84
std::unique_ptr<T> tryPop() {
        CGRAPH_LOCK_GUARD lk(mutex_);
        if (queue_.empty()) {
            return std::unique_ptr<T>();
        }
        std::unique_ptr<T> res = queue_.front();
        queue_.pop();
        return res;
    }

理论上应该是

std::unique_ptr<T> res = std::move(queue_.front());

不过project里似乎并没有使用过这个函数,因而发生没有模板实例化,所以没报错
使用如下case即能发现问题

#include "../src/UtilsCtrl/ThreadPool/AtomicQueue/UAtomicQueue.h"

void case1() {
    UAtomicQueue<int> a;
    a.push(1);
    auto v = a.tryPop();
}

请问可视化pipeline->dump()生成了文本后如何实现逆解?

如题,我们通过pipeline->dump()可以得到可视化需要的文本字符串。
https://dreampuf.github.io/GraphvizOnline/
例如,
digraph CGraph {
compound=true;
p000001E8A5B24CD0[label="nodeA"];
p000001E8A5B24CD0 -> p000001E8A5B28FD0;
p000001E8A5B24CD0 -> p000001E8A5B2A180;
p000001E8A5B28FD0[label="nodeB"];
p000001E8A5B28FD0 -> p000001E8A5B2A320;
p000001E8A5B2A180[label="nodeC"];
p000001E8A5B2A180 -> p000001E8A5B2A320;
p000001E8A5B2A320[label="nodeD"];
}
或者
digraph G {
start -> a0;
start -> b0;
a1 -> b3;
b2 -> a3;
a3 -> a0;
a3 -> end;
b3 -> end;
}

现在我的项目需求是,如何通过这个字符串来实现逆解?
所谓的逆解指的是:
1、先把digraph CGraph {...}这字符串本地持久化,保存为demo.txt文件
2、我的应用程序打开demo.txt文件,解析它,然后生成相应的数据结构
3、解析之后,根据得到的数据结构,来动态生成该图对应的Pipeline或者MultiPipeline
4、运行Pipeline或者MultiPipeline

请问如何实现这个逆解?有无例程?

Message这块有没有例子

有看到消息这块,能不能出个例子

类似执行某个事儿,在执行过程中外发消息,通过订阅消息,在解析消息内容的时候触发其他的执行

单线程跑region的时候卡住了

在ubuntu或Mac上跑pipeline时,当我修改下面的参数:

static const int CGRAPH_DEFAULT_THREAD_SIZE = 1;    // 默认主线程个数
static const int CGRAPH_MAX_THREAD_SIZE = 1;             // 最大线程个数
static const int CGRAPH_MAX_TASK_STEAL_RANGE = 0;                                           // 盗取机制相邻范围

pipeline执行到region的地方卡住了。如果默认主线程数和最大线程数设置为2时可以正常执行pipeline。

使用了CGraph后,openmp单核执行,没有加速

我在算子里面用了openmp并行,但无论我如何设置线程数,程序都并不会并行执行。当算子屏蔽掉CGraph之后,是可以正常并行执行的。下面附上了我的相关代码(openmp应该是没问题的?):

    CStatus train() override {

#pragma omp parallel for num_threads(GA_DEFAULT_THREAD_SIZE) schedule(dynamic) default(none)

        for (IDType i = 0; i < num_; i++) {
            model_->graph_n_[i].reserve(out_degree_);
            std::vector<IDType> neighbor_id(out_degree_);
            GenRandomID(neighbor_id.data(), num_, out_degree_);
            for (const IDType &id: neighbor_id) {
                if (id != cur_num_) {
                    DistResType dist = 0;
                    dist_op_.calculate(data_modal1_ + (id * dim1_),
                                       data_modal1_ + cur_num_ * dim1_,
                                       dim1_, dim1_,
                                       data_modal2_ + (id * dim2_),
                                       data_modal2_ + cur_num_ * dim2_,
                                       dim2_, dim2_, dist);
                    model_->graph_n_[i].emplace_back(id, dist);
                }
            }
        }
        return CStatus();
    }

有关线程池测试

不知道大佬有没有 关于线程池测试代码的实现可供借鉴,不太熟悉用了future什么的该怎么计算总耗时,以及多路并发往线程池投任务蛮好像也不太知道该怎么做

vs2015无法编译通过。

看了介绍感觉设计得挺好的,想用用看,但是vs2015环境编译不通过,感觉了版本低了?

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.