Comments (21)
另外,部署的代码是同一套。
from fastflow.
Dag和注册Action我这边均是由前端发起请求,动态注册的,假如是A机器收到请求,那么这个action肯定是在A机器了。
但是真正的执行Action根据文档描述,可能是由另一台worker机器B执行,这是不是出现了action not found的根因呢
from fastflow.
诚盼回复。
from fastflow.
这里 action 的注册用法是在服务启动的时候注册,不是通过接口注册的。
from fastflow.
要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?
from fastflow.
动态不支持吗,比如新增一个编排任务,这时候会新创建dag和新注册action,应该是比较合理的需求。
from fastflow.
要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?
您好,是有两个固化的结构体,只是每次注册name不一样,因为处理的逻辑不一样。
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
你好。 fasttflow的初始化,只需要也只能够执行一次对吗?
目前fastflow的初始化,是在go main方法中进行的。
之后,通过controller(使用的gin框架)来触发提交的任务。
在main语文教学中初始化fastflow的代码如下:
func InitFlow() {
fastFlowConf := configs.Get().FastFlow
connStr := fastFlowConf.ConnStr
dataBase := fastFlowConf.Database
prefix := fastFlowConf.Prefix
workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)
// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
Key: "worker-" + util.StringUtils.AsString(workKeyNum),
// if your mongo does not set user/pwd, you should remove it
ConnStr: connStr,
Database: dataBase,
Prefix: prefix,
})
logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
logrus.Error(errMsg)
notice.RobotNotice(errMsg)
return
}
logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())
isLeader := Keeper.IsLeader()
if isLeader {
notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}
// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
// if your mongo does not set user/pwd, you should remove it
ConnStr: connStr,
Database: dataBase,
Prefix: prefix,
})
logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
errMsg := fmt.Sprintf("init store failed: %s", err.Error())
logrus.Error(errMsg)
notice.RobotNotice(errMsg)
return
}
logrus.Info("init store finish")
// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
Keeper: Keeper,
Store: st,
DagScheduleTimeout: 300 * time.Second,
ExecutorTimeout: 150 * time.Second,
}); err != nil {
errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
logrus.Error(errMsg)
notice.RobotNotice(errMsg)
return
}
logrus.Info("start flow finish")
}
之后,通过http请求来构建Dag并触发执行。代码形如下:
dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)
// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)
//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)
ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)
这种使用方式是可行的吗?
from fastflow.
通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
第1点有一个疑问:
fastflow.RegisterAction
必须在
fastflow.start之前执行是吗?
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
第1点有一个疑问: fastflow.RegisterAction 必须在 fastflow.start之前执行是吗?
最好是,最晚必须在工作流实例被执行前,否则就会在action集合中找不到
from fastflow.
通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。
你的理解是对的,同一个DagInstance只会被同一个worker执行
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
你好。 fasttflow的初始化,只需要也只能够执行一次对吗?
目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。
在main语文教学中初始化fastflow的代码如下:
func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix
workKeyNum := BuildWorkNum() logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum) // 初始化选举组件 Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ Key: "worker-" + util.StringUtils.AsString(workKeyNum), // if your mongo does not set user/pwd, you should remove it ConnStr: connStr, Database: dataBase, Prefix: prefix, }) logrus.Infof("init keeper begin. keeper:%v", Keeper) if err := Keeper.Init(); err != nil { errMsg := fmt.Sprintf("init keeper failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey()) isLeader := Keeper.IsLeader() if isLeader { notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey())) } // 初始化存储组件 st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, you should remove it ConnStr: connStr, Database: dataBase, Prefix: prefix, }) logrus.Infof("init store begin. st:%v", st) if err := st.Init(); err != nil { errMsg := fmt.Sprintf("init store failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Info("init store finish") // 启动fastflow logrus.Info("start flow begin") if err := fastflow.Start(&fastflow.InitialOption{ Keeper: Keeper, Store: st, DagScheduleTimeout: 300 * time.Second, ExecutorTimeout: 150 * time.Second, }); err != nil { errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Info("start flow finish")
}
之后,通过http请求来构建Dag并触发执行。代码形如下:
dagVars, dagVarMap := BuildDagVars(globalVarItems) tasks, actions := BuildTagElement(rawDagBO.TaskNodes) // 注册 action fastflow.RegisterAction(actions) logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId) //构建dag模型 dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars) ch := make(chan string) //创建dag并运行 go initialize.CreateDagAndInstance(dag, dagVarMap, ch)
这种使用方式是可行的吗?
这种方式可行吗?
from fastflow.
action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:
- 你的注册逻辑可能晚于fastflow的初始化
- 注册的actionname不对,或者注册失败
你好。 fasttflow的初始化,只需要也只能够执行一次对吗?
目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。
在main语文教学中初始化fastflow的代码如下:
func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.PrefixworkKeyNum := BuildWorkNum() logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum) // 初始化选举组件 Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{ Key: "worker-" + util.StringUtils.AsString(workKeyNum), // if your mongo does not set user/pwd, you should remove it ConnStr: connStr, Database: dataBase, Prefix: prefix, }) logrus.Infof("init keeper begin. keeper:%v", Keeper) if err := Keeper.Init(); err != nil { errMsg := fmt.Sprintf("init keeper failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey()) isLeader := Keeper.IsLeader() if isLeader { notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey())) } // 初始化存储组件 st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, you should remove it ConnStr: connStr, Database: dataBase, Prefix: prefix, }) logrus.Infof("init store begin. st:%v", st) if err := st.Init(); err != nil { errMsg := fmt.Sprintf("init store failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Info("init store finish") // 启动fastflow logrus.Info("start flow begin") if err := fastflow.Start(&fastflow.InitialOption{ Keeper: Keeper, Store: st, DagScheduleTimeout: 300 * time.Second, ExecutorTimeout: 150 * time.Second, }); err != nil { errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error()) logrus.Error(errMsg) notice.RobotNotice(errMsg) return } logrus.Info("start flow finish")
}
之后,通过http请求来构建Dag并触发执行。代码形如下:dagVars, dagVarMap := BuildDagVars(globalVarItems) tasks, actions := BuildTagElement(rawDagBO.TaskNodes) // 注册 action fastflow.RegisterAction(actions) logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId) //构建dag模型 dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars) ch := make(chan string) //创建dag并运行 go initialize.CreateDagAndInstance(dag, dagVarMap, ch)
这种使用方式是可行的吗?
这种方式可行吗?
目前这样,就是遇到 了action not found异常。 我部署了2台机器
from fastflow.
我看你Resgiter是在接口里面,你有2台机器,可能在A机器接受的请求,注册Action,但是被分发到了B机器,导致这个错误。
为什么不在初始化的时候注册Action呢
from fastflow.
因为我们目前的Dag是由业务方构建然后提交的,如果只能在初始化构建,那这个编排任务是否只能在系统启动时才能初始化呢?
from fastflow.
我们目前是2类固化的Action,一类是查询类,一类是执行动作的变更,我目前是固化了2个Action实现。然后由前端动态构建Dag,构建好后,regionAction(因为action的name每次不一样),之后创建dag并持久化到mongodb,然后执行CreateDagAndInstance。
from fastflow.
Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交
from fastflow.
Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交
也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?
from fastflow.
Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交
也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?
是的,你理解的没问题。
如果需要支持动态插入Action,需要将Action用Plugin来实现,目前没有强需求
from fastflow.
Related Issues (20)
- 如何跳过一些路线的任务 HOT 6
- 能出一个用mysql代替mongo,或者直接关闭存储层的功能么 HOT 1
- 架构图都挂了, 能重新补上吗? HOT 1
- 发现个问题,项目启动的时候register了结构体,并调用fastflow.init,然后在接口中构造dag,调用run方法,但我发现会出现actionName运行2次的情况,或者多次运行之后出现2023/10/09 22:52:18 error: parser get some error%!(EXTRA string=module, string=parser, string=err, *fmt.wrapError=worker do failed: dag instance[482065678661058561] does not found task tree) HOT 4
- [Feature] 可以提供UI吗 HOT 4
- 请问一个task如何进行重试? HOT 1
- 请问Task的父亲依赖节点DependOn的最大数量是多少? HOT 5
- 多机执行时,报action not found: HOT 8
- 创建task有非常大的延迟 HOT 2
- watchDagInsCmd线程处理continue cmd异常导致task在Precheck时block状态无法放开 HOT 4
- 根据示例使用ShareData对象的Set()方法未将数据持久化 HOT 2
- DAG实例启动后,如何block DAG实例从而阻塞未执行的task HOT 3
- 支持if条件分支判断吗 HOT 1
- continue指令失效,咨询下发continue指令到task进入running状态的过程 HOT 4
- 如何使处理任务的worker实例和执行RunDag的实例相同,即本机执行任务 HOT 5
- 能支持流式执行任务吗 HOT 3
- 在RunBefore中如果有错误,则任务阻塞 HOT 3
- 请问如何将Task分发到指定节点呢? HOT 8
- 请问如何在不重启fastflow的情况下动态的增加dag HOT 1
Recommend Projects
-
React
A declarative, efficient, and flexible JavaScript library for building user interfaces.
-
Vue.js
🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.
-
Typescript
TypeScript is a superset of JavaScript that compiles to clean JavaScript output.
-
TensorFlow
An Open Source Machine Learning Framework for Everyone
-
Django
The Web framework for perfectionists with deadlines.
-
Laravel
A PHP framework for web artisans
-
D3
Bring data to life with SVG, Canvas and HTML. 📊📈🎉
-
Recommend Topics
-
javascript
JavaScript (JS) is a lightweight interpreted programming language with first-class functions.
-
web
Some thing interesting about web. New door for the world.
-
server
A server is a program made to process requests and deliver data to clients.
-
Machine learning
Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.
-
Visualization
Some thing interesting about visualization, use data art
-
Game
Some thing interesting about game, make everyone happy.
Recommend Org
-
Facebook
We are working to build community through open source technology. NB: members must have two-factor auth.
-
Microsoft
Open source projects and samples from Microsoft.
-
Google
Google ❤️ Open Source for everyone.
-
Alibaba
Alibaba Open Source for everyone
-
D3
Data-Driven Documents codes.
-
Tencent
China tencent open source team.
from fastflow.