Fastflow:有没有基于golang的轻量级工作流框架推荐?

2026-05-29 06:003阅读0评论SEO教程
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计1441个文字,预计阅读时间需要6分钟。

Fastflow:有没有基于golang的轻量级工作流框架推荐?

Fastflow 是一个基于 Golang 协程、支持水平扩展的分布式高性能工作流框架。它具有以下特点:易用性:工作流模型基于 DAG 定义,同时提供开箱即用的功能。

Fastflow:有没有基于golang的轻量级工作流框架推荐?

Fastflow 是什么?用一句话来定义它:一个 基于golang协程支持水平扩容的分布式高性能工作流框架
它具有以下特点:

  • 易用性:工作流模型基于 DAG 来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能
  • 高性能:得益于 golang 的协程 与 channel 技术,fastflow 可以在单实例上并行执行数百、数千乃至数万个任务
  • 可观测性fastflow 基于 Prometheus 的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。
  • 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
  • 可扩展性fastflow 准备了部分开箱即用的任务操作,比如 root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }

    程序运行目录下的test-dag.yaml

    id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"] Basic Task与Task之间的通信

    由于任务都是基于 goroutine 来执行,因此任务之间的 context 是共享的,意味着你完全可以使用以下的代码:

    func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }

    但是注意这样做有个弊端:当节点重启时,如果任务尚未执行完毕,那么这部分内容会丢失。
    如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来传递进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过 Store 组件持久化,以确保数据不会丢失,用法如下:

    func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil } 任务日志

    fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过 Store 组件持久化,用法如下:

    func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil } 使用Dag变量

    上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量

    id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"

    随后我们可以使用 Commander 组件来启动一个具体的工作流:

    mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })

    这样本次启动的工作流的变量则被赋值为 demo.txt,接下来我们有两种方式去消费它

    1. 带参数的Action

    id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"

    PrintAction.go:

    type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }

    1. 编程式读取
      fastflow 也提供了相关函数来获取 Dag 变量

    func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil } 分布式锁

    如前所述,你可以在直接使用 Keeper 模块提供的分布式锁,如下所示:

    ... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...

    其中:

    • LockTTL 表示你持有该锁的TTL,到期之后会自动释放,默认 30s
    • Reentrant 用于需要实现可重入的分布式锁的场景,作为持有场景的标识,默认为空,表示该锁不可重入
    欢迎转载,注明出处即可。 如果你觉得这篇博文帮助到你了,请点下右下角的推荐让更多人看到它。

本文共计1441个文字,预计阅读时间需要6分钟。

Fastflow:有没有基于golang的轻量级工作流框架推荐?

Fastflow 是一个基于 Golang 协程、支持水平扩展的分布式高性能工作流框架。它具有以下特点:易用性:工作流模型基于 DAG 定义,同时提供开箱即用的功能。

Fastflow:有没有基于golang的轻量级工作流框架推荐?

Fastflow 是什么?用一句话来定义它:一个 基于golang协程支持水平扩容的分布式高性能工作流框架
它具有以下特点:

  • 易用性:工作流模型基于 DAG 来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能
  • 高性能:得益于 golang 的协程 与 channel 技术,fastflow 可以在单实例上并行执行数百、数千乃至数万个任务
  • 可观测性fastflow 基于 Prometheus 的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。
  • 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
  • 可扩展性fastflow 准备了部分开箱即用的任务操作,比如 root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }

    程序运行目录下的test-dag.yaml

    id: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"] Basic Task与Task之间的通信

    由于任务都是基于 goroutine 来执行,因此任务之间的 context 是共享的,意味着你完全可以使用以下的代码:

    func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }

    但是注意这样做有个弊端:当节点重启时,如果任务尚未执行完毕,那么这部分内容会丢失。
    如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来传递进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过 Store 组件持久化,以确保数据不会丢失,用法如下:

    func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil } 任务日志

    fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过 Store 组件持久化,用法如下:

    func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil } 使用Dag变量

    上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量

    id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"

    随后我们可以使用 Commander 组件来启动一个具体的工作流:

    mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })

    这样本次启动的工作流的变量则被赋值为 demo.txt,接下来我们有两种方式去消费它

    1. 带参数的Action

    id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"

    PrintAction.go:

    type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }

    1. 编程式读取
      fastflow 也提供了相关函数来获取 Dag 变量

    func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil } 分布式锁

    如前所述,你可以在直接使用 Keeper 模块提供的分布式锁,如下所示:

    ... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...

    其中:

    • LockTTL 表示你持有该锁的TTL,到期之后会自动释放,默认 30s
    • Reentrant 用于需要实现可重入的分布式锁的场景,作为持有场景的标识,默认为空,表示该锁不可重入
    欢迎转载,注明出处即可。 如果你觉得这篇博文帮助到你了,请点下右下角的推荐让更多人看到它。