Coder Social home page Coder Social logo

Comments (21)

chenxushao avatar chenxushao commented on September 27, 2024

另外,部署的代码是同一套。

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

Dag和注册Action我这边均是由前端发起请求,动态注册的,假如是A机器收到请求,那么这个action肯定是在A机器了。
但是真正的执行Action根据文档描述,可能是由另一台worker机器B执行,这是不是出现了action not found的根因呢

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

诚盼回复。

from fastflow.

philhuan avatar philhuan commented on September 27, 2024

这里 action 的注册用法是在服务启动的时候注册,不是通过接口注册的。

from fastflow.

philhuan avatar philhuan commented on September 27, 2024

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

动态不支持吗,比如新增一个编排任务,这时候会新创建dag和新注册action,应该是比较合理的需求。

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

要注册得有一个实现了接口的结构体吧,通过接口注册怎么关联这个结构体,新增action 得改代码吧?

您好,是有两个固化的结构体,只是每次注册name不一样,因为处理的逻辑不一样。

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。
之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() {
fastFlowConf := configs.Get().FastFlow
connStr := fastFlowConf.ConnStr
dataBase := fastFlowConf.Database
prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问:
fastflow.RegisterAction
必须在
fastflow.start之前执行是吗?

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

第1点有一个疑问: fastflow.RegisterAction 必须在 fastflow.start之前执行是吗?

最好是,最晚必须在工作流实例被执行前,否则就会在action集合中找不到

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

通过阅读源码, fastflow.RegisterAction(actions) 只会在接收到请求的那台机器的内存中(是一个map)才会有数据。我想的是,之所以报action not found,是否是由于具体的执行被调度到另一台机器上了。 另外,还有一个问题,一个Dag上的N外节点,只会被同一台机器执行吧?因为一个dag节点还存在数据共享和传递。

你的理解是对的,同一个DagInstance只会被同一个worker执行

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?

目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。

在main语文教学中初始化fastflow的代码如下:

func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}

之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

action not found的唯一原因就是它没注册上,看你描述,可能导致这个现象有几个点:

  1. 你的注册逻辑可能晚于fastflow的初始化
  2. 注册的actionname不对,或者注册失败

你好。 fasttflow的初始化,只需要也只能够执行一次对吗?
目前fastflow的初始化,是在go main方法中进行的。 之后,通过controller(使用的gin框架)来触发提交的任务。
在main语文教学中初始化fastflow的代码如下:
func InitFlow() { fastFlowConf := configs.Get().FastFlow connStr := fastFlowConf.ConnStr dataBase := fastFlowConf.Database prefix := fastFlowConf.Prefix

workKeyNum := BuildWorkNum()
logrus.Infof("InitFlow. workKeyNum:%d", workKeyNum)

// 初始化选举组件
Keeper = mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
	Key: "worker-" + util.StringUtils.AsString(workKeyNum),
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init keeper begin. keeper:%v", Keeper)
if err := Keeper.Init(); err != nil {
	errMsg := fmt.Sprintf("init keeper failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}

logrus.Infof("init keeper finish. workKey:%s", Keeper.WorkerKey())

isLeader := Keeper.IsLeader()
if isLeader {
	notice.RobotNotice(fmt.Sprintf("keeper leader ip:%s,workKey:%s", runtimes.AppInfo.Ip, Keeper.WorkerKey()))
}

// 初始化存储组件
st := mongoStore.NewStore(&mongoStore.StoreOption{
	// if your mongo does not set user/pwd, you should remove it
	ConnStr:  connStr,
	Database: dataBase,
	Prefix:   prefix,
})

logrus.Infof("init store begin. st:%v", st)
if err := st.Init(); err != nil {
	errMsg := fmt.Sprintf("init store failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("init store finish")

// 启动fastflow
logrus.Info("start flow begin")
if err := fastflow.Start(&fastflow.InitialOption{
	Keeper:             Keeper,
	Store:              st,
	DagScheduleTimeout: 300 * time.Second,
	ExecutorTimeout:    150 * time.Second,
}); err != nil {
	errMsg := fmt.Sprintf("init fastflow failed: %s", err.Error())
	logrus.Error(errMsg)
	notice.RobotNotice(errMsg)
	return
}
logrus.Info("start flow finish")

}
之后,通过http请求来构建Dag并触发执行。代码形如下:

dagVars, dagVarMap := BuildDagVars(globalVarItems)
tasks, actions := BuildTagElement(rawDagBO.TaskNodes)

// 注册 action
fastflow.RegisterAction(actions)
logrus.Infof("RegisterAction finish.dagExplainId:%d", dagExplainId)

//构建dag模型
dag := initialize.CreateDag(util.StringUtils.AsString(dagId), rawDagBO.Name, rawDagBO.Descr, tasks, dagVars)

ch := make(chan string)
//创建dag并运行
go initialize.CreateDagAndInstance(dag, dagVarMap, ch)

这种使用方式是可行的吗?

这种方式可行吗?

目前这样,就是遇到 了action not found异常。 我部署了2台机器

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

我看你Resgiter是在接口里面,你有2台机器,可能在A机器接受的请求,注册Action,但是被分发到了B机器,导致这个错误。
为什么不在初始化的时候注册Action呢

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

因为我们目前的Dag是由业务方构建然后提交的,如果只能在初始化构建,那这个编排任务是否只能在系统启动时才能初始化呢?

from fastflow.

chenxushao avatar chenxushao commented on September 27, 2024

我们目前是2类固化的Action,一类是查询类,一类是执行动作的变更,我目前是固化了2个Action实现。然后由前端动态构建Dag,构建好后,regionAction(因为action的name每次不一样),之后创建dag并持久化到mongodb,然后执行CreateDagAndInstance。

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

from fastflow.

gaohaotian010 avatar gaohaotian010 commented on September 27, 2024

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

from fastflow.

ShiningRush avatar ShiningRush commented on September 27, 2024

Dag的执行是可以被任意节点提交的, 但是Action 包含逻辑代码, 所以这部分内容只能在编译时提交

也就是说,如果像 @chenxushao 这种用法,其实需要预定义所有的“能力”到Action里,然后前端构建DAG的时候只能通过填写“能力”的参数构建DAG的形式,可以这样理解吗?

是的,你理解的没问题。
如果需要支持动态插入Action,需要将Action用Plugin来实现,目前没有强需求

from fastflow.

Related Issues (20)

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.