分布式任务调度系统ScheduleMaster如何实现高效协同?

2026-05-05 21:562阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计2843个文字,预计阅读时间需要12分钟。

分布式任务调度系统ScheduleMaster如何实现高效协同?

1. ScheduleMaster是什么?ScheduleMaster是一种分布式任务调度系统,由国内开发者编写。简单来说,它是一个集中管理不同系统调度的平台。即对各个系统中的调度任务进行统一管理。

1.什么是ScheduleMaster

ScheduleMaster分布式任务调度系统,是国内的一位开发者写的。简称:集中任务调度系统,最简单的理解ScheduleMaster,就是对不同的系统里面的调度任务做统一管理的框架

例如我们现在有多个系统,每个系统针对自己处理不同的业务场景。衍生出自己的调度任务,想象一下,如果每个系统人为去维护,那随着调度任务越来越多,人是崩溃的吧,可见维护技术成本是巨大的,这时我们需要选择分布式任务系统框架做统一的管理

当然有目前有很多相对优秀分布式任务系统框架,我们主要学习 ScheduleMaster

2.使用ScheduleMaster

1.首先我们需要使用NET Core web Api创建几个模拟的微服务,分别为 考勤算薪邮件短信

2.下载开源的ScheduleMaster,并且使用ScheduleMaster调度我们的微服务接口

- sqlserver:"Persist Security Info = False; User ID =sa; Password =123456; Initial Catalog =schedule_master; Server =." - postgresql:"Server=localhost;Port=5432;Database=schedule_master;User Id=postgres;Password=123456;Pooling=true;MaxPoolSize=20;" - mysql:"Data Source=localhost;Database=schedule_master;User ID=root;Password=123456;pooling=true;CharSet=utf8mb4;port=3306;sslmode=none;TreatTinyAsBoolean=true"

修改Host的配置文件和支持的数据库,框架默认使用Mysql

修改Web的配置文件和支持的数据库,框架默认使用Mysql

3.进入Hos.ScheduleMaster.Web项目的发布目录,dotnet Hos.ScheduleMaster.Web.dll启动项目 ,此时会生成数据库

登录账号:admin

密码:111111

4.进入Hos.ScheduleMaster.QuartzHost项目的发布目录,执行命令,启动项目

dotnet Hos.ScheduleMaster.QuartzHost.dll --urls *:30003 1.配置Http调度任务

5.准备就绪后,使用后台查看节点管理,可以看到web主节点30000任务调度的接口30002已经在运行

6.使用任务列表菜单,创建定时调度任务,配置基础信息元数据配置,然后点击保存就开始执行任务

2.配置程序集调度任务

1.创建一个类库,安装ScheduleMaster库, 创建一个类继承TaskBase,实现抽象方法,然后编译程序集

namespace TaskExcuteService { class AssemblyTask : TaskBase { public override void Run(TaskContext context) { context.WriteLog("程序集任务"); } } }

2.找到debug目录,去掉Base程序集,然后添加为压缩文件

3.在web界面找到任务配置,选择程序集进行基本配置,配置元数据,输入程序集名称,然后输入类,并将程序集上传,保存就可以运行了

3.使用Api接入任务

为了方便业务系统更好的接入调度系统,ScheduleMaster创建任务不仅可以在控制台中实现,系统也提供了WebAPI供业务系统使用代码接入,这种方式对延时任务来说尤其重要。

1.API Server 对接流程
  • 在控制台中创建好专用的API对接用户账号。

  • 使用对接账号的用户名设置为yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

参数名称 参数类型 是否必填 说明 MetaType int 是 任务类型,这里固定是1 Title string 是 任务名称 RunLoop bool 是 是否按周期执行 CronExpression string 否 cron表达式,如果RunLoop为true则必填 AssemblyName string 是 程序集名称 ClassName string 是 执行类名称,包含完整命名空间 StartDate DateTime 是 任务开始时间 EndDate DateTime 否 任务停止时间,为空表示不限停止时间 Remark string 否 任务描述说明 Keepers List<int> 否 监护人id Nexts List<guid> 否 子级任务id Executors List<string> 否 执行节点名称 RunNow bool 否 创建成功是否立即启动 Params List<ScheduleParam> 否 自定义参数列表,也可以通过CustomParamsJson字段直接传json格式字符串

ScheduleParam:

参数名称 参数类型 是否必填 说明 ParamKey string 是 参数名称 ParamValue string 是 参数值 ParamRemark string 否 参数说明

HttpClient client = new HttpClient(); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("MetaType", "1")); args.Add(new KeyValuePair<string, string>("RunLoop", "true")); args.Add(new KeyValuePair<string, string>("CronExpression", "33 0/8 * * * ?")); args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created")); args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("Title", "程序集接口测试任务")); args.Add(new KeyValuePair<string, string>("AssemblyName", "Hos.ScheduleMaster.Demo")); args.Add(new KeyValuePair<string, string>("ClassName", "Hos.ScheduleMaster.Demo.Simple")); args.Add(new KeyValuePair<string, string>("CustomParamsJson", "[{\"ParamKey\":\"k1\",\"ParamValue\":\"1111\",\"ParamRemark\":\"r1\"},{\"ParamKey\":\"k2\",\"ParamValue\":\"2222\",\"ParamRemark\":\"r2\"}]")); args.Add(new KeyValuePair<string, string>("Keepers", "1")); args.Add(new KeyValuePair<string, string>("Keepers", "2")); //args.Add(new KeyValuePair<string, string>("Nexts", "")); //args.Add(new KeyValuePair<string, string>("Executors", "")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/Task/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); 3.创建HTTP任务

  • 接口地址:yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

参数名称 参数类型 是否必填 说明 MetaType int 是 任务类型,这里固定是2 Title string 是 任务名称 RunLoop bool 是 是否按周期执行 CronExpression string 否 cron表达式,如果RunLoop为true则必填 StartDate DateTime 是 任务开始时间 EndDate DateTime 否 任务停止时间,为空表示不限停止时间 Remark string 否 任务描述说明 HttpRequestUrl string 是 请求地址 HttpMethod string 是 请求方式,仅支持GET\POST\PUT\DELETE HttpContentType string 是 参数格式,仅支持application/json和application/x-www-form-urlencoded HttpHeaders string 否 自定义请求头,ScheduleParam列表的json字符串 HttpBody string 是 如果是json格式参数,则是对应参数的json字符串;如果是form格式参数,则是对应ScheduleParam列表的json字符串。 Keepers List<int> 否 监护人id Nexts List<guid> 否 子级任务id Executors List<string> 否 执行节点名称 RunNow bool 否 创建成功是否立即启动

HttpClient client = new HttpClient(); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("MetaType", "2")); args.Add(new KeyValuePair<string, string>("RunLoop", "true")); args.Add(new KeyValuePair<string, string>("CronExpression", "22 0/8 * * * ?")); args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created")); args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("Title", "Http接口测试任务")); args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "localhost:56655/api/1.0/value/jsonpost")); args.Add(new KeyValuePair<string, string>("HttpMethod", "POST")); args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json")); args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]")); args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/Task/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); 4.创建延时任务

  • 接口地址:yourip:30000/api/delaytask/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

    分布式任务调度系统ScheduleMaster如何实现高效协同?

参数名称 参数类型 是否必填 说明 SourceApp string 是 来源 Topic string 是 主题 ContentKey string 是 业务关键字 DelayTimeSpan int 是 延迟相对时间 DelayAbsoluteTime DateTime 是 延迟绝对时间 NotifyUrl string 是 回调地址 NotifyDataType string 是 回调参数格式,仅支持application/json和application/x-www-form-urlencoded NotifyBody string 是 回调参数,json格式字符串

for (int i = 0; i < 5; i++) { int rndNum = new Random().Next(20, 500); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("SourceApp", "TestApp")); args.Add(new KeyValuePair<string, string>("Topic", "TestApp.Trade.TimeoutCancel")); args.Add(new KeyValuePair<string, string>("ContentKey", i.ToString())); args.Add(new KeyValuePair<string, string>("DelayTimeSpan", rndNum.ToString())); args.Add(new KeyValuePair<string, string>("DelayAbsoluteTime", DateTime.Now.AddSeconds(rndNum).ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("NotifyUrl", "localhost:56655/api/1.0/value/delaypost")); args.Add(new KeyValuePair<string, string>("NotifyDataType", "application/json")); args.Add(new KeyValuePair<string, string>("NotifyBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/DelayTask/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); } 4.框架简单分析 1.全局设计

根据官网的设计图,以及操作的流程,简单总结一下任务全局流程为客户端—–>master——>work—–>执行任务。从大的架构方向的思想就是,将原有单个服务业务和任务调度混合的方式做一些改变,使业务调度分离,专门把任务调度部分剥离出来,作为一个独立的进程,来统一调用管理任务,而原有服务只做业务处理

2.Master和Work分析

我们主要从主节点从节点数据表这几个方面来简单分析,整个业务的时序流程,从本质来看并不复杂masterWork之间的关系是相互配合的,可能乍一看下面整个图有点混乱,下面来解释一下,其实Master节点将任务添加数据中,然后work节点,去从对应的数据表中取出任务数据,然后根据任务对应的配置生成配置信息,调用Quartz执行任务,这就是我们整个框架中最核心的业务。

当然master和work除了主要承载了整个管理系统的UI可视化、后台业务操作任务执行之外,如果从细节以及实现方式来说主要做了以下事情:

Master

  • 1.分配任务执行选择节点
  • 2.对work节点进行健康检查,对任务进行故障转移

Work

  • 1.取出任务配置信息
  • 2.使用Quartz根据配置运行任务
  • 3.使用反射调用程序集
  • 4.使用httpclient调用http 接口

本文共计2843个文字,预计阅读时间需要12分钟。

分布式任务调度系统ScheduleMaster如何实现高效协同?

1. ScheduleMaster是什么?ScheduleMaster是一种分布式任务调度系统,由国内开发者编写。简单来说,它是一个集中管理不同系统调度的平台。即对各个系统中的调度任务进行统一管理。

1.什么是ScheduleMaster

ScheduleMaster分布式任务调度系统,是国内的一位开发者写的。简称:集中任务调度系统,最简单的理解ScheduleMaster,就是对不同的系统里面的调度任务做统一管理的框架

例如我们现在有多个系统,每个系统针对自己处理不同的业务场景。衍生出自己的调度任务,想象一下,如果每个系统人为去维护,那随着调度任务越来越多,人是崩溃的吧,可见维护技术成本是巨大的,这时我们需要选择分布式任务系统框架做统一的管理

当然有目前有很多相对优秀分布式任务系统框架,我们主要学习 ScheduleMaster

2.使用ScheduleMaster

1.首先我们需要使用NET Core web Api创建几个模拟的微服务,分别为 考勤算薪邮件短信

2.下载开源的ScheduleMaster,并且使用ScheduleMaster调度我们的微服务接口

- sqlserver:"Persist Security Info = False; User ID =sa; Password =123456; Initial Catalog =schedule_master; Server =." - postgresql:"Server=localhost;Port=5432;Database=schedule_master;User Id=postgres;Password=123456;Pooling=true;MaxPoolSize=20;" - mysql:"Data Source=localhost;Database=schedule_master;User ID=root;Password=123456;pooling=true;CharSet=utf8mb4;port=3306;sslmode=none;TreatTinyAsBoolean=true"

修改Host的配置文件和支持的数据库,框架默认使用Mysql

修改Web的配置文件和支持的数据库,框架默认使用Mysql

3.进入Hos.ScheduleMaster.Web项目的发布目录,dotnet Hos.ScheduleMaster.Web.dll启动项目 ,此时会生成数据库

登录账号:admin

密码:111111

4.进入Hos.ScheduleMaster.QuartzHost项目的发布目录,执行命令,启动项目

dotnet Hos.ScheduleMaster.QuartzHost.dll --urls *:30003 1.配置Http调度任务

5.准备就绪后,使用后台查看节点管理,可以看到web主节点30000任务调度的接口30002已经在运行

6.使用任务列表菜单,创建定时调度任务,配置基础信息元数据配置,然后点击保存就开始执行任务

2.配置程序集调度任务

1.创建一个类库,安装ScheduleMaster库, 创建一个类继承TaskBase,实现抽象方法,然后编译程序集

namespace TaskExcuteService { class AssemblyTask : TaskBase { public override void Run(TaskContext context) { context.WriteLog("程序集任务"); } } }

2.找到debug目录,去掉Base程序集,然后添加为压缩文件

3.在web界面找到任务配置,选择程序集进行基本配置,配置元数据,输入程序集名称,然后输入类,并将程序集上传,保存就可以运行了

3.使用Api接入任务

为了方便业务系统更好的接入调度系统,ScheduleMaster创建任务不仅可以在控制台中实现,系统也提供了WebAPI供业务系统使用代码接入,这种方式对延时任务来说尤其重要。

1.API Server 对接流程
  • 在控制台中创建好专用的API对接用户账号。

  • 使用对接账号的用户名设置为yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

参数名称 参数类型 是否必填 说明 MetaType int 是 任务类型,这里固定是1 Title string 是 任务名称 RunLoop bool 是 是否按周期执行 CronExpression string 否 cron表达式,如果RunLoop为true则必填 AssemblyName string 是 程序集名称 ClassName string 是 执行类名称,包含完整命名空间 StartDate DateTime 是 任务开始时间 EndDate DateTime 否 任务停止时间,为空表示不限停止时间 Remark string 否 任务描述说明 Keepers List<int> 否 监护人id Nexts List<guid> 否 子级任务id Executors List<string> 否 执行节点名称 RunNow bool 否 创建成功是否立即启动 Params List<ScheduleParam> 否 自定义参数列表,也可以通过CustomParamsJson字段直接传json格式字符串

ScheduleParam:

参数名称 参数类型 是否必填 说明 ParamKey string 是 参数名称 ParamValue string 是 参数值 ParamRemark string 否 参数说明

HttpClient client = new HttpClient(); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("MetaType", "1")); args.Add(new KeyValuePair<string, string>("RunLoop", "true")); args.Add(new KeyValuePair<string, string>("CronExpression", "33 0/8 * * * ?")); args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created")); args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("Title", "程序集接口测试任务")); args.Add(new KeyValuePair<string, string>("AssemblyName", "Hos.ScheduleMaster.Demo")); args.Add(new KeyValuePair<string, string>("ClassName", "Hos.ScheduleMaster.Demo.Simple")); args.Add(new KeyValuePair<string, string>("CustomParamsJson", "[{\"ParamKey\":\"k1\",\"ParamValue\":\"1111\",\"ParamRemark\":\"r1\"},{\"ParamKey\":\"k2\",\"ParamValue\":\"2222\",\"ParamRemark\":\"r2\"}]")); args.Add(new KeyValuePair<string, string>("Keepers", "1")); args.Add(new KeyValuePair<string, string>("Keepers", "2")); //args.Add(new KeyValuePair<string, string>("Nexts", "")); //args.Add(new KeyValuePair<string, string>("Executors", "")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/Task/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); 3.创建HTTP任务

  • 接口地址:yourip:30000/api/task/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

参数名称 参数类型 是否必填 说明 MetaType int 是 任务类型,这里固定是2 Title string 是 任务名称 RunLoop bool 是 是否按周期执行 CronExpression string 否 cron表达式,如果RunLoop为true则必填 StartDate DateTime 是 任务开始时间 EndDate DateTime 否 任务停止时间,为空表示不限停止时间 Remark string 否 任务描述说明 HttpRequestUrl string 是 请求地址 HttpMethod string 是 请求方式,仅支持GET\POST\PUT\DELETE HttpContentType string 是 参数格式,仅支持application/json和application/x-www-form-urlencoded HttpHeaders string 否 自定义请求头,ScheduleParam列表的json字符串 HttpBody string 是 如果是json格式参数,则是对应参数的json字符串;如果是form格式参数,则是对应ScheduleParam列表的json字符串。 Keepers List<int> 否 监护人id Nexts List<guid> 否 子级任务id Executors List<string> 否 执行节点名称 RunNow bool 否 创建成功是否立即启动

HttpClient client = new HttpClient(); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("MetaType", "2")); args.Add(new KeyValuePair<string, string>("RunLoop", "true")); args.Add(new KeyValuePair<string, string>("CronExpression", "22 0/8 * * * ?")); args.Add(new KeyValuePair<string, string>("Remark", "By Xunit Tester Created")); args.Add(new KeyValuePair<string, string>("StartDate", DateTime.Today.ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("Title", "Http接口测试任务")); args.Add(new KeyValuePair<string, string>("HttpRequestUrl", "localhost:56655/api/1.0/value/jsonpost")); args.Add(new KeyValuePair<string, string>("HttpMethod", "POST")); args.Add(new KeyValuePair<string, string>("HttpContentType", "application/json")); args.Add(new KeyValuePair<string, string>("HttpHeaders", "[]")); args.Add(new KeyValuePair<string, string>("HttpBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/Task/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); 4.创建延时任务

  • 接口地址:yourip:30000/api/delaytask/create

  • 请求类型:POST

  • 参数格式:application/x-www-form-urlencoded

  • 返回结果:创建成功返回任务id

  • 参数列表:

    分布式任务调度系统ScheduleMaster如何实现高效协同?

参数名称 参数类型 是否必填 说明 SourceApp string 是 来源 Topic string 是 主题 ContentKey string 是 业务关键字 DelayTimeSpan int 是 延迟相对时间 DelayAbsoluteTime DateTime 是 延迟绝对时间 NotifyUrl string 是 回调地址 NotifyDataType string 是 回调参数格式,仅支持application/json和application/x-www-form-urlencoded NotifyBody string 是 回调参数,json格式字符串

for (int i = 0; i < 5; i++) { int rndNum = new Random().Next(20, 500); List<KeyValuePair<string, string>> args = new List<KeyValuePair<string, string>>(); args.Add(new KeyValuePair<string, string>("SourceApp", "TestApp")); args.Add(new KeyValuePair<string, string>("Topic", "TestApp.Trade.TimeoutCancel")); args.Add(new KeyValuePair<string, string>("ContentKey", i.ToString())); args.Add(new KeyValuePair<string, string>("DelayTimeSpan", rndNum.ToString())); args.Add(new KeyValuePair<string, string>("DelayAbsoluteTime", DateTime.Now.AddSeconds(rndNum).ToString("yyyy-MM-dd HH:mm:ss"))); args.Add(new KeyValuePair<string, string>("NotifyUrl", "localhost:56655/api/1.0/value/delaypost")); args.Add(new KeyValuePair<string, string>("NotifyDataType", "application/json")); args.Add(new KeyValuePair<string, string>("NotifyBody", "{ \"Posts\": [{ \"PostId\": 666, \"Title\": \"tester\", \"Content\":\"testtesttest\" }], \"BlogId\": 111, \"Url\":\"qweqrrttryrtyrtrtrt\" }")); HttpContent reqContent = new FormUrlEncodedContent(args); var response = await client.PostAsync("localhost:30000/api/DelayTask/Create", reqContent); var content = await response.Content.ReadAsStringAsync(); Debug.WriteLine(content); } 4.框架简单分析 1.全局设计

根据官网的设计图,以及操作的流程,简单总结一下任务全局流程为客户端—–>master——>work—–>执行任务。从大的架构方向的思想就是,将原有单个服务业务和任务调度混合的方式做一些改变,使业务调度分离,专门把任务调度部分剥离出来,作为一个独立的进程,来统一调用管理任务,而原有服务只做业务处理

2.Master和Work分析

我们主要从主节点从节点数据表这几个方面来简单分析,整个业务的时序流程,从本质来看并不复杂masterWork之间的关系是相互配合的,可能乍一看下面整个图有点混乱,下面来解释一下,其实Master节点将任务添加数据中,然后work节点,去从对应的数据表中取出任务数据,然后根据任务对应的配置生成配置信息,调用Quartz执行任务,这就是我们整个框架中最核心的业务。

当然master和work除了主要承载了整个管理系统的UI可视化、后台业务操作任务执行之外,如果从细节以及实现方式来说主要做了以下事情:

Master

  • 1.分配任务执行选择节点
  • 2.对work节点进行健康检查,对任务进行故障转移

Work

  • 1.取出任务配置信息
  • 2.使用Quartz根据配置运行任务
  • 3.使用反射调用程序集
  • 4.使用httpclient调用http 接口