shiningrush / fastflow
A lightweight, high-performance distributed workflow framework
License: MIT License
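As the title says: do tasks within a DAG support cancellation and rollback?
For example, if the DAG has reached some intermediate step and the frontend calls the cancel-task API, can the task that is currently executing (Running) be cancelled, and can the tasks that have already completed (success) be rolled back?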
The situation is this: fastflow is deployed on 2 machines in total. At runtime it frequently reports "action not found".
The code that reports this error is here:
executor.go
func (e *DefExecutor) runAction(taskIns *entity.TaskInstance) error {
	act := ActionMap[taskIns.ActionName]
	if act == nil {
		return fmt.Errorf("action not found: %s", taskIns.ActionName)
	}
	// ... (rest of runAction not shown)
}
Looking further into the source code, ActionMap is defined as a global variable:
var (
	ActionMap = map[string]run.Action{}

	defExc       Executor
	defStore     Store
	defKeeper    Keeper
	defParser    Parser
	defCommander Commander
)
Registering an Action puts it into this map, which means the data lives only in memory and is not persisted.
In my program, I register actions like this:
// register actions
fastflow.RegisterAction(actions)
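The official documentation describes it like this:
When you start running a Dag, an execution record is generated for this run, called a DagInstance. Once it has been generated, the Leader instance dispatches it to a healthy Worker, which then parses and executes it.
Suppose fastflow.RegisterAction(actions) is executed only on machine A of the two machines. Is it possible that the other machine ends up executing the DAG, and is that what causes "action not found"?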
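Because ActionMap is an ordinary package-level map, each process has its own copy, and a worker that never called RegisterAction cannot find the action. The simulation below models two worker processes as two separate maps to show the effect; it is not fastflow code, and `worker`, `registerAll` and `printAction` are hypothetical names.

```go
package main

import "fmt"

// Action mirrors only the Name method of fastflow's run.Action for this simulation.
type Action interface{ Name() string }

type printAction struct{}

func (printAction) Name() string { return "PrintAction" }

// worker models one process: each has its own in-memory action map,
// just as each fastflow process has its own package-level ActionMap.
type worker struct {
	name    string
	actions map[string]Action
}

func newWorker(name string) *worker {
	return &worker{name: name, actions: map[string]Action{}}
}

func (w *worker) register(acts ...Action) {
	for _, a := range acts {
		w.actions[a.Name()] = a
	}
}

// runAction reproduces the lookup that fails in executor.go.
func (w *worker) runAction(name string) error {
	if w.actions[name] == nil {
		return fmt.Errorf("%s: action not found: %s", w.name, name)
	}
	return nil
}

// registerAll is the shared startup code that every process should call.
func registerAll(w *worker) { w.register(printAction{}) }

func main() {
	a, b := newWorker("A"), newWorker("B")
	registerAll(a)                          // only A registers
	fmt.Println(a.runAction("PrintAction")) // <nil>
	fmt.Println(b.runAction("PrintAction")) // B: action not found: PrintAction
	registerAll(b)
	fmt.Println(b.runAction("PrintAction")) // <nil>
}
```

In practice this suggests that the binary running on every machine should call fastflow.RegisterAction with the full action list before fastflow.Start.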
For example: create a VM, then wait until the VM has been created. How would that be implemented here?
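PreCheck supports a skip field, but the check and the skip seem to happen before the task starts running. Is there a way to skip a task that is already running? Can CancelTask be used to cancel a running task to achieve this?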
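As the title says: I tested a DAG in which one Task has a DependOn list of length 10000, i.e. it depends on 10000 upstream task nodes, each of whose Action is a simple print. When execution reaches this node (timeout set to 3600), it gets stuck and the task's status stays at init.
Could you provide a simple UI like goflow or airflow?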
Say I want three workers (including the leader): how do I run them on one server? Could you give an example? Thanks.
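The fastflow demo code elsewhere on this page identifies each process by the keeper `Key` (`worker-1`). A sketch, assuming that three workers on one server are simply three processes of the same binary that share the same Mongo `ConnStr`, `Database` and `Prefix` but use different keys. The `-worker` flag and the `workerKey` helper are hypothetical; the 0-255 range follows the sonyflake restriction reported in another issue on this page.

```go
package main

import (
	"flag"
	"fmt"
)

// workerKey builds a keeper key such as "worker-2" for the given index.
// Every process in the cluster must use a different index.
func workerKey(index int) (string, error) {
	if index < 0 || index > 255 {
		return "", fmt.Errorf("worker index %d out of range 0-255", index)
	}
	return fmt.Sprintf("worker-%d", index), nil
}

func main() {
	index := flag.Int("worker", 1, "unique worker index in the cluster")
	flag.Parse()
	key, err := workerKey(*index)
	if err != nil {
		panic(err)
	}
	// Pass key as mongoKeeper.KeeperOption.Key; keep ConnStr, Database
	// and Prefix identical across all three processes.
	fmt.Println("starting with keeper key", key)
}
```

Started as `go run . -worker 1`, `go run . -worker 2` and `go run . -worker 3` in three terminals, the processes would then share one cluster state, from which one of them acts as leader.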
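I tried to get TaskInstance-related parameters in the RunBefore function, but the returned taskIns1 is a nil pointer, and the program then gets stuck at fmt.Println("taskIns1.2", taskIns1.TaskID) without any explicit error. The full code is attached: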
package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name defines the unique action identity; it is used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) RunBefore(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("-------- Run action before")
	ctx1 := ctx.Context()
	taskIns1, _ := ctx1.Value("running-task").(*entity.TaskInstance)
	fmt.Println("taskIns1.1", taskIns1)
	fmt.Println("taskIns1.2", taskIns1.TaskID)
	return nil
}

func (a *PrintAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action after")
	return nil
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action start: ", time.Now())
	taskIns, _ := entity.CtxRunningTaskIns(ctx.Context())
	fmt.Println(taskIns.TaskID)
	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key: "worker-1",
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:<password>@<host>:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:<password>@<host>:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		// all yaml files will be executed
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait for fastflow to finish starting
	time.Sleep(time.Second)

	// run some dag instance
	// for i := 0; i < 10; i++ {
	_, err := mod.GetCommander().RunDag("test-dag", nil)
	if err != nil {
		log.Fatal(err)
	}
	// time.Sleep(time.Second * 10)
	// }
}
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
    isIgnoreFiles:
      act: skip #you can set "skip" or "block"
      conditions:
      - source: vars # source could be "vars" or "share-data"
        key: "fileName"
        op: "in"
        values: ["warn.txt", "error.txt", "file1.txt"]
- id: "task2"
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task1"]
if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
	return nil, err
}
dagIns, err := mod.GetStore().GetDagInstance(dagInstance.ID)
if err != nil {
	return nil, err
}
dagIns.ShareData.Set("dagInstanceID", dagInstance.ID)
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}
dagIns.ShareData.Set("jobID", strconv.Itoa(jobID))
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}
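After the create operations above, I sync the task states in a for loop. In theory, the number of tasks I eventually get from ListTaskInstance should equal the number of tasks declared in the DAG, but there is a very large delay: the loop may wait tens of seconds or hang indefinitely, reporting that the DB contains no tasks associated with this dagInstance. Inspecting the DB confirms it: the dagInstance exists, but its tasks do not. What could cause this, and how should I go about troubleshooting it?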
for {
	time.Sleep(500 * time.Millisecond)
	tasks, err = mod.GetStore().ListTaskInstance(&mod.ListTaskInstanceInput{
		DagInsID: dagInstance.ID,
	})
	if err != nil {
		fmt.Printf("total_task_length: %d, taskcount: %d, err: %s\n", taskCount, len(tasks), err.Error())
		continue
	}
	if len(tasks) != taskCount {
		fmt.Printf("total_task_length: %d, taskcount: %d\n", taskCount, len(tasks))
		continue
	}
	break
}
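A very strange phenomenon also occurred during execution: a dagInstance should move from init to scheduled, but without any external call it went from scheduled back to init. Why is that? The phenomenon can be observed by querying the status of the same dagInstance in mongo twice in a row:
Hi, with multiple machines, "action not found:" is reported.
Looking through the source code, ActionMap is defined as a global variable: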
var (
	ActionMap = map[string]run.Action{}

	defExc       Executor
	defStore     Store
	defKeeper    Keeper
	defParser    Parser
	defCommander Commander
)
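Registering an Action puts it into this map, which means the data is stored in the local machine's memory.
In my program, I register actions like this:
// register actions
fastflow.RegisterAction(actions)
So with multiple machines, when this Dag is scheduled onto another machine, that machine's memory (i.e. its global ActionMap) obviously does not contain the related Dag data.
Currently, with multiple machines, I can reliably reproduce "action not found:". I'm not sure whether there is something else I haven't considered.
Looking forward to your answer.
I use the structs registered at project startup as actions; they are not re-registered on every call.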
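Can the ctx in Run(ctx run.ExecuteContext, params interface{}) provide information about the current task, such as:
dagId
dagInsId
taskId: the node ID
If not, how else can these be obtained?
The example is written like this: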
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
  - act: skip #you can set "skip" or "block"
    conditions:
    - source: vars # source could be "vars" or "share-data"
      key: "fileName"
      op: "in"
      values: ["warn.txt", "error.txt"]
Here preCheck is a list, but running it produces an error. Looking at the code, preCheck is defined as PreChecks map[string]*Check, so I changed it to:
preCheck:
  act1:
    act: block
    conditions:
    - source: "vars"
      key: "fileName"
      op: "in"
      values: ["file.txt", "error.txt"]
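If that is the case, was the original design meant to handle several preChecks, or only a single act's preCheck with multiple conditions inside conditions?
As I understand it, preCheck should be defined as a []Check, so that several conditions can decide whether the node should block or be skipped.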
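Since PreChecks is a `map[string]*Check`, several named checks can already be declared under one task, each with its own act and conditions. The sketch below shows one plausible way to evaluate them; the types are simplified stand-ins for the YAML, and the semantics (conditions inside a check are ANDed, the first matching check in key order decides, only the "in" operator on vars) are assumptions rather than fastflow's documented behaviour. It also shows why a map-based design needs an explicit order: Go randomizes map iteration.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified, hypothetical stand-ins for the YAML preCheck structure.
type Condition struct {
	Source string // "vars" or "share-data"; only vars are read in this sketch
	Key    string
	Op     string
	Values []string
}

type Check struct {
	Act        string // "skip" or "block"
	Conditions []Condition
}

// match reports whether a condition holds for the given vars.
// Only the "in" operator is handled here.
func (c Condition) match(vars map[string]string) bool {
	if c.Op != "in" {
		return false
	}
	v := vars[c.Key]
	for _, want := range c.Values {
		if v == want {
			return true
		}
	}
	return false
}

// Evaluate returns the act of the first check (by sorted name) whose
// conditions all match, or "" when the task should run normally.
func Evaluate(checks map[string]*Check, vars map[string]string) string {
	names := make([]string, 0, len(checks))
	for name := range checks {
		names = append(names, name)
	}
	sort.Strings(names) // map order is random; sort for determinism
	for _, name := range names {
		chk := checks[name]
		all := true
		for _, cond := range chk.Conditions {
			if !cond.match(vars) {
				all = false
				break
			}
		}
		if all {
			return chk.Act
		}
	}
	return ""
}

func main() {
	checks := map[string]*Check{
		"act1": {Act: "block", Conditions: []Condition{{Source: "vars", Key: "fileName", Op: "in", Values: []string{"file.txt", "error.txt"}}}},
		"act2": {Act: "skip", Conditions: []Condition{{Source: "vars", Key: "fileName", Op: "in", Values: []string{"warn.txt"}}}},
	}
	fmt.Println(Evaluate(checks, map[string]string{"fileName": "warn.txt"})) // skip
	fmt.Println(Evaluate(checks, map[string]string{"fileName": "file.txt"})) // block
}
```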
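Manually controlling whether a flow step is approved or rejected
1. For example, for a long-running task: if the executor restarts midway, does the task automatically continue once the executor is back up?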
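Extensibility: fastflow provides some out-of-the-box task actions, such as HTTP requests and script execution; you can also define your own node actions, and you can decide from the context whether to skip a node (skip).
That is what the README says, but the rest of it doesn't explain how to use the built-in HTTP request action.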
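Suppose there are two replicas, and either one may receive the request to create a DAG instance (RunDag). I want the instance to run and handle the request locally on the replica that created it. How should this be configured?
For example, if replica A creates a dag_ins, all tasks of that instance should run on replica A.
Could you please take a look? Currently, after the instance-creation API runs, it initializes a local channel for communicating with the init task in the flow; when the node that actually executes the instance differs from the node that served the API call, this communication fails.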
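Suppose there is a custom skip scenario in which an external party wants to make a particular task skip.
I set the skip field of the target task instance to true in ShareData; in theory, the action's RunAfter method then only needs to poll this field. But how can the logic determine which specific task instance the currently running action belongs to? There doesn't seem to be any interface for getting the current runtime task instance from run.ExecuteContext.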
Hi, first of all thank you for the library!
While trying to use it, we found that because the worker's unique ID is generated with sonyflake, each worker must be given an integer between 0 and 255 at startup to produce unique IDs.
Line 14 in 1afa592
In practice, with multiple processes this restriction forces us to rely on external storage to allocate the unique integer, so I propose that the caller pass the globally unique key directly, and that the keeper component no longer derive the unique key from a number.
There are two advantages to this change
If you agree to this change, I will submit a PR in my free time.
Thanks
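As the title says.
I see that a DAG has a cron field, but the source code doesn't actually implement any handling of scheduled tasks. Is this meant to be handled externally? Or is it handled internally but not implemented in the open-source version? If I want to implement it internally myself, could you give me a rough idea of how?
My idea is this: in the Dispatcher component, which dispatches tasks, check whether the DAG has a cron parameter; if it does, start a goroutine that uses a cron library to periodically dispatch tasks to nodes for execution.
I have only just learned Go and want to build a small demo to practice using Go in a project, so my idea above may not be well thought out. If possible, I'd appreciate your guidance.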
In my case, I want to use goflow to create some resources on a public cloud, like creating a cloud VM with a public IP.
I want
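When writing shell scripts, a pipe usually reads one line, and the program after the pipe symbol immediately starts processing that line as its input.
From the README, it seems this kind of streaming execution isn't supported?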
https://github.com/ShiningRush/fastflow/blob/master/pkg/entity/task.go#L295
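Here, on a retry, the status is switched to init and the function simply returns, waiting for the parser to traverse again and resubmit the task. Why not keep running it directly in the Executor? Apart from re-rendering the parameters, are there other reasons?
For example, a task submits a ticket for someone to approve, and the workflow continues after approval.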
When tasks execute, each Run function receives a new context that does not contain the variables set by the previous node.
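Because each Run gets its own context, values meant for downstream tasks have to live at the DAG-instance level instead; the code earlier on this page does this with `dagIns.ShareData.Set(...)` followed by `UpdateDagIns`. The sketch is a hypothetical, concurrency-safe in-memory stand-in that only illustrates the idea (the upstream task writes, the downstream task reads by key); it is not fastflow's ShareData type, whose exact accessors should be checked in the source.

```go
package main

import (
	"fmt"
	"sync"
)

// ShareData is a hypothetical stand-in for DAG-instance-scoped data:
// one value per DAG run, visible to every task in that run.
type ShareData struct {
	mu   sync.RWMutex
	dict map[string]string
}

func NewShareData() *ShareData { return &ShareData{dict: map[string]string{}} }

func (s *ShareData) Set(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.dict[key] = value
}

func (s *ShareData) Get(key string) (string, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	v, ok := s.dict[key]
	return v, ok
}

// Hypothetical task bodies: task1 produces a value, task2 consumes it.
func task1(sd *ShareData) { sd.Set("vmID", "vm-123") }

func task2(sd *ShareData) string {
	id, ok := sd.Get("vmID")
	if !ok {
		return "missing vmID"
	}
	return "attaching IP to " + id
}

func main() {
	sd := NewShareData() // created once per DAG instance, not per task
	task1(sd)
	fmt.Println(task2(sd)) // attaching IP to vm-123
}
```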
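I want to write DAGs into the database dynamically by reading YAML. Is there an interface for this, or another way to achieve it?
I tried to get the status of the tasks in a Dag: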
dag, err := mod.GetStore().GetDag("test-dag")
if err != nil {
	panic(err)
}
for _, task := range dag.Tasks {
	println("============== ", task.ActionName, task.GetStatus())
}
But according to the implementation, task.GetStatus() returns an empty string:
// GetStatus
func (t *Task) GetStatus() TaskInstanceStatus {
	return ""
}
In the source code, TaskInstance does return its status:
// GetStatus
func (t *TaskInstance) GetStatus() TaskInstanceStatus {
	return t.Status
}
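How can I get hold of this TaskInstance in order to read its execution status?
I see a related package in the code, but it looks like none of these events are fired yet. Is it not implemented yet?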
const (
	KeyDagInstanceUpdated           = "DagInstanceUpdated"
	KeyDagInstancePatched           = "DagInstancePatched"
	KeyTaskCompleted                = "TaskCompleted"
	KeyTaskBegin                    = "TaskBegin"
	KeyLeaderChanged                = "LeaderChanged"
	KeyDispatchInitDagInsCompleted  = "DispatchInitDagInsCompleted"
	KeyParseScheduleDagInsCompleted = "ParseScheduleDagInsCompleted"
)