2

Go 语言实现分布式 crontab 任务系统

 2 years ago
source link: https://lemon-cs.github.io/2022/01/17/%E5%90%8E%E7%AB%AF/Golang/Go%E9%A1%B9%E7%9B%AE%E5%AE%9E%E6%88%98/Go%E8%AF%AD%E8%A8%80%E5%AE%9E%E7%8E%B0%E5%88%86%E5%B8%83%E5%BC%8Fcrontab%E4%BB%BB%E5%8A%A1%E7%B3%BB%E7%BB%9F/#%E4%BB%A3%E7%A0%81%E5%AE%9E%E7%8E%B0
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Lemon-CS

Go 语言实现分布式 crontab 任务系统

发表于 2022-01-17|更新于 2022-01-19|Go
字数总计:6k|阅读时长:24 分钟 | 阅读量:12

cmd 指令 & cron 表达式

cmd 基础入门

Demo1

package main

import (
"fmt"
"os/exec"
)

func main() {
var (
cmd *exec.Cmd
err error
)

cmd = exec.Command("/bin/bash", "-c", "echo 1;echo2;")

// cmd = exec.Command("C:\\cygwin64\\bin\\bash.exe", "-c", "echo 1")

err = cmd.Run()

fmt.Println(err)
}

在上述代码中,执行 shell 交互并且得到了结果,呢么本质是怎么样的呢?

  • 任务提交到 golang 程序;
  • golang 会向 linux 申请 pipe 资源,供数据传输;
  • golang 通过 fork 子进程,
  • 子进程将标准输入、标准输出、错误输出写入到 pipe 里面。
  • 子进程基于 golang 代码进行逻辑控制,比如执行 bash 命令。
  • 子进程将结果输入到 pipe 中
  • 又因为 pipe 一段连接 golang 程序,所以 golang 可以得到 pipe 的输出结果。
  • bash 执行完毕,子进程退出,golang 回收子进程资源。

Demo2

创建一个子协程,做两秒休眠后打印信息。但在第一秒休眠后将其中断。

package main

import (
"fmt"
"os/exec"
)

func main() {
var (
cmd *exec.Cmd
output []byte
err error
)

// 生成Cmd
cmd = exec.Command("/bin/bash", "-c", "sleep 5;ls -l")

// 执行了命令, 捕获了子进程的输出( pipe )
if output, err = cmd.CombinedOutput(); err != nil {
fmt.Println(err)
return
}

// 打印子进程的输出
fmt.Println(string(output))

}

context 中具有一个 channel ,通过 cancelFunc 对该 channel 进行了关闭行为,select {case <- ctx.done } 监听到后,kill 掉相关的子进程 pid;

Demo3

在上述代码基础下,使其可以做到子进程的结果告知主进程:

package main

import (
"context"
"fmt"
"os/exec"
"time"
)

type result struct {
err error
output []byte
}

func main() {
// 执行1个cmd, 让它在一个协程里去执行, 让它执行2秒: sleep 2; echo hello;
// 1秒的时候, 我们杀死cmd

var (
ctx context.Context
cancelFunc context.CancelFunc
cmd *exec.Cmd
resultChan chan *result
res *result
)

// 创建了一个结果队列
resultChan = make(chan *result, 1000)

// context: chan byte
// cancelFunc: close(chan byte)
ctx, cancelFunc = context.WithCancel(context.TODO())

go func() {
var (
output []byte
err error
)
cmd = exec.CommandContext(ctx, "/bin/bash", "-c", "sleep 2;echo hello;")

// 执行任务, 捕获输出
output, err = cmd.CombinedOutput()

// 把任务输出结果, 传给main协程
resultChan <- &result{
err: err,
output: output,
}
}()

// 继续往下走
time.Sleep(1 * time.Second)

// 取消上下文
cancelFunc()

// 在main协程里, 等待子协程的退出,并打印任务执行结果
res = <-resultChan

// 打印任务执行结果
fmt.Printf("执行结果为,err:%s,output:%s", res.err, string(res.output))
}

Cron 表达式

*****
星期

每个 * 代表的意思是每 X;

  • 每隔 5 分钟执行一次: */5 * * * * echo hello > /tmp/x.log ,具体是指每个 5 分钟的时刻进行一次执行。
  • 第 1-5 分钟执行 5 次 :1-5 * * * * echo hello > xxxx , 具体指每个 1、2、3、4、5 分钟进行一次执行。
  • 每天的 10 点,22 点执行一次:0 10,22 * * * echo bye | tail -1

在上述的定时任务中,指代每 30 分钟执行。

我们可以基于这样的 cron 表达式,形成 分钟、小时、日期、月份的枚举。

因为每 30 分钟执行一次,所以分钟的枚举只有一项 是 30 ;
而小时、日、月 在 * 指代每一的情况下,枚举中包含所有的自然值;

如当前时间是 40 分钟 , 10 小时 , 27 日。

  1. 时间从大到小进行判断,比如 10 小时,在对应的范围内,下来进行判定分;
  2. 那么分钟不在对应的枚举范围内,小时需要进到下一位枚举,既 11 点,需补时间 20 分钟;
  3. 此时时间为 00 分,11 小时,27 日。
  4. 那么分钟不在对应的枚举范围内,下一个枚举时间为 30 分钟,当所有时间都符合枚举的时候,说明 cron 符合。

Demo1

基于 cronexpr 做单个任务调度

package main

import (
"fmt"
"github.com/gorhill/cronexpr"
"time"
)

func main() {
var (
expr *cronexpr.Expression
err error
now time.Time
nextTime time.Time
)

// linux crontab
// 秒粒度, 年配置(2018-2099)
// 哪一分钟(0-59),哪小时(0-23),哪天(1-31),哪月(1-12),星期几(0-6)

// 每隔5分钟执行1次
//if expr, err = cronexpr.Parse("*/5 * * * *"); err != nil {
// fmt.Println(err)
// return
//}
// 0, 5, 10, 15, 20 ... 55
// 0, 6, 12, 18, .. 48..

if expr, err = cronexpr.Parse("*/5 * * * * * *"); err != nil {
fmt.Println(err)
return
}

// 当前时间
now = time.Now()

// 下次调度时间
nextTime = expr.Next(now)
fmt.Println(now, nextTime)

// 等待这个定时器超时
time.AfterFunc(nextTime.Sub(now), func() {
fmt.Println("被调度了:", nextTime)
})

time.Sleep(5 * time.Second)
}

Demo2

执行多个任务

package main

import (
"fmt"
"github.com/gorhill/cronexpr"
"time"
)

// 代表一个任务
type CronJob struct {
expr *cronexpr.Expression
nextTime time.Time // expr.Next(now)
}

func main() {

var (
cronJob *CronJob
expr *cronexpr.Expression
now time.Time
scheduleTable map[string]*CronJob // key: 任务的名字
)

// 需要有1个调度协程, 它定时检查所有的Cron任务, 谁过期了就执行谁
scheduleTable = make(map[string]*CronJob)
now = time.Now()

// 1, 我们定义2个cronjob
expr = cronexpr.MustParse("*/5 * * * * * *")
cronJob = &CronJob{
expr: expr,
nextTime: expr.Next(now),
}
// 任务注册到调度表
scheduleTable["job1"] = cronJob

expr = cronexpr.MustParse("*/5 * * * * * *")
cronJob = &CronJob{
expr: expr,
nextTime: expr.Next(now),
}
// 任务注册到调度表
scheduleTable["job2"] = cronJob

// 启动一个调度协程
go func() {
var (
jobName string
cronJob *CronJob
now time.Time
)

// 定时检查一下任务调度表
for {
now = time.Now()

for jobName, cronJob = range scheduleTable {
// 判断是否过期
if cronJob.nextTime.Before(now) || cronJob.nextTime.Equal(now) {
// 启动一个协程, 执行这个任务
go func(jobName string) {
fmt.Println("执行:", jobName)
}(jobName)

// 计算下一次调度时间
cronJob.nextTime = cronJob.expr.Next(now)
fmt.Println(jobName, "下次执行时间:", cronJob.nextTime)
}
}

// 睡眠100毫秒
// time.Sleep(100 * time.Millisecond)
select {
case <-time.NewTimer(100 * time.Millisecond).C: // 将在100毫秒可读,返回

}
}
}()

time.Sleep(100 * time.Second)
}

etcd 入门

etcd 功能介绍

  1. 数据存储在集群中的高可用 K-V 存储。
  2. 允许应用实时监听存储中的 K-V 变化。
  3. 可以容忍单点故障,并支持网络分区

在传统的存储模型中:

  • 如果存储节点是单点存储,呢么遇到宕机,即刻不可用;
  • 如果是主从架构,当主库不可用的使用,虽然可以继续基于从库来读,单主从同步时延容忍度又是新的问题。

etcd 基于抽屉理论来解决该点,所谓的抽屉理论指:

假如我们有一个 30 人的班级,我将一个秘密告诉其中的 16 位同学,呢么随便挑选 16 个同学中,必然有一个是知道我秘密的同学。呢么假如班里一直会有一半以上的同学正常上课,呢么我这个秘密就能正确获取;

etcd 与 Raft 的关系

  • Raft 是强一致的集群日志同步算法

  • etcd 是一个分布式 KV 存储

  • etcd 利用 raft 算法在集群中同步 key-value 的

Raft 日志概念、异常安全

名词:

replication: 日志在 leader 生成,向 follower 赋值,达到各个节点的日志序列组中一致;
term: 任期,重新选举产生的 leader, term 会单调递增;
log index: 日支行在日志序列的下标;

选举:

  • raft 选举 leader 需要半数以上的节点参与;
  • 节点 commit 日志最多的选举为 Leader;
  • commit 日志同样多,则 term、index 越大的允许选举为 leader;

quorum (大多数) 模型

在这里插入图片描述

在这里插入图片描述

该模型要求集群中至少有 2N+1 个节点;

  • 调用者向 leader 写入信息后,leader 并不会立马同步返回给调用者,而是会 follower 进行同步。
  • 同步的 follower+leader 至少占半数以上的时候(既大于等于 N+1 个节点后),leader 完成本地提交,此时才会返回客户端。
  • 随后 leader 会异步通知 follower 自己完成提交操作,所以该模型也是两阶段提交。

etcd 相关特性

  1. 交互协议支持 GRPC,内部基于 ProtoBuffer;
  2. 底层存储是按 key 有序排列,支持顺序遍历;
  3. 因为 key 有序,所以 etcd 天然支持按目录结构高效遍历;
  4. 支持复杂事物,提供类型 if…then…else… 的事务能力;
  5. 基于租约机制实现 key 的 TTL 过期;
  6. etcd 支持 MVCC 多版本控制 (提交会在 version 单调递增,同 key 维护多个历史版本),以实现 watch 机制;
  7. 对于多版本控制,可以执行 compact 命令完成删除。

watch 工作原理

lease 租约

  1. 调用者通过 sdk 向 etcd 申请一个单位时长的租约,etcd 返回该租约的 id;
  2. 随后调用者带着这个租约 ID,向 etcd 申请 K-V 存储;
  3. K-V 存储引擎与租约建立了关联,当该租约过期的时候,便会想 K-V 存储引擎删除该记录;
  4. 而续租面向的仍旧是租约,需要调用者想租约申请 ‘续租’ ;

etcd 功能实践

安装:为了方便学习,本地安装单机环境;

启动日志如下:

相关 ctl 指令介绍 :

基础 API:

1: put
在这里插入图片描述

在这里插入图片描述

2:get
在这里插入图片描述

在这里插入图片描述

3: 根据前缀查询
在这里插入图片描述

在这里插入图片描述

4: watch
在这里插入图片描述

在这里插入图片描述

第一个 ctl watch 指定模糊前缀,第二个 ctl 对其进行变更;于是 ctl1 收到了变更。

相关 sdk 指令 case :

直接使用 go get github.com/coreos/etcd/clientv3 下载依赖

GO 代码启动测试:

package main

import (
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
)

// 客户端配置
config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立连接
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

client = client
}

PUT 操作

//用于读写etcd中的kv对
kv = clientv3.NewKV(client)

//put
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "helloOld"); err != nil {
fmt.Println(err)
} else {
fmt.Println("revision ", putResp.Header.Revision)
}

//put and get perv
if putResp, err = kv.Put(context.TODO(), "/cron/jobs/job1", "hello", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
fmt.Println("revision ", putResp.Header.Revision)
if putResp.PrevKv != nil {
fmt.Printf("prevValue : k = %s, v = %s ", string(putResp.PrevKv.Key), string(putResp.PrevKv.Value))
}
}

GET 操作

if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job1"); err != nil {
fmt.Println(err)
} else {
fmt.Println(getResp.Kvs)
}

kv.Get(context.TODO(), “/cron/jobs/job1” ,OpOption) ;
Opoption 中可以下达查询是否要仅查个数、是否要查当前游标下几个元素等。

if getResp, err = kv.Get(context.TODO(), "/cron/jobs/",clientv3.WithPrefix()); err != nil {
fmt.Println(err)
} else {
fmt.Println(getResp.Kvs)
}

DEL 操作

if delResp, err = kv.Delete(context.TODO(), "/cron/jobs/job1", clientv3.WithPrevKV()); err != nil {
fmt.Println(err)
} else {
if len(delResp.PrevKvs) != 0 {
for idx, kvPair = range delResp.PrevKvs {
fmt.Printf(" index =%d ,del key =%s del value =%s: ", idx, string(kvPair.Key), string(kvPair.Value))
}
}
}

del 中,也同样支持追加 Opoption 行为,比如从某个 Key 开始,删除 limit 个;

租约

Demo1:创建一个简单的 KV,并挂接租约

//通过客户端申请租约
lease = clientv3.NewLease(client)
//默认时间是秒
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}

leaseId = leaseGrantResp.ID

kv = clientv3.NewKV(client)
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "default", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入成功", putResp.Header.Revision)

for {

if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}

if getResp.Count == 0 {
fmt.Println("kv 被移除了")
} else {
fmt.Println(getResp.Kvs)
}

time.Sleep(2 * time.Second)

}

Demo2:创建一个简单的 KV,并对其进行续约

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
lease clientv3.Lease
leaseId clientv3.LeaseID
putResp *clientv3.PutResponse
getResp *clientv3.GetResponse
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
kv clientv3.KV
leaseGrantResp *clientv3.LeaseGrantResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 申请一个lease(租约)
lease = clientv3.NewLease(client)

// 申请一个10秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 10); err != nil {
fmt.Println(err)
return
}

// 拿到租约的ID
leaseId = leaseGrantResp.ID

// 5秒后会取消自动续租
// 续租了5秒,停止了续租,此时应还有10秒的生命周期,一共15秒
// ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
if keepRespChan, err = lease.KeepAlive(context.TODO(), leaseId); err != nil {
fmt.Println(err)
return
}

// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else {
// 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// Put一个KV, 让它与租约关联起来, 从而实现10秒后自动过期
if putResp, err = kv.Put(context.TODO(), "/cron/lock/job1", "", clientv3.WithLease(leaseId)); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入成功:", putResp.Header.Revision)

// 定时的看一下key过期了没有
for {
if getResp, err = kv.Get(context.TODO(), "/cron/lock/job1"); err != nil {
fmt.Println(err)
return
}
if getResp.Count == 0 {
fmt.Println("kv过期了")
break
}
fmt.Println("还没过期:", getResp.Kvs)
time.Sleep(2 * time.Second)
}

}

keepRespChan的类型是chan of *clientv3.LeaseKeepAliveResponse,在 lease.KeepAlive () 的返回值中被初始化过了,keepRespChan 一直会是一个地址,KeepAlive 函数将会 close chan。此时 <-keepRespChan 返回 nil, 基于此点判断续约是否成功

监听

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"github.com/coreos/etcd/mvcc/mvccpb"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
getResp *clientv3.GetResponse
watchStartRevision int64
watcher clientv3.Watcher
watchRespChan <-chan clientv3.WatchResponse
watchResp clientv3.WatchResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// 模拟etcd中KV的变化
go func() {
for {
kv.Put(context.TODO(), "/cron/jobs/job7", "i am job7")
kv.Delete(context.TODO(), "/cron/jobs/job7")

time.Sleep(1 * time.Second)
}
}()

// 先GET到当前的值,并监听后续变化
if getResp, err = kv.Get(context.TODO(), "/cron/jobs/job7"); err != nil {
fmt.Println(err)
return
}

// 现在key是存在的
if len(getResp.Kvs) != 0 {
fmt.Println("当前值:", string(getResp.Kvs[0].Value))
}

// 当前etcd集群事务ID, 单调递增的
watchStartRevision = getResp.Header.Revision

// 创建一个watcher
watcher = clientv3.NewWatcher(client)

// 启动监听
fmt.Println("从该版本向后监听:", watchStartRevision)

ctx, cancelFunc := context.WithCancel(context.TODO())
time.AfterFunc(5*time.Second, func() {
cancelFunc()
})

watchRespChan = watcher.Watch(ctx, "/cron/jobs/job7", clientv3.WithRev(watchStartRevision))

// 处理kv变化事件
for watchResp = range watchRespChan {
for _, event := range watchResp.Events {
switch event.Type {
case mvccpb.PUT:
fmt.Println("修改为:", string(event.Kv.Value), "Revision:", event.Kv.CreateRevision, event.Kv.ModRevision)
case mvccpb.DELETE:
fmt.Println("删除了", "Revision:", event.Kv.ModRevision)
}
}
}

}

clientv3.Op Get/Put/Del 操作

同样的, 后面也可以加 WithPrefix 等其他 op 操作。

package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
putOp clientv3.Op
getOp clientv3.Op
opResp clientv3.OpResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// 用于读写etcd的键值对
kv = clientv3.NewKV(client)

// 创建Op: operation
putOp = clientv3.OpPut("/cron/jobs/job8", "123123123758923")

// 执行OP
if opResp, err = kv.Do(context.TODO(), putOp); err != nil {
fmt.Println(err)
return
}

fmt.Println("写入Revision:", opResp.Put().Header.Revision)

// 创建Op
getOp = clientv3.OpGet("/cron/jobs/job8")

// 执行OP
if opResp, err = kv.Do(context.TODO(), getOp); err != nil {
fmt.Println(err)
return
}

// 打印
fmt.Println("数据Revision:", opResp.Get().Kvs[0].ModRevision) // create rev == mod rev
fmt.Println("数据value:", string(opResp.Get().Kvs[0].Value))
}

乐观锁 case

基于 etcd 的乐观锁 与 java+zookeeper/redis 的同步抢锁,整体思路一致,回答清楚下面的问题,剩下的基于上述的 case 学习,可以独立完成。(以后有时间可以试试写一个 tryLock (Timeout time) 的方法试试)

F: 若单次加锁时间为 1S,但是作业任务超过了 1 秒,如何保证在接下来的时间仍旧作业的时候,资源依然独占?
A: 在 Java+redis 的分布式锁中, 可以通过当前作业线程创建相关子线程进行定时重置声明周期。而在 etcd 中可以通过向租约进行续约,来保证租约不过期;

F: 向 redis 中进行事务操作依赖 lua 编排指令成’原子‘执行 ,或者通过 redission 才可以做到。呢么 etcd 如何保证事务呢?
A: 天然支持,有蛮好用的 API;

F:当任务执行完毕后,如何进行释放 lock;
A:java+redis 中,往往是在 finally 中做 del 操作;etcd 中可以通过 defer 函数,当函数退时进行租约进行取消;

F: 这种分布式锁都会确保一个前提,若加锁节点宕机,相关的 key 或 path 要如何删除呢?
A: 在 redis 中设置 expire 和 etcd 中通过租约可以达成同样的目的。

代码关键部分:

  1. 申请一个租约,并对做到两点。
    a: 租约可进行续约,并且当前函数退出的时候,取消续约;
    b: 租约可被取消,触发条件为当前函数退出的时候;
  2. 基于 txn 事务进行相关操作
/*
分布式乐观锁
*/
package main

import (
"context"
"fmt"
"github.com/coreos/etcd/clientv3"
"time"
)

func main() {

var (
config clientv3.Config
client *clientv3.Client
err error
kv clientv3.KV
lease clientv3.Lease
leaseId clientv3.LeaseID
keepResp *clientv3.LeaseKeepAliveResponse
keepRespChan <-chan *clientv3.LeaseKeepAliveResponse
leaseGrantResp *clientv3.LeaseGrantResponse
txnResp *clientv3.TxnResponse
)

config = clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: 5 * time.Second,
}

// 建立一个客户端
if client, err = clientv3.New(config); err != nil {
fmt.Println(err)
return
}

// lease实现锁自动过期:以及续约
// op操作
// txn事务: if else then

// 1, 上锁 (创建租约, 自动续租, 拿着租约去抢占一个key)
lease = clientv3.NewLease(client)

// 申请一个5秒的租约
if leaseGrantResp, err = lease.Grant(context.TODO(), 5); err != nil {
fmt.Println(err)
return
}

// 拿到租约的ID
leaseId = leaseGrantResp.ID

// 准备一个用于取消自动续租的context
ctx, cancelFunc := context.WithCancel(context.TODO())

// 确保函数退出后, 自动续租会停止
defer cancelFunc()
defer lease.Revoke(context.TODO(), leaseId)

// 5秒后会取消自动续租
if keepRespChan, err = lease.KeepAlive(ctx, leaseId); err != nil {
fmt.Println(err)
return
}

// 处理续约应答的协程
go func() {
for {
select {
case keepResp = <-keepRespChan:
if keepRespChan == nil {
fmt.Println("租约已经失效了")
goto END
} else { // 每秒会续租一次, 所以就会受到一次应答
fmt.Println("收到自动续租应答:", keepResp.ID)
}
}
}
END:
}()

// if 不存在key, then 设置它, else 抢锁失败
kv = clientv3.NewKV(client)

txn := kv.Txn(context.TODO())

// 定义事务

// 如果key不存在
txn.If(clientv3.Compare(clientv3.CreateRevision("/cron/lock/job9"), "=", 0)).
Then(clientv3.OpPut("/cron/lock/job9", "xxx", clientv3.WithLease(leaseId))).
Else(clientv3.OpGet("/cron/lock/job9")) // 否则抢锁失败

// 提交事务
if txnResp, err = txn.Commit(); err != nil {
fmt.Println(err)
return
}

// 判断是否抢到了锁
if !txnResp.Succeeded {
fmt.Println("锁被占用:", string(txnResp.Responses[0].GetResponseRange().Kvs[0].Value))
return
}

// 2, 处理业务
fmt.Println("处理任务")
time.Sleep(5 * time.Second)

// 3, 释放锁(取消自动续租, 释放租约)
// defer 会把租约释放掉, 关联的KV就被删除了

}

go-crontab 整体架构和设计

实现目标:

实现一个分布式 crontab 系统。用户可以通过前端页面配置任务和 cron 表达式和命令来执行定时任务,相比较 linux 自带的 crontab 来说,本项目可以方便看到执行结果,且分布式部署可以避免单点问题,用户不用登陆到各个机器去配置任务,操作方便。同时用户可以通过页面查看任务执行的情况。当然,目前做的还比较简单,对任务的执行时间没有超时机制,但提供了手动的删除和强杀正在执行的任务操作。

整体架构图

  1. 客户端请求无状态的 Master 集群,将任务保存到 Etcd 中,Master 可以添加、查询任务,查询任务的执行日志
  2. 然后 Etcd 将任务同步到 Worker 集群,所有的 worker 都拿到全部的任务列表
  3. 通过分布式乐观锁互斥的控制多个 worker 争抢一个任务
  4. 然后将任务执行的日志保存在 MongoDB 中

主要分为 master 和 worker 两个角色。通过 etcd 来作为服务发现和分布式锁的实现。MongoDB 作为数据量存储日志信息,方便查询执行结果。同时也可以通过本地 log 日志查看模块的执行情况。
master 通过跟前端交互获取用户的任务操作信息,通过与 etcd 交互和 mongodb 交互来完成建立、删除、编辑、强杀、查看健康 woker 节点以及查看日志等功能。
woker 通过监控 etcd 的节点变化来执行任务的执行、强杀等操作,同时通过 etcd 来实现自身服务的注册功能以及吧执行结果写入 MongoDB 作为日志存储。

  • 利用 etcd 同步全量任务列表到所有的 worker 节点
  • 每个 worker 独立调度全量任务,无需和 Master 产生直接的 RPC,避免网络故障
  • 每个 worker 利用分布式锁抢占,解决并发调度相同任务的问题

Master 功能

  • 任务管理 HTTP 接口:新建、修改、查看、删除任务
  • 任务日志 HTTP 接口:查看任务执行历史日志
  • 任务控制 HTTP 接口:提供强制结束任务的接口
  • 实现 web 管理页面,前后端分离

Etcd 结构

/cron/jobs/任务名 -> {
name, // 任务名
command, // shell命令
cronExpr // cron表达式
}

MongoDB 结构

{
JobName string `bson:"jobName"` // 任务名
Command string `bson:"command"` // shell命令
Err string `bson:"err"` // 脚本错误
Content string `bson:"content"` // 脚本输出
TimePoint TimePoint `bson:"timePoint"` // 执行时间点
}

请求 MongoDB,按任务名查看最近的执行日志

  1. 向 etcd 中写入
/cron/killer/任务名 -> ""
  1. worker 会监听 /cron/killer/ 目录下的 put 修改操作
  2. Master 将要结束的任务名 put 在 /cron/killer/ 目录下,触发 worker 立即结束 shell 任务

Worker 功能

监听 etcd 中 /cron/jobs/ 目录的变化,有变化就说明有添加或者修改任务

基于 cron 表达式计算,触发过期任务

协程池并发执行多任务,基于 etcd 分布式锁抢占

捕获任务执行输出,并保存到 MongoDB

  • 利用 watch API,监听 /cron/jobs//cron/killer/ 目录的变化
  • 将变化事件通过 channel 推送给调度协程,更新内存中的任务信息
  • 监听任务变更 event,更细内存中维护的任务列表
  • 检查 cron 表达式,扫描到期任务,交给执行协程运行
  • 监听任务控制 event,强制中断正在执行的子进程
  • 监听任务执行 result,更新内存中任务状态,投递执行日志
  • 在 etcd 中抢占分布式乐观锁:/cron/lock/任务名
  • 抢占成功则通过 Command 类执行 shell 任务
  • 捕获 Command 输出并等待子进程结束,将执行结果投递给调度协程
  • 监听调度协程发来的执行日志,放入一个 batch 中
  • 对新 batch 启动定时器,超时未满自动提交
  • 若 batch 被放满,那么就立即提交,并取消自动提交定时器

有很多地方有待优化,比如

  • 任务执行时间的限制,可以支持配置任务执行的最大时长,超过强杀。
  • master 目前虽然支持多机部署但是没有主从机制,可以实现 master 的选主机制,防止并发问题。只有主才能执行 etcd 的” 写入操作”
  • 代码结构上有一定的冗余,可以通过复用以实现精简

Lemon-CS

基于 GO 语言实现一个分布式定时任务_Lcy-CSDN 博客


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK