5

在Go中仅使用MySQL实现高性能分布式ID生成器

 2 years ago
source link: https://jasonkayzk.github.io/2021/06/20/%E5%9C%A8Go%E4%B8%AD%E4%BB%85%E4%BD%BF%E7%94%A8MySQL%E5%AE%9E%E7%8E%B0%E9%AB%98%E6%80%A7%E8%83%BD%E5%88%86%E5%B8%83%E5%BC%8FID%E7%94%9F%E6%88%90%E5%99%A8/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在上一篇文章《高性能分布式ID生成器实现方法总结》中,主要介绍了一些常用的分布式ID生成器;本文在上一篇文章的基础之上,采用Leaf算法结合Go实现双Buffer桶的ID生成器!

系列文章:

在Go中仅使用MySQL实现高性能分布式ID生成器

上一篇文章《高性能分布式ID生成器实现方法总结》中,介绍了一些常用的分布式ID生成器;

接下来,在上一篇文章的基础之上,采用Leaf算法结合Go来实现一个双Buffer桶的ID生成器吧!

项目源代码:

项目结构

整个项目的目录结构如下:

$ tree .
.
├── config
|  └── config.go
├── config.json
├── core
|  └── alloc.go
├── go.mod
├── main.go
├── mysql
|  └── mysql.go
├── schema.sql
└── server
   └── server.go

各个目录内容:

  • config:读取config.json相关配置;
  • core:分配分布式ID的核心代码;
  • mysql:分配ID的数据库交互逻辑;
  • server:启动后台生成分布式ID服务;

下面来一个一个看;

数据库设计&配置文件

数据库设计与文章《高性能分布式ID生成器实现方法总结》中所述完全相同:

schema.sql

CREATE DATABASE IF NOT EXISTS `id_alloc_db`;

USE `id_alloc_db`;

CREATE TABLE `segments`
(
    `app_tag`     VARCHAR(32) NOT NULL,
    `max_id`      BIGINT      NOT NULL,
    `step`        BIGINT      NOT NULL,
    `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`app_tag`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8
    COMMENT ='test业务ID池';

INSERT INTO segments(`app_tag`, `max_id`, `step`)
VALUES ('test_business', 0, 100000);

重要字段说明:

  • app_tag:用来区分业务;
  • max_id表示:该app_tag目前所被分配的ID号段的最大值;
  • step表示:每次分配的号段长度;

更多说明见:

同时,在配置文件中展示了数据库连接、服务启动端口以及服务超时等相关配置:

config.json

{
  "DSN": "root:123456@tcp(127.0.0.1:3306)/id_alloc_db",
  "table": "segments",

  "HttpPort": 8880,
  "HttpReadTimeout": 5000,
  "HttpWriteTimeout": 5000
}

项目说明

整个项目主要由四个部分组成:

  • config:读取config.json相关配置;
  • core:分配分布式ID的核心代码;
  • mysql:分配ID的数据库交互逻辑;
  • server:启动后台生成分布式ID服务;

下面我们挨个来看;

Config包

首先从最简单的Config包来看:

config/config.go

package config

var AppConfig *Config

type Config struct {
    DSN   string `json:"DSN"`
    Table string `json:"table"`

    HttpPort         int `json:"httpPort"`
    HttpReadTimeout  int `json:"httpReadTimeout"`
    HttpWriteTimeout int `json:"httpWriteTimeout"`
}

func LoadConf(filename string) error {
    content, err := ioutil.ReadFile(filename)
    if err != nil {
        return err
    }

    conf := Config{}
    err = json.Unmarshal(content, &conf)
    if err != nil {
        return err
    }
    AppConfig = &conf
    return nil
}

config包主要是根据传入的filename解析配置,并将配置保存在Config类型的AppConfig中;

mysql包

mysql包主要完成了向MySQL申请ID的任务,代码如下:

mysql/mysql.go

package mysql

var DbHandler *sql.DB

func InitDB() error {
    db, err := sql.Open("mysql", config.AppConfig.DSN)
    if err != nil {
        return err
    }
    db.SetMaxIdleConns(10)
    db.SetConnMaxLifetime(0)
    DbHandler = db
    return nil
}

func NextId(appTag string) (int, int, error) {
    // 总耗时小于2秒
    ctx, cancelFunc := context.WithTimeout(context.Background(), time.Duration(2000)*time.Millisecond)
    defer cancelFunc()

    // 开启事务
    tx, err := DbHandler.BeginTx(ctx, nil)
    if err != nil {
        return 0, 0, err
    }

    // 1:前进一个步长, 即占用一个号段(更新操作是悲观行锁)
    query := "UPDATE " + config.AppConfig.Table + " SET max_id=max_id+step WHERE app_tag=?"
    stmt, err := tx.PrepareContext(ctx, query)
    if err != nil {
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, err
    }

    result, err := stmt.ExecContext(ctx, appTag)
    if err != nil {
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, err
    }

    rowsAffected, err := result.RowsAffected()
    if err != nil { // 失败
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, err
    } else if rowsAffected == 0 { // 记录不存在
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, fmt.Errorf("app_tag not found")
    }

    // 2:查询更新后的最新max_id, 此时仍在事务内, 行锁保护下
    var maxId, step int
    query = "SELECT max_id, step FROM " + config.AppConfig.Table + " WHERE app_tag=?"
    if stmt, err = tx.PrepareContext(ctx, query); err != nil {
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, err
    }
    if err = stmt.QueryRowContext(ctx, appTag).Scan(&maxId, &step); err != nil {
        if err = tx.Rollback(); err != nil {
            return 0, 0, err
        }
        return 0, 0, err
    }

    // 3, 提交事务
    err = tx.Commit()
    return maxId, step, err
}

主要包括了两个方法:

  • InitDB:初始化数据库;
  • NextId:请求下一个ID端,并更新数据库;

核心部分在NextId方法,它主要就是执行了下面的SQL逻辑:

Begin
UPDATE table SET max_id=max_id+step WHERE app_tag=xxx
SELECT tag, max_id, step FROM table WHERE app_tag=xxx
Commit

即首先,更新了数据库中当前ID段的最大值,随后查询;

core包

core是整个算法核心中的核心部分,代码如下:

alloc.go

package core

// 号段:[left,right)
type Segment struct {
    offset int // 消费偏移
    left   int // 左区间
    right  int // 右区间
}

// 关联到AppTag的号码池
type AppIdAllocator struct {
    mutex        sync.Mutex
    appTag       string      // 业务标识
    segments     []*Segment  // 双Buffer, 最少0个, 最多2个号段在内存
    isAllocating bool        // 是否正在分配中(远程获取)
    waiting      []chan byte // 因号码池空而挂起等待的客户端
}

// 全局分配器, 管理所有App的号码分配
type IdAllocator struct {
    mutex  sync.Mutex
    appMap map[string]*AppIdAllocator
}

var GlobalIdAllocator *IdAllocator

func InitAlloc() (err error) {
    GlobalIdAllocator = &IdAllocator{
        appMap: map[string]*AppIdAllocator{},
    }
    return
}

func (i *IdAllocator) LeftCount(appTag string) int {
    i.mutex.Lock()
    appIdAllocator, _ := i.appMap[appTag]
    i.mutex.Unlock()

    if appIdAllocator != nil {
        return appIdAllocator.leftCountWithMutex()
    }
    return 0
}

func (a *AppIdAllocator) leftCountWithMutex() (count int) {
    a.mutex.Lock()
    defer a.mutex.Unlock()
    return a.leftCount()
}

func (a *AppIdAllocator) leftCount() int {
    var count int
    for i := 0; i < len(a.segments); i++ {
        count += a.segments[i].right - a.segments[i].left - a.segments[i].offset
    }
    return count
}

func (i *IdAllocator) NextId(appTag string) (int, error) {
    var (
        appIdAlloc *AppIdAllocator
        exist      bool
    )

    i.mutex.Lock()
    if appIdAlloc, exist = i.appMap[appTag]; !exist {
        appIdAlloc = &AppIdAllocator{
            appTag:       appTag,
            segments:     make([]*Segment, 0),
            isAllocating: false,
            waiting:      make([]chan byte, 0),
        }
        i.appMap[appTag] = appIdAlloc
    }
    i.mutex.Unlock()

    return appIdAlloc.nextId()
}

func (a *AppIdAllocator) nextId() (int, error) {
    var (
        nextId    int
        waitChan  chan byte
        waitTimer *time.Timer
        hasId     = false
    )

    a.mutex.Lock()
    defer a.mutex.Unlock()

    // 1:有剩余号码, 立即分配
    if a.leftCount() != 0 {
        nextId = a.popNextId()
        hasId = true
    }

    // 2:段<=1个, 启动补偿线程
    if len(a.segments) <= 1 && !a.isAllocating {
        a.isAllocating = true
        go a.fillSegments()
    }

    if hasId {
        return nextId, nil
    }

    // 3:没有剩余号码, 此时补偿线程一定正在运行, 等待其至多一段时间
    waitChan = make(chan byte, 1)
    a.waiting = append(a.waiting, waitChan) // 排队等待唤醒

    // 释放锁, 等待补偿线程唤醒
    a.mutex.Unlock()

    waitTimer = time.NewTimer(2 * time.Second) // 最多等待2秒
    select {
    case <-waitChan:
    case <-waitTimer.C:
    }

    // 4:再次上锁尝试获取号码
    a.mutex.Lock()
    if a.leftCount() != 0 {
        nextId = a.popNextId()
        return nextId, nil
    } else {
        return 0, fmt.Errorf("no available id")
    }
}

func (a *AppIdAllocator) popNextId() int {
    var nextId = a.segments[0].left + a.segments[0].offset
    a.segments[0].offset++
    if nextId+1 >= a.segments[0].right {
        a.segments = append(a.segments[:0], a.segments[1:]...) // 弹出第一个seg, 后续seg向前移动
    }
    return nextId
}

// 分配号码段, 直到足够2个segment, 否则始终不会退出
func (a *AppIdAllocator) fillSegments() {
    var failTimes = 0
    for {
        a.mutex.Lock()
        if len(a.segments) <= 1 { // 只剩余<=1段, 那么继续获取新号段
            a.mutex.Unlock()
            // 请求mysql获取号段
            if seg, err := a.newSegment(); err != nil {
                failTimes++
                if failTimes > 3 { // 连续失败超过3次则停止分配
                    a.mutex.Lock()
                    a.wakeup() // 唤醒等待者, 让它们立马失败
                    goto LEAVE
                }
            } else {
                failTimes = 0 // 分配成功则失败次数重置为0
                // 新号段补充进去
                a.mutex.Lock()
                a.segments = append(a.segments, seg)
                a.wakeup()               // 尝试唤醒等待资源的调用
                if len(a.segments) > 1 { // 已生成2个号段, 停止继续分配
                    goto LEAVE
                } else {
                    a.mutex.Unlock()
                }
            }
        } else {
            // never reach
            break
        }
    }

LEAVE:
    a.isAllocating = false
    a.mutex.Unlock()
}

func (a *AppIdAllocator) newSegment() (*Segment, error) {
    maxId, step, err := mysql.NextId(a.appTag)
    if err != nil {
        return nil, err
    }

    return &Segment{
        left:  maxId - step,
        right: maxId,
    }, nil
}

func (a *AppIdAllocator) wakeup() {
    var waitChan chan byte

    for _, waitChan = range a.waiting {
        close(waitChan)
    }
    a.waiting = a.waiting[:0]
}

alloc包比较复杂,我们一点一点来看;

几个分配器类型

代码首先声明了几个类型:

  • Segment:具体的双Buffer中单个ID池的抽象,ID区间为:[left,right)
  • AppIdAllocator:关联到AppTag的ID Buffer池,即具体某个业务的ID分配器;
  • IdAllocator:全局ID分配器, 管理所有业务的ID分配;

函数InitAlloc用于在main中初始化全局变量GlobalIdAllocator,即全局的ID分配器;

查询当前剩余ID:LeftCount

IdAllocator中的LeftCount方法用于查询当前某个appTag下ID Buffer池中的剩余ID数;

具体方式就是首先获取IdAllocatorappMap存储的对应appTag下的AppIdAllocator,然后调用AppIdAllocatorleftCountWithMutex方法;

leftCountWithMutex方法最终调用leftCount方法进行计算;

计算的方式和前一篇文章所述基本一致,只是需要计算两个Buffer的和,即:

单个segment:
(right - left) - offset = 区间长度 - 已使用区间长度

总计:
count = sum(<all_segment_remain>)
计算下一个ID:popNextId

当ID充足时,我们通过popNextId方法计算下一个ID;

下面是popNextId的代码,非常简单:

func (a *AppIdAllocator) popNextId() int {
    var nextId = a.segments[0].left + a.segments[0].offset
    a.segments[0].offset++
    if nextId+1 >= a.segments[0].right {
        // 弹出第一个seg, 后续seg向前移动
        a.segments = append(a.segments[:0], a.segments[1:]...)
    }
    return nextId
}

首先,下一个分配的ID一定是:Buffer[0]min + offset

分配ID后,当前Buffer[0]中的offset值加1,指向下一个ID;

当然,如果下一个值已经到了Buffer[0]的末尾,即当前Buffer中的ID已经全部分配,则将Buffer[1]中的值移入Buffer[0]中,进行填充;

分配ID:NextId

IdAllocator中的NextId方法用于分配当前某个appTag下ID Buffer池中的ID;

需要注意的是:整个ID分配的过程是惰性的,即懒加载

在分配时,如果发现IdAllocatorappMap尚无存储这个appTag,则会首先创建,然后再对这个appTag进行ID生成;

分配主要使用对应appTag下的AppIdAllocatornextId方法:

下面具体来看nextId方法:

func (a *AppIdAllocator) nextId() (int, error) {
    ……
    a.mutex.Lock()
    defer a.mutex.Unlock()

    // 1:有剩余号码, 立即分配
    if a.leftCount() != 0 {
        nextId = a.popNextId()
        hasId = true
    }

    // 2:段<=1个, 启动补偿线程
    if len(a.segments) <= 1 && !a.isAllocating {
        a.isAllocating = true
        go a.fillSegments()
    }

    if hasId {
        return nextId, nil
    }

    // 3:没有剩余号码, 此时补偿线程一定正在运行, 等待其至多一段时间
    waitChan = make(chan byte, 1)
    a.waiting = append(a.waiting, waitChan) // 排队等待唤醒

    // 释放锁, 等待补偿线程唤醒
    a.mutex.Unlock()

    waitTimer = time.NewTimer(2 * time.Second) // 最多等待2秒
    select {
    case <-waitChan:
    case <-waitTimer.C:
    }

    // 4:再次上锁尝试获取号码
    a.mutex.Lock()
    if a.leftCount() != 0 {
        nextId = a.popNextId()
        return nextId, nil
    } else {
        return 0, fmt.Errorf("no available id")
    }
    ……
}

上面是nextId方法的内容,在生成ID时主要分为了几种情况:

  • Buffer中仍然存在ID(这也是最多的情况):直接返回ID;
  • Buffer中不存在ID:需要等待分配;

通过leftCount方法可以查看是否存在剩余ID缓存;

下面分类来讨论;

① 有剩余号码

当有剩余号码时,会立即通过popNextId方法分配ID;

随后,如果分配后剩余的Buffer桶不足,则会异步对Buffer桶进行填充;


② 无剩余号码(通常是在初始化时)

当ID Buffer中无剩余号码时,此时由于经过了上面的异步填充代码,因此,此时一定处于补偿协程在工作的状态(也有可能已经补偿完毕);

此时,将当前协程挂起等待,直到被fillSegments方法调用的wakeup方法唤醒或2秒钟后超时:

// 3:没有剩余号码, 此时补偿线程一定正在运行, 等待其至多一段时间
waitChan = make(chan byte, 1)
a.waiting = append(a.waiting, waitChan) // 排队等待唤醒

// 释放锁, 等待补偿线程唤醒
a.mutex.Unlock()

waitTimer = time.NewTimer(2 * time.Second) // 最多等待2秒
select {
    case <-waitChan:
    case <-waitTimer.C:
}

等待结束后,通常情况下是因为fillSegments方法补偿完毕,此时再次尝试获取ID;

异步双Buffer池ID填充:fillSegments

fillSegments方法是在NextId方法分配ID后发现双Buffer中的某个Segment已经用尽或在初始化时才调用的;

fillSegments方法也比较简单,主要是通过mysql包请求MySQL分配ID段,然后向具体的某个AppIdAllocator添加对应的Segment即可;

server包

server包就比较简单了,主要是创建了两个路由:

  • /alloc?app_tag=<app_name>:请求一个ID;
  • /health?app_tag=<app_name>:ID生成器健康查询;

下面是代码:

server.go

package server

type allocResponse struct {
    RespCode int    `json:"resp_code"`
    Msg      string `json:"msg"`
    Id       int    `json:"id"`
}

type healthResponse struct {
    RespCode int    `json:"resp_code"`
    Msg      string `json:"msg"`
    Left     int    `json:"left"`
}

func handleHealth(w http.ResponseWriter, r *http.Request) {
    var (
        appTag string
    )
    healthResp := healthResponse{}
    err := r.ParseForm()
    if err != nil {
        goto RESP
    }
    if appTag = r.Form.Get("app_tag"); appTag == "" {
        err = fmt.Errorf("need app_tag param")
        goto RESP
    }
    healthResp.Left = core.GlobalIdAllocator.LeftCount(appTag)
    if healthResp.Left == 0 {
        err = fmt.Errorf("no available id")
        goto RESP
    }

RESP:
    if err != nil {
        healthResp.RespCode = -1
        healthResp.Msg = fmt.Sprintf("%v", err)
        w.WriteHeader(500)
    } else {
        healthResp.Msg = "success"
    }
    if bytes, err := json.Marshal(&healthResp); err == nil {
        _, _ = w.Write(bytes)
    } else {
        w.WriteHeader(500)
        healthResp.Msg = fmt.Sprintf("%v", err)
    }
}

func handleAlloc(w http.ResponseWriter, r *http.Request) {
    var (
        resp   = allocResponse{}
        err    error
        bytes  []byte
        appTag string
    )

    if err = r.ParseForm(); err != nil {
        goto RESP
    }
    if appTag = r.Form.Get("app_tag"); appTag == "" {
        err = fmt.Errorf("need app_tag param")
        goto RESP
    }

    for {
        if resp.Id, err = core.GlobalIdAllocator.NextId(appTag); err != nil {
            goto RESP
        }
        if resp.Id != 0 { // 跳过ID=0, 一般业务不支持ID=0
            break
        }
    }
RESP:
    if err != nil {
        resp.RespCode = -1
        resp.Msg = fmt.Sprintf("%v", err)
        w.WriteHeader(500)
    } else {
        resp.Msg = "success"
    }
    if bytes, err = json.Marshal(&resp); err == nil {
        _, _ = w.Write(bytes)
    } else {
        w.WriteHeader(500)
        resp.Msg = fmt.Sprintf("%v", err)
    }
}

func StartServer() error {
    mux := http.NewServeMux()
    mux.HandleFunc("/alloc", handleAlloc)
    mux.HandleFunc("/health", handleHealth)

    srv := &http.Server{
        ReadTimeout:  time.Duration(config.AppConfig.HttpReadTimeout) * time.Millisecond,
        WriteTimeout: time.Duration(config.AppConfig.HttpWriteTimeout) * time.Millisecond,
        Handler:      mux,
    }
    listener, err := net.Listen("tcp", ":"+strconv.Itoa(config.AppConfig.HttpPort))
    if err != nil {
        return err
    }

    fmt.Printf("server started at: localhoost:%d\n", config.AppConfig.HttpPort)
    return srv.Serve(listener)
}

由于代码非常简单,这里就不再解释了;

main函数

main函数可以解析config参数,用于指定具体配置文件的路径,如:

$ ./distributed-id-generator --config ./config.json

当然,默认就是当前路径下的config.json文件了;

下面是main函数代码:

main.go

package main

func main() {
    var configFile string
    flag.StringVar(&configFile, "config", "config.json", "the config file: config.json")
    flag.Parse()

    err := config.LoadConf(configFile)
    if err != nil {
        panic(err)
    }
    err = mysql.InitDB()
    if err != nil {
        panic(err)
    }
    err = core.InitAlloc()
    if err != nil {
        panic(err)
    }
    err = server.StartServer()
    if err != nil {
        panic(err)
    }
}

main函数主要完成了这么几项内容:

  • 从命令行参数中解析配置文件路径;
  • 解析配置文件并加载配置;
  • 初始化数据库;
  • 初始化ID生成器;
  • 初始化ID生成服务;

至此,整个项目完成,下面进行测试;

测试

启动项目

① 初始化数据库

创建数据库:

schema.sql

CREATE DATABASE IF NOT EXISTS `id_alloc_db`;

USE `id_alloc_db`;

CREATE TABLE `segments`
(
    `app_tag`     VARCHAR(32) NOT NULL,
    `max_id`      BIGINT      NOT NULL,
    `step`        BIGINT      NOT NULL,
    `update_time` DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (`app_tag`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8
    COMMENT ='业务ID池';

INSERT INTO segments(`app_tag`, `max_id`, `step`)
VALUES ('test_business', 0, 100000);

② 修改配置

config.json

{
  "DSN": "root:123456@tcp(127.0.0.1:3306)/id_alloc_db",
  "table": "segments",
  "HttpPort": 8880,
  "HttpReadTimeout": 5000,
  "HttpWriteTimeout": 5000
}

修改DSN为你实际数据库的配置;

③ 安装依赖并运行

执行下面的命令安装依赖并启动服务:

go mod tidy && go run main.go

打印出Server启动信息则成功:

$ go mod tidy && go run main.go
server started at: localhoost:8880

功能测试

① 请求分配ID

请求分配ID路由为,/alloc?app_tag=<app_name>

下面为结果:

$ curl http://localhost:8880/alloc?app_tag=test_business
{"resp_code":0,"msg":"success","id":1}

$ curl http://localhost:8880/alloc?app_tag=test_business
{"resp_code":0,"msg":"success","id":2}
② 健康检查

请求分配ID路由为,/health?app_tag=<app_name>

下面为结果:

$ curl http://localhost:8880/health?app_tag=test_business
{"resp_code":0,"msg":"success","left":199996}

$ curl http://localhost:8880/health?app_tag=test_business
{"resp_code":0,"msg":"success","left":199996}

此时数据库中的内容:

mysql> select * from id_alloc_db.segments;

+---------------+--------+--------+---------------------+
| app_tag       | max_id | step   | update_time         |
+---------------+--------+--------+---------------------+
| test_business | 200000 | 100000 | 2021-06-20 13:07:23 |
+---------------+--------+--------+---------------------+
1 row in set (0.00 sec)

此时ID已经缓存至了200000!

附录

系列文章:

文章参考:


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK