Fastflow:有没有基于golang的轻量级工作流框架推荐?
- 内容介绍
- 文章标签
- 相关推荐
本文共计1441个文字,预计阅读时间需要6分钟。
Fastflow 是一个基于 Golang 协程、支持水平扩展的分布式高性能工作流框架。它具有以下特点:易用性:工作流模型基于 DAG 定义,同时提供开箱即用的功能。
Fastflow 是什么?用一句话来定义它:一个 基于golang协程、支持水平扩容的分布式高性能工作流框架。
它具有以下特点:
- 易用性:工作流模型基于
DAG来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能 - 高性能:得益于 golang 的协程 与 channel 技术,
fastflow可以在单实例上并行执行数百、数千乃至数万个任务 - 可观测性:
fastflow基于Prometheus的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。 - 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
- 可扩展性:
fastflow准备了部分开箱即用的任务操作,比如 root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }程序运行目录下的
test-dag.yamlid: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]Basic Task与Task之间的通信由于任务都是基于
goroutine来执行,因此任务之间的context是共享的,意味着你完全可以使用以下的代码:func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }但是注意这样做有个弊端:当节点重启时,如果任务尚未执行完毕,那么这部分内容会丢失。
如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来传递进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过Store组件持久化,以确保数据不会丢失,用法如下:func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil }任务日志fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过
Store组件持久化,用法如下:func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil }使用Dag变量上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"随后我们可以使用
Commander组件来启动一个具体的工作流:mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })这样本次启动的工作流的变量则被赋值为
demo.txt,接下来我们有两种方式去消费它- 带参数的Action
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"PrintAction.go:
type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }- 编程式读取
fastflow 也提供了相关函数来获取 Dag 变量
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil }分布式锁如前所述,你可以在直接使用
Keeper模块提供的分布式锁,如下所示:... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...其中:
LockTTL表示你持有该锁的TTL,到期之后会自动释放,默认30sReentrant用于需要实现可重入的分布式锁的场景,作为持有场景的标识,默认为空,表示该锁不可重入
本文共计1441个文字,预计阅读时间需要6分钟。
Fastflow 是一个基于 Golang 协程、支持水平扩展的分布式高性能工作流框架。它具有以下特点:易用性:工作流模型基于 DAG 定义,同时提供开箱即用的功能。
Fastflow 是什么?用一句话来定义它:一个 基于golang协程、支持水平扩容的分布式高性能工作流框架。
它具有以下特点:
- 易用性:工作流模型基于
DAG来定义,同时还提供开箱即用的 API,你可以随时通过 API 创建、运行、暂停工作流等,在开发新的原子能力时还提供了开箱即用的分布式锁功能 - 高性能:得益于 golang 的协程 与 channel 技术,
fastflow可以在单实例上并行执行数百、数千乃至数万个任务 - 可观测性:
fastflow基于Prometheus的 metrics 暴露了当前实例上的任务执行信息,比如并发任务数、任务分发时间等。 - 可伸缩性:支持水平伸缩,以克服海量任务带来的单点瓶颈,同时通过选举 Leader 节点来保障各个节点的负载均衡
- 可扩展性:
fastflow准备了部分开箱即用的任务操作,比如 root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := keeper.Init(); err != nil { log.Fatal(fmt.Errorf("init keeper failed: %w", err)) } // init store st := mongoStore.NewStore(&mongoStore.StoreOption{ // if your mongo does not set user/pwd, youshould remove it ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin", Database: "mongo-demo", Prefix: "test", }) if err := st.Init(); err != nil { log.Fatal(fmt.Errorf("init store failed: %w", err)) } go createDagAndInstance() // start fastflow if err := fastflow.Start(&fastflow.InitialOption{ Keeper: keeper, Store: st, // use yaml to define dag ReadDagFromDir: "./", }); err != nil { panic(fmt.Sprintf("init fastflow failed: %s", err)) } } func createDagAndInstance() { // wait fast start completed time.Sleep(time.Second) // run some dag instance for i := 0; i < 10; i++ { _, err := mod.GetCommander().RunDag("test-dag", nil) if err != nil { log.Fatal(err) } time.Sleep(time.Second * 10) } }程序运行目录下的
test-dag.yamlid: "test-dag" name: "test" tasks: - id: "task1" actionName: "PrintAction" - id: "task2" actionName: "PrintAction" dependOn: ["task1"] - id: "task3" actionName: "PrintAction" dependOn: ["task2"]Basic Task与Task之间的通信由于任务都是基于
goroutine来执行,因此任务之间的context是共享的,意味着你完全可以使用以下的代码:func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.WithValue("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.Context().Value("key") return nil }但是注意这样做有个弊端:当节点重启时,如果任务尚未执行完毕,那么这部分内容会丢失。
如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来传递进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过Store组件持久化,以确保数据不会丢失,用法如下:func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error { ctx.ShareData().Set("key", "value") return nil } func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error { val := ctx.ShareData().Get("key") return nil }任务日志fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过
Store组件持久化,用法如下:func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { ctx.Trace("some message") return nil }使用Dag变量上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt"随后我们可以使用
Commander组件来启动一个具体的工作流:mod.GetCommander().RunDag("test-id", map[string]string{ "fileName": "demo.txt", })这样本次启动的工作流的变量则被赋值为
demo.txt,接下来我们有两种方式去消费它- 带参数的Action
id: "test-dag" name: "test" vars: fileName: desc: "the file name" defaultValue: "file.txt" tasks: - id: "task1" action: "PrintAction" params: # using {{var}} to consume dag's variable fileName: "{{fileName}}"PrintAction.go:
type PrintParams struct { FileName string `json:"fileName"` } type PrintAction struct { } // Name define the unique action identity, it will be used by Task func (a *PrintAction) Name() string { return "PrintAction" } func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error { cinput := params.(*ActionParam) fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value)) return nil } func (a *PrintAction) ParameterNew() interface{} { return &PrintParams{} }- 编程式读取
fastflow 也提供了相关函数来获取 Dag 变量
func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error { // get variable by name ctx.GetVar("fileName") // iterate variables ctx.IterateVars(func(key, val string) (stop bool) { ... }) return nil }分布式锁如前所述,你可以在直接使用
Keeper模块提供的分布式锁,如下所示:... mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(), mod.LockTTL(time.Second), mod.Reentrant("worker-key1")) ...其中:
LockTTL表示你持有该锁的TTL,到期之后会自动释放,默认30sReentrant用于需要实现可重入的分布式锁的场景,作为持有场景的标识,默认为空,表示该锁不可重入

