Coder Social home page Coder Social logo

shiningrush / fastflow Goto Github PK

View Code? Open in Web Editor NEW
286.0 286.0 70.0 138 KB

A lightweight, high-performance distributed workflow framework

License: MIT License

Go 99.49% JavaScript 0.25% Makefile 0.26%
framework goalng goroutine workflow workflow-engine

fastflow's People

Contributors

philhuan avatar shiningrush avatar

Stargazers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

Watchers

 avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar  avatar

fastflow's Issues

怎么获取DAG里的每个task的执行状态?

尝试获取Dag里task的状态,

dag, err := mod.GetStore().GetDag("test-dag")

if err != nil {
	panic(err)
}

for _, task := range dag.Tasks{
	println("============== ", task.ActionName, task.GetStatus())
}

但是看实现里 task.GetStatus()返回的是空字符串

  // GetStatus
  func (t *Task) GetStatus() TaskInstanceStatus {
      return ""
  }

看源码里有TaskInstance可以获取状态

  // GetStatus
  func (t *TaskInstance) GetStatus() TaskInstanceStatus {
      return t.Status
  }

这个TaskInstance怎么能获取到进而再获取其执行状态?

DAG实例启动后,如何block DAG实例从而阻塞未执行的task

image
有这样一种场景,DAG实例运行过程中发现不符合预期或线上有case需要暂停流程、封禁动作的执行,目前来看貌似不支持此类命令,希望有能够同时阻塞所有dag 实例的运行的命令和阻塞单个实例运行的命令,通过continue或者放开全局锁来解除封禁操作~
辛苦看下当前是否有能够支持该场景的函数/建议如何自行实现~

RFC: support pass unique key to Keeper

Hi, first of all thank you for the library!

In the process of trying to use it, we found that because the worker's unique id is achieved through sonyflake, the worker requires an integer between 0 and 255 to achieve the unique id when it is started

func CheckWorkerKey(key string) (int, error) {

In practice, this restriction would lead to reliance on external storage to generate the unique integer in multi-process situations, so I propose that the global unique key can be passed directly from the caller and the keeper component no longer generates the unique key based on number

There are two advantages to this change

  • the logic of the keeper component is lighter, no need to generate its own unique keys, no need to introduce sonyflake
  • the limit of less than 255 workers is lifted

If you agree to this change, I will provide PR in my free time

thanks

发现个问题,项目启动的时候register了结构体,并调用fastflow.init,然后在接口中构造dag,调用run方法,但我发现会出现actionName运行2次的情况,或者多次运行之后出现2023/10/09 22:52:18 error: parser get some error%!(EXTRA string=module, string=parser, string=err, *fmt.wrapError=worker do failed: dag instance[482065678661058561] does not found task tree)

使用的是项目启动的时候注册的结构体作为action,没有在每次调用的时候重复注册

如何跳过一些路线的任务

如下图,我想在执行任务B时,如果条件命中了,跳过任务B,此时我希望任务E也能够跳过,这时候任务的线路是A->C->D,有点类似if-else,这种能够实现吗。

image

About Leader and Workers

Let's say if I want to have three workers (included leader), how to run these on a server? Could you give an example? Thanks.

Event 可以使用了吗

代码里看到有相关的包,但是貌似还没有相关事件触发,是还未实装么。

const (
	KeyDagInstanceUpdated = "DagInstanceUpdated"
	KeyDagInstancePatched = "DagInstancePatched"

	KeyTaskCompleted = "TaskCompleted"
	KeyTaskBegin     = "TaskBegin"

	KeyLeaderChanged                = "LeaderChanged"
	KeyDispatchInitDagInsCompleted  = "DispatchInitDagInsCompleted"
	KeyParseScheduleDagInsCompleted = "ParseScheduleDagInsCompleted"
)

is fastflow support rollback?

in my case, i want to use goflow to create some resources on a public cloud. like creating a cloud vm with public ip.

  1. create a vm using cloud openapi
  2. wait the status of vm is running
  3. create a public ip using cloud openapi
  4. wait the status of public ip is available
  5. bind the public ip with vm
  6. other steps

i want

  1. if I cancel the workflow(like run flow.cancel()), workflow can free the created resources(vm,public ip etc)
  2. if one step failed, workflow can free the created resources(vm,public ip etc)

在ctx中可以取到当前任务信息吗

在 Run(ctx run.ExecuteContext, params interface{}) 的 ctx 中可以取到当前任务相关信息吗

dagId
dagInsId
taskId : 节点ID

或者如何能获取到呢

对于DAG中的cron字段

我看到DAG中有一个cron字段,但源码里并未具体实现关于定时任务的相关处理,这部分是在外部处理吗?还是在内部处理,但是开源版本并未实现该功能,如果我想要自己在内部实现的话,能提供一个大概的思路吗?

我的思路是这样:在派发任务的Dispatcher组件中,去判断是否带有cron参数,如果带有cron参数,就启用goroutine并使用cron库来定时去派发任务到节点上去执行。

我是刚学了go语言,想做一个小的demo来熟练go在项目上的应用,对于上面的思路可能有思考的欠缺,如果有可能,希望能够得到你的指点

再次咨询,action not found问题

情况是这样的,fastflow部署了一共2台机器。运行的时候,经常报action not found
报这个错误的代码在这里:
execturor.go
func (e *DefExecutor) runAction(taskIns *entity.TaskInstance) error {
act := ActionMap[taskIns.ActionName]
if act == nil {
return fmt.Errorf("action not found: %s", taskIns.ActionName)
}

再翻看源代码,ActionMap 是定义的一个全局变量。
var (
ActionMap = map[string]run.Action{}

defExc Executor
defStore Store
defKeeper Keeper
defParser Parser
defCommander Commander
)

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在‹内存›中,并没有持久化。
在程序中,我是这么注册Action 的。

// 注册 action
fastflow.RegisterAction(actions)

我看到官方文档上是这样描述:
当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

假设fastflow.RegisterAction(actions)在2台机器中的A执行,那么是不有可能由另一台机器去执行,是不是因为这个造成的报action not found呢?

创建task有非常大的延迟

`if err := mod.GetStore().CreateDagIns(dagInstance); err != nil {
return nil, err
}

dagIns, err := mod.GetStore().GetDagInstance(dagInstance.ID)
if err != nil {
	return nil, err
}

dagIns.ShareData.Set("dagInstanceID", dagInstance.ID)
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}
dagIns.ShareData.Set("jobID", strconv.Itoa(jobID))
if err := mod.GetStore().UpdateDagIns(dagIns); err != nil {
	return nil, err
}`

在执行上述create的操作后,我期待在后面通过一个for循环同步tasks的状态,理论上我最终通过listTaskInstance获取到的task数量应该和dag中声明的task数量相同,但这里存在非常大的延迟,for循环等待时间可能要数十秒或一致卡死,显示我目前db中不存在和当前dagInstance相关联的task,当我查看db时也是如此,dagInstance存在,而tasks却不存在,我想知道这是什么问题呢?有没有相关的排查思路?
for { time.Sleep(500 * time.Millisecond) tasks, err = mod.GetStore().ListTaskInstance(&mod.ListTaskInstanceInput{ DagInsID: dagInstance.ID, }) if err != nil { fmt.Printf("total_task_length: %d, taskcount: %d\n, err: %s", taskCount, len(tasks), err.Error()) continue } else if len(tasks) != taskCount { fmt.Printf("total_task_length: %d, taskcount: %d\n", taskCount, len(tasks)) continue } break }

而且在运行过程中出现过非常诡异的现象,dagInstance理论上会从init状态变为scheduled,但是流程中在没有通过外部调用的情况下,dagInstance又从scheduled变回了init状态,请问这是为什么呢?通过mongo连续两次查询相同的dagInstance状态可以观察此现象:
image

能支持流式执行任务吗

在平常写shell脚本过程中,管道通常是读取一行,然后管道符后面的程序就开始用这一行作为输入开始执行了

看了下readme,好像不支持这种流式执行的任务?

内置的http请求操作怎么使用

可扩展性:fastflow 准备了部分开箱即用的任务操作,比如 http请求、执行脚本等,同时你也可以自行定义新的节点动作,同时你可以根据上下文来决定是否跳过节点(skip)

这是readme中描述的,但是看后面也没有介绍内置的http请求应该如何使用

question: DAG 任务支持取消与回滚吗?

如题,一个 DAG 内的任务,支持任务支持取消与回滚吗?
例如 DAG 执行到某个中间步骤,此时前端调用取消任务的 API,是否支持将正在执行中(Running)的任务取消,并回滚已经执行完成的(success)任务?

对于示例中的preCheck

看到示例中的是这样写的

id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
  - act: skip #you can set "skip" or "block"
    conditions:
    - source: vars # source could be "vars" or "share-data"
      key: "fileName"
      op: "in"
      values: ["warn.txt", "error.txt"]

preCheck应该是一个列表,但实际运行中发现,报错,查看代码得知,preCheck定义是一个PreChecks map[string]*Check对象,修改成

    preCheck:
      act1:
        act: block
        conditions:
          - source: "vars"
            key: "fileName"
            op: "in"
            values: ["file.txt", "error.txt"]

如果是这样的话,不知道原意设计是有多个preCheck要处理,还是只能处理一个act的preCheck,然后再conditions里面设置多个条件判断呢?

按照我的理解,preCheck的定义应该是一个[]Check,能够根据多个条件来判断该节点是应该阻塞还是应该跳过

如何使处理任务的worker实例和执行RunDag的实例相同,即本机执行任务

假设有两个副本,这两个副本都有概率接收到创建dag实例(RunDag)的请求, 希望创建实例后在本地执行、处理请求,请问如何配置?
如 A副本创建了一个dag_ins, 这个实例流程任务都在A副本上运行。

辛苦帮忙看看,目前创建实例的api执行后,会在本地初始化一个管道,用于和流程中的init任务交互,实际执行节点和api节点不一致的情况下会导致通信失败~

多机执行时,报action not found:

你好,多机时,会报action not found:。
翻看了源代码,ActionMap 是定义的一个全局变量。
var (
ActionMap = map[string]run.Action{}

defExc       Executor
defStore     Store
defKeeper    Keeper
defParser    Parser
defCommander Commander

)

Action 的注册,是将 Action 放到这个 Map 中,相当于数据是存储在本机内存中。
在程序中,我是这么注册Action 的。

// 注册 action
fastflow.RegisterAction(actions)

那么,如果是多机执行,这个 Dag 被调度到另外一台机器时,另外一台机器的内在中(邓全局的ActionMap)显然就没有相关的 Dag 数据。
目前,在多机情况下,能够稳定复现action not found:。不知道是不是我自己还有别的点没有考虑到。

诚盼解惑。

如何在Action的Run方法体内获取当前的Task Instance ID的元信息呢?

假设当前存在某种定制的跳过场景,外部想控制某个task进行跳过

设置sharedata中要跳过的taskinstance的skip字段为true,理论上在action的runafter方法体内轮询这个字段即可。但我应该如何在逻辑中确认当前正在运行的action属于具体的哪一个taskinstance呢?似乎并没有找到通过run.ExecuteContext获取当前运行时taskinstance信息的接口啊

请问Task的父亲依赖节点DependOn的最大数量是多少?

RT,测试了某个dag中某个Task的DependOn的长度为10000,即前向依赖于10000个任务节点,每个任务节点的Action是简单的print,但是执行到该节点(timeout设置为3600)时就卡住了,该task的状态一直维持为init。

Recommend Projects

  • React photo React

    A declarative, efficient, and flexible JavaScript library for building user interfaces.

  • Vue.js photo Vue.js

    🖖 Vue.js is a progressive, incrementally-adoptable JavaScript framework for building UI on the web.

  • Typescript photo Typescript

    TypeScript is a superset of JavaScript that compiles to clean JavaScript output.

  • TensorFlow photo TensorFlow

    An Open Source Machine Learning Framework for Everyone

  • Django photo Django

    The Web framework for perfectionists with deadlines.

  • D3 photo D3

    Bring data to life with SVG, Canvas and HTML. 📊📈🎉

Recommend Topics

  • javascript

    JavaScript (JS) is a lightweight interpreted programming language with first-class functions.

  • web

    Some thing interesting about web. New door for the world.

  • server

    A server is a program made to process requests and deliver data to clients.

  • Machine learning

    Machine learning is a way of modeling and interpreting data that allows a piece of software to respond intelligently.

  • Game

    Some thing interesting about game, make everyone happy.

Recommend Org

  • Facebook photo Facebook

    We are working to build community through open source technology. NB: members must have two-factor auth.

  • Microsoft photo Microsoft

    Open source projects and samples from Microsoft.

  • Google photo Google

    Google ❤️ Open Source for everyone.

  • D3 photo D3

    Data-Driven Documents codes.