2

手撸golang GO与微服务 Saga模式之8 集成测试

 3 years ago
source link: https://segmentfault.com/a/1190000039668379
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

手撸golang GO与微服务 Saga模式之8 集成测试

最近阅读<<Go微服务实战>> (刘金亮, 2021.1)
本系列笔记拟采用golang练习之

Saga模式

  • saga模式将分布式长事务切分为一系列独立短事务
  • 每个短事务是可通过补偿动作进行撤销的
  • 事务动作和补偿动作都是幂等的, 允许重复执行而不会有副作用
Saga由一系列的子事务“Ti”组成,
每个Ti都有对应的补偿“Ci”,
当Ti出现问题时Ci用于处理Ti执行带来的问题。

可以通过下面的两个公式理解Saga模式。
T = T1 T2 … Tn
T = T1 T2 … Tj Cj … C2 C1 (0 < j < n)

Saga模式的核心理念是避免使用长期持有锁(如14.2.2节介绍的两阶段提交)的长事务,
而应该将事务切分为一组按序依次提交的短事务,
Saga模式满足ACD(原子性、一致性、持久性)特征。

摘自 <<Go微服务实战>> 刘金亮, 2021.1
  • 为实现saga模式的分布式事务, 先撸一个pub/sub事务消息队列服务
  • 事务消息队列服务的功能性要求

    • 消息不会丢失: 消息的持久化
    • 消息的唯一性: 要求每个消息有全局ID和子事务ID
    • 确保投递成功: 投递队列持久化, 投递状态持久化, 失败重试

子目标(Day 8)

  • 创建虚拟的库存服务

    • 启动时, 注册到MQ
    • 接收到订单创建的消息时, 扣减库存
    • 扣库成功时, 经MQ通知订单服务扣库成功
    • 扣库失败时, 经MQ通知订单服务扣库失败
  • IStockService: 模拟的库存服务接口
  • tStockService: 虚拟库存服务, 实现IStockService接口
  • NotifySaleOrderCreated: 用于监听订单创建消息的http回调处理器

order_test.go

  1. 初始化10个产品库存
  2. 订单服务, 创建订单1, 尝试扣减1个库存, 预期成功
  3. 订单服务, 创建订单2, 尝试扣减10个库存, 预期失败
  4. 校验订单1的最终状态为出库成功
  5. 校验订单2的最终状态为出库失败
package saga

import (
    "github.com/jmoiron/sqlx"
    "learning/gooop/saga/mqs/cmd"
    "learning/gooop/saga/mqs/database"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/order"
    "learning/gooop/saga/stock"
    "sync"
    "testing"
    "time"
)

// gRunOnce guards the MQ server boot so it happens at most once per test binary.
var gRunOnce sync.Once

// fnBootMQS starts the MQ server in a background goroutine the first time it
// is called and then waits one second to give the server time to come up.
// Subsequent calls do nothing.
func fnBootMQS() {
    bootAndWait := func() {
        // Run the server in the background.
        go cmd.BootMQS()

        // Fixed pause to let the server start listening.
        time.Sleep(time.Second)
    }
    gRunOnce.Do(bootAndWait)
}

// fnAssertTrue aborts the running test with msg when b is false.
// It does nothing when b is true.
func fnAssertTrue(t *testing.T, b bool, msg string) {
    // Mark this function as a test helper so a failure is reported at the
    // caller's line instead of inside this function.
    t.Helper()
    if !b {
        t.Fatal(msg)
    }
}

// Test_SagaSaleOrder is an integration test of the sale-order / stock saga.
//
// It clears the MQ tables, boots the MQ server, seeds 10 units of stock, then
// creates one order for 1 unit and a second order for 10 units. After a fixed
// wait it checks that the expected log lines were each emitted exactly once
// and that order 1 ended as outbound-done and order 2 as outbound-failed.
func Test_SagaSaleOrder(t *testing.T) {
    // Prepare the MQ: clean tables first, then boot the server.
    fnClearDB(t)
    fnBootMQS()

    // Step 1: create the product stock.
    prodID := "test-prod-1"
    if err := stock.MockStockService.AddStock(prodID, 10); err != nil {
        t.Fatal(err)
    }

    // Step 2: order 1 asks for a single unit.
    o1 := &order.SaleOrder{
        OrderID:    "test-order-1",
        ProductID:  prodID,
        CustomerID: "test-customer-1",
        Quantity:   1,
        Price:      100,
        Amount:     100,
        CreateTime: time.Now().UnixNano(),
        StatusFlag: order.StatusNotDelivered,
    }
    if err := order.MockSaleOrderService.Create(o1); err != nil {
        t.Fatal(err)
    }

    // Step 3: order 2 asks for ten units, created shortly after order 1.
    time.Sleep(10 * time.Millisecond)
    o2 := &order.SaleOrder{
        OrderID:    "test-order-2",
        ProductID:  prodID,
        CustomerID: "test-customer-2",
        Quantity:   10,
        Price:      100,
        Amount:     1000,
        CreateTime: time.Now().UnixNano(),
        StatusFlag: order.StatusNotDelivered,
    }
    if err := order.MockSaleOrderService.Create(o2); err != nil {
        t.Fatal(err)
    }

    // Fixed wait for the asynchronous message round trips to finish.
    time.Sleep(1 * time.Second)
    logger.Logf("============================================")

    // Each of these log lines must have been recorded exactly once.
    expectedLogs := []string{
        "tSaleOrderService.beginSubscribeMQ, done",
        "tSaleOrderService.publishMQ, done, order=test-order-1",
        "tSaleOrderService.publishMQ, done, order=test-order-2",
        "stock.NotifySaleOrderCreated, order=test-order-1",
        "stock.NotifySaleOrderCreated, order=test-order-2",
    }
    for _, want := range expectedLogs {
        fnAssertTrue(t, logger.Count(want) == 1, "expecting log: "+want)
    }

    // Steps 4 and 5: re-read both orders and verify their final status.
    o1 = order.MockSaleOrderService.Get(o1.OrderID)
    fnAssertTrue(t, o1.StatusFlag == order.StatusStockOutboundDone, "expecting o1 done")

    o2 = order.MockSaleOrderService.Get(o2.OrderID)
    fnAssertTrue(t, o2.StatusFlag == order.StatusStockOutboundFailed, "expecting o2 failed")

    logger.Logf("test passed")
}


// fnClearDB empties the MQ tables touched by the test so each run starts from
// a clean state. Any database error aborts the test via fnDBExec.
func fnClearDB(t *testing.T) {
    statements := []string{
        "delete from subscriber",
        "delete from tx_msg",
        "delete from delivery_queue",
        "delete from success_queue",
    }
    for _, stmt := range statements {
        fnDBExec(t, stmt)
    }
}


func fnDBExec(t *testing.T, sql string, args... interface{}) int {
    rows := []int64{ 0 }
    err := database.DB(func(db *sqlx.DB) error {
        r,e := db.Exec(sql, args...)
        if e != nil {
            return e
        }

        rows[0], e = r.RowsAffected()
        if e != nil {
            return e
        }

        return nil
    })

    if err != nil {
        t.Fatal(err)
    }
    return int(rows[0])
}
$ go test -v order_test.go 
=== RUN   Test_SagaSaleOrder
23:55:54.292132442 eventbus.Pub, event=system.boot, handler=gDeliveryService.handleBootEvent
[GIN-debug] [WARNING] Creating an Engine instance with the Logger and Recovery middleware already attached.

[GIN-debug] [WARNING] Running in "debug" mode. Switch to "release" mode in production.
 - using env:   export GIN_MODE=release
 - using code:  gin.SetMode(gin.ReleaseMode)

[GIN-debug] GET    /ping                     --> learning/gooop/saga/mqs/handlers.Ping (4 handlers)
[GIN-debug] POST   /subscribe                --> learning/gooop/saga/mqs/handlers.Subscribe (4 handlers)
[GIN-debug] POST   /publish                  --> learning/gooop/saga/mqs/handlers.Publish (4 handlers)
[GIN-debug] POST   /notify                   --> learning/gooop/saga/mqs/handlers.Notify (4 handlers)
[GIN-debug] POST   /notify/sale-order.stock.outbound --> learning/gooop/saga/order.NotifyStockOutbound (4 handlers)
[GIN-debug] POST   /notify/sale-order.created --> learning/gooop/saga/stock.NotifySaleOrderCreated (4 handlers)
[GIN-debug] Listening and serving HTTP on :3333
23:55:54.292287032 tDeliveryService.beginCleanExpiredWorkers
23:55:54.292345845 tDeliveryService.beginCreatingWorkers
23:55:54.356542981 handlers.Subscribe, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
23:55:54.356524325 handlers.Subscribe, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
23:55:54.365256441 handlers.Subscribe, event=subscriber.registered, msg=&{sale-order-service sale-order.stock.outbound http://localhost:3333/notify/sale-order.stock.outbound 1616086554355593476}
23:55:54.365271105 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/18 - 23:55:54 | 200 |    8.865173ms |             ::1 | POST     "/subscribe"
[GIN] 2021/03/18 - 23:55:54 | 200 |    8.882138ms |             ::1 | POST     "/subscribe"
23:55:54.365488163 tSaleOrderService.beginSubscribeMQ, done
23:55:54.365861542 database.DB, err=empty rows
23:55:54.366239244 tDeliveryWorker.afterInitialLoad, clientID=sale-order-service, rows=0
23:55:54.373588493 handlers.Subscribe, event=subscriber.registered, msg=&{stock-service sale-order.created http://localhost:3333/notify/sale-order.created 1616086554355598830}
23:55:54.373605972 eventbus.Pub, event=subscriber.registered, handler=gDeliveryService.handleSubscriberRegistered
[GIN] 2021/03/18 - 23:55:54 | 200 |   17.189632ms |             ::1 | POST     "/subscribe"
[GIN] 2021/03/18 - 23:55:54 | 200 |   17.205549ms |             ::1 | POST     "/subscribe"
23:55:54.373843032 tStockService.beginSubscribeMQ, done
23:55:54.3743926 database.DB, err=empty rows
23:55:54.374499757 tDeliveryWorker.afterInitialLoad, clientID=stock-service, rows=0
23:55:55.292336699 tStockService.AddStock, done, prodId=test-prod-1, stock=0, delta=0, after=10
23:55:55.323746568 handlers.Publish, msg=test-order-1/test-order-1/sale-order.created, msgId=112
[GIN] 2021/03/18 - 23:55:55 | 200 |   31.112478ms |             ::1 | POST     "/publish"
[GIN] 2021/03/18 - 23:55:55 | 200 |   31.125855ms |             ::1 | POST     "/publish"
23:55:55.323811205 handlers.Publish, pubLiveMsg 112
23:55:55.323910377 tSaleOrderService.publishMQ, done, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
23:55:55.324227736 handlers.Publish, pubLiveMsg, msgId=112, rows=1
23:55:55.324273573 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-1/test-order-1/http://localhost:3333/notify/sale-order.created
23:55:55.32428051 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.324285512 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.324292286 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-1/test-order-1/sale-order.created
23:55:55.324346678 tDeliveryWorker.beginPollAndDeliver, msg from live=&{98 stock-service http://localhost:3333/notify/sale-order.created 112 test-order-1 test-order-1 sale-order-service 1616082955292352151 sale-order.created {"OrderID":"test-order-1","CustomerID":"test-customer-1","ProductID":"test-prod-1","Quantity":1,"Price":100,"Amount":100,"CreateTime":1616082955292352151,"StatusFlag":0} 0 0}
23:55:55.33925766 handlers.Publish, msg=test-order-2/test-order-2/sale-order.created, msgId=113
[GIN] 2021/03/18 - 23:55:55 | 200 |   15.264561ms |             ::1 | POST     "/publish"
[GIN] 2021/03/18 - 23:55:55 | 200 |   15.280884ms |             ::1 | POST     "/publish"
23:55:55.339353768 handlers.Publish, pubLiveMsg 113
23:55:55.339446893 tSaleOrderService.publishMQ, done, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
23:55:55.339909493 handlers.Publish, pubLiveMsg, msgId=113, rows=1
23:55:55.339919874 handlers.Publish, event=msg.published, clientID=stock-service, msg=test-order-2/test-order-2/http://localhost:3333/notify/sale-order.created
23:55:55.339925049 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.339929964 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.339935935 tLiveMsgSource.handleMsgPublished, clientID=stock-service, msg=test-order-2/test-order-2/sale-order.created
23:55:55.350117186 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-1/test-order-1
23:55:55.35041833 stock.NotifySaleOrderCreated, order=test-order-1/&{test-order-1 test-customer-1 test-prod-1 1 100 100 1616082955292352151 0}
23:55:55.350429178 tStockService.AddStock, done, prodId=test-prod-1, stock=10, delta=-1, after=9
[GIN] 2021/03/18 - 23:55:55 | 200 |      88.872µs |             ::1 | POST     "/notify/sale-order.created"
[GIN] 2021/03/18 - 23:55:55 | 200 |     133.617µs |             ::1 | POST     "/notify/sale-order.created"
23:55:55.350592351 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-1/test-order-1
23:55:55.367336707 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-1/test-order-1
23:55:55.36738322 tDeliveryWorker.beginPollAndDeliver, msg from live=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 0 0}
23:55:55.367530495 database.DB, err=empty rows
23:55:55.374978535 tDeliveryWorker.deliver, begin, id=stock-service, msg=test-order-2/test-order-2
23:55:55.375201115 stock.NotifySaleOrderCreated, order=test-order-2/&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
23:55:55.375211216 tStockService.AddStock, failed, prodId=test-prod-1, stock=9, delta=-10
23:55:55.375219558 tStockService.HandleSaleOrderCreated, err=insufficient stock, order=&{test-order-2 test-customer-2 test-prod-1 10 100 1000 1616082955302734821 0}
[GIN] 2021/03/18 - 23:55:55 | 200 |      102.52µs |             ::1 | POST     "/notify/sale-order.created"
[GIN] 2021/03/18 - 23:55:55 | 200 |     116.933µs |             ::1 | POST     "/notify/sale-order.created"
23:55:55.375354895 tDeliveryWorker.deliver, OK, id=stock-service, msg=test-order-2/test-order-2
23:55:55.389901711 tDeliveryWorker.afterDeliverySuccess, done, id=stock-service, msg=test-order-2/test-order-2
23:55:55.38993077 tDeliveryWorker.beginPollAndDeliver, msg from db=&{99 stock-service http://localhost:3333/notify/sale-order.created 113 test-order-2 test-order-2 sale-order-service 1616082955302734821 sale-order.created {"OrderID":"test-order-2","CustomerID":"test-customer-2","ProductID":"test-prod-1","Quantity":10,"Price":100,"Amount":1000,"CreateTime":1616082955302734821,"StatusFlag":0} 1 1616082955367401386}
23:55:55.420121681 handlers.Publish, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound, msgId=114
[GIN] 2021/03/18 - 23:55:55 | 200 |   69.507171ms |             ::1 | POST     "/publish"
[GIN] 2021/03/18 - 23:55:55 | 200 |   69.520805ms |             ::1 | POST     "/publish"
23:55:55.420220719 handlers.Publish, pubLiveMsg 114
23:55:55.420321792 tStockService.publishMQ, done, msg=&{test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1}
23:55:55.42071623 handlers.Publish, pubLiveMsg, msgId=114, rows=1
23:55:55.420731889 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/http://localhost:3333/notify/sale-order.stock.outbound
23:55:55.420741935 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.420746401 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.420755367 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-1/test-order-1.outbound/sale-order.stock.outbound
23:55:55.42079505 tDeliveryWorker.beginPollAndDeliver, msg from live=&{100 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 114 test-order-1 test-order-1.outbound stock-service 1616082955350432496 sale-order.stock.outbound 1 0 0}
23:55:55.435844021 handlers.Publish, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound, msgId=115
[GIN] 2021/03/18 - 23:55:55 | 200 |   15.407267ms |             ::1 | POST     "/publish"
[GIN] 2021/03/18 - 23:55:55 | 200 |   15.420327ms |             ::1 | POST     "/publish"
23:55:55.4359058 handlers.Publish, pubLiveMsg 115
23:55:55.436026025 tStockService.publishMQ, done, msg=&{test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0}
23:55:55.436398324 handlers.Publish, pubLiveMsg, msgId=115, rows=1
23:55:55.436409937 handlers.Publish, event=msg.published, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/http://localhost:3333/notify/sale-order.stock.outbound
23:55:55.43642793 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.sale-order-service
23:55:55.436433697 eventbus.Pub, event=msg.published, handler=tLiveMsgSource.stock-service
23:55:55.43644379 tLiveMsgSource.handleMsgPublished, clientID=sale-order-service, msg=test-order-2/test-order-2.outbound/sale-order.stock.outbound
23:55:55.446599314 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.446809726 order.NotifyStockOutbound, orderID=test-order-1, succeeded=true
[GIN] 2021/03/18 - 23:55:55 | 200 |      61.898µs |             ::1 | POST     "/notify/sale-order.stock.outbound"
[GIN] 2021/03/18 - 23:55:55 | 200 |      81.911µs |             ::1 | POST     "/notify/sale-order.stock.outbound"
23:55:55.446951354 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.462584405 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-1/test-order-1.outbound
23:55:55.462615131 tDeliveryWorker.beginPollAndDeliver, msg from live=&{101 sale-order-service http://localhost:3333/notify/sale-order.stock.outbound 115 test-order-2 test-order-2.outbound stock-service 1616082955375214295 sale-order.stock.outbound 0 0 0}
23:55:55.469999185 tDeliveryWorker.deliver, begin, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:55.470163043 order.NotifyStockOutbound, orderID=test-order-2, succeeded=false
[GIN] 2021/03/18 - 23:55:55 | 200 |       85.14µs |             ::1 | POST     "/notify/sale-order.stock.outbound"
[GIN] 2021/03/18 - 23:55:55 | 200 |     105.638µs |             ::1 | POST     "/notify/sale-order.stock.outbound"
23:55:55.470369408 tDeliveryWorker.deliver, OK, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:55.486229145 tDeliveryWorker.afterDeliverySuccess, done, id=sale-order-service, msg=test-order-2/test-order-2.outbound
23:55:56.302885199 ============================================
23:55:56.303470422 test passed
--- PASS: Test_SagaSaleOrder (2.05s)
PASS
ok      command-line-arguments  2.057s

IStockService.go

模拟的库存服务接口

package stock;

import "learning/gooop/saga/order"

// IStockService is the interface of the simulated stock (inventory) service.
type IStockService interface {
    // GetStock returns the current stock quantity recorded for prodId.
    GetStock(prodId string) int
    // AddStock changes the stock of prodId by delta (negative to deduct) and
    // returns an error when the change cannot be applied.
    AddStock(prodId string, delta int) error
    // HandleSaleOrderCreated processes a newly created sale order and returns
    // an error when handling fails.
    HandleSaleOrderCreated(it *order.SaleOrder) error
}

tStockService.go

虚拟库存服务, 实现IStockService接口

package stock

import (
    "bytes"
    "encoding/json"
    "errors"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "learning/gooop/saga/order"
    "net/http"
    "sync"
    "time"
)

// tStockService is the in-memory stock service that implements IStockService.
type tStockService struct {
    // rwmutex is locked by GetStock and AddStock around access to stock.
    rwmutex *sync.RWMutex
    // stock maps a product ID to its current quantity.
    stock map[string]int
    // bMQReady records whether the most recent MQ subscribe attempt
    // succeeded; it is written by beginSubscribeMQ.
    bMQReady bool
    // publishQueue buffers outgoing messages; HandleSaleOrderCreated sends to
    // it and beginPublishMQ receives from it.
    publishQueue chan *models.TxMsg
}

// newStockService builds a tStockService, runs its init, and returns it
// behind the IStockService interface.
func newStockService() IStockService {
    svc := &tStockService{}
    svc.init()
    return svc
}


// init allocates the service's lock, stock map and publish queue, then
// launches the background workers: after a 100ms delay it starts the MQ
// subscribe loop and the MQ publish loop, each in its own goroutine.
func (me *tStockService) init() {
    me.rwmutex = &sync.RWMutex{}
    me.stock = map[string]int{}
    me.bMQReady = false
    me.publishQueue = make(chan *models.TxMsg, gMQMaxQueuedMsg)

    startWorkers := func() {
        // Short delay before the workers start.
        time.Sleep(100 * time.Millisecond)
        go me.beginSubscribeMQ()
        go me.beginPublishMQ()
    }
    go startWorkers()
}

// GetStock returns the quantity recorded for prodId, or 0 when the product
// has no entry. The read is performed under the read lock.
func (me *tStockService) GetStock(prodId string) int {
    me.rwmutex.RLock()
    defer me.rwmutex.RUnlock()

    // Indexing a missing key yields the zero value, which is the 0 we want.
    return me.stock[prodId]
}

// AddStock changes the stock of prodId by delta; a negative delta deducts.
//
// A product with no entry is treated as having a stock of 0. When the
// resulting quantity would be negative the stock is left unchanged and
// gInsufficientStockError is returned; otherwise the new quantity is stored
// and nil is returned. Both outcomes are logged.
func (me *tStockService) AddStock(prodId string, delta int) error {
    // This method writes to the map, so it needs the exclusive lock: a read
    // lock would let concurrent callers mutate the map at the same time.
    me.rwmutex.Lock()
    defer me.rwmutex.Unlock()

    // A missing key reads as 0, which covers the "unknown product" case.
    current := me.stock[prodId]
    after := current + delta
    if after < 0 {
        logger.Logf("tStockService.AddStock, failed, prodId=%s, stock=%d, delta=%d", prodId, current, delta)
        return gInsufficientStockError
    }

    logger.Logf("tStockService.AddStock, done, prodId=%s, stock=%d, delta=%d, after=%d", prodId, current, delta, after)
    me.stock[prodId] = after
    return nil
}


// beginSubscribeMQ loops forever keeping the MQ subscription alive.
//
// Every 3 seconds it checks whether at least 20 minutes have passed since the
// last successful subscribe; if so it subscribes again with an expire time
// one hour ahead. bMQReady is set to the outcome of each attempt. Because the
// last-success time only advances on success, a failed attempt is retried on
// the next 3-second tick.
func (me *tStockService) beginSubscribeMQ() {
    const (
        leaseNanos = int64(1 * time.Hour)
        renewNanos = int64(20 * time.Minute)
        tick       = 3 * time.Second
    )

    var lastOK int64
    // The post statement runs after every iteration, including on continue.
    for ; ; time.Sleep(tick) {
        now := time.Now().UnixNano()
        if now-lastOK < renewNanos {
            continue
        }

        if err := fnSubscribeMQ(now + leaseNanos); err != nil {
            me.bMQReady = false
            logger.Logf("tStockService.beginSubscribeMQ, failed, err=%v", err)
            continue
        }

        lastOK = now
        me.bMQReady = true
        logger.Logf("tStockService.beginSubscribeMQ, done")
    }
}

func fnSubscribeMQ(expireTime int64) error {
    msg := &models.SubscribeMsg{
        ClientID: gMQClientID,
        Topic: gMQSubscribeTopic,
        NotifyUrl: gMQServerURL + PathOfNotifySaleOrderCreated,
        ExpireTime: expireTime,
    }
    url := gMQServerURL + "/subscribe"
    return fnPost(msg, url)
}


// fnPost JSON-encodes msg, POSTs it to url, and decodes the reply as a
// models.OkMsg.
//
// It returns the encoding error if msg cannot be marshalled, the transport
// error if the request fails, the read/decode error if the reply cannot be
// parsed, and gMQReplyFalse if the reply's OK flag is false. It returns nil
// only when the server answered with OK set.
func fnPost(msg interface{}, url string) error {
    // Do not send a request with an empty body when encoding fails.
    body, err := json.Marshal(msg)
    if err != nil {
        return err
    }

    rsp, err := http.Post(url, "application/json;charset=utf-8", bytes.NewReader(body))
    if err != nil {
        return err
    }
    defer rsp.Body.Close()

    j, err := ioutil.ReadAll(rsp.Body)
    if err != nil {
        return err
    }

    ok := &models.OkMsg{}
    if err = json.Unmarshal(j, ok); err != nil {
        return err
    }

    if !ok.OK {
        return gMQReplyFalse
    }
    return nil
}



// beginPublishMQ loops forever, taking one message at a time from
// publishQueue and handing it to publishMQ.
func (me *tStockService) beginPublishMQ() {
    for {
        // Blocks until a message is available.
        next := <-me.publishQueue
        me.publishMQ(next)
    }
}

// publishMQ posts msg to the MQ server's /publish endpoint, retrying up to
// gMQMaxPublishRetry times with a pause of gMQPublishInterval after each
// failure. It returns as soon as one attempt succeeds; if every attempt
// fails it logs the final failure and gives up.
func (me *tStockService) publishMQ(msg *models.TxMsg) {
    endpoint := gMQServerURL + "/publish"

    for attempt := 1; attempt <= gMQMaxPublishRetry; attempt++ {
        err := fnPost(msg, endpoint)
        if err == nil {
            logger.Logf("tStockService.publishMQ, done, msg=%v", msg)
            return
        }

        logger.Logf("tStockService.publishMQ, failed, err=%v, msg=%v", err, msg)
        time.Sleep(gMQPublishInterval)
    }

    // All attempts were used up without success.
    logger.Logf("tStockService.publishMQ, failed max retries, msg=%v", msg)
}


// HandleSaleOrderCreated reacts to a created sale order by deducting the
// ordered quantity from stock and queueing a result message for the MQ.
//
// The queued message uses the order ID as global ID, "<orderID>.outbound" as
// sub ID, and carries Content "1" when the deduction succeeded or "0" when it
// failed. The message is queued in both cases.
//
// It returns gMQBlocked when the publish queue is full and the message could
// not be queued; otherwise it returns the error from AddStock (nil on a
// successful deduction).
func (me *tStockService) HandleSaleOrderCreated(it *order.SaleOrder) error {
    msg := &models.TxMsg{}
    msg.GlobalID = it.OrderID
    msg.SubID = it.OrderID + ".outbound"
    msg.SenderID = gMQClientID
    msg.Topic = gMQPublishTopic

    err := me.AddStock(it.ProductID, -it.Quantity)
    // Stamp the message after the stock operation has been attempted.
    msg.CreateTime = time.Now().UnixNano()

    if err != nil {
        logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", err.Error(), it)
        msg.Content = "0"
    } else {
        msg.Content = "1"
    }

    // Non-blocking send: checking len() first and then sending could still
    // block if another goroutine filled the queue in between.
    select {
    case me.publishQueue <- msg:
        return err
    default:
        logger.Logf("tStockService.HandleSaleOrderCreated, err=%s, order=%v", gMQBlocked.Error(), it)
        return gMQBlocked
    }
}


// Errors returned by the stock service and its MQ helpers.
var (
    // gInsufficientStockError is returned by AddStock when a deduction would
    // make the stock negative.
    gInsufficientStockError = errors.New("insufficient stock")

    // gMQReplyFalse is returned by fnPost when the MQ reply is not OK.
    gMQReplyFalse = errors.New("mq reply false")

    // gMQBlocked is returned by HandleSaleOrderCreated when the publish
    // queue is full.
    gMQBlocked = errors.New("mq blocked")
)

// Settings used when talking to the MQ server.
var (
    // gMQMaxPublishRetry is the maximum number of publish attempts per message.
    gMQMaxPublishRetry = 10
    // gMQPublishInterval is the pause after a failed publish attempt.
    gMQPublishInterval = 1 * time.Second
    // gMQSubscribeTopic is the topic this service subscribes to.
    gMQSubscribeTopic = "sale-order.created"
    // gMQPublishTopic is the topic this service publishes results to.
    gMQPublishTopic = "sale-order.stock.outbound"
    // gMQClientID identifies this service to the MQ server.
    gMQClientID = "stock-service"
    // gMQServerURL is the base URL of the MQ server.
    gMQServerURL = "http://localhost:3333"
    // gMQMaxQueuedMsg is the capacity of the outgoing publish queue.
    gMQMaxQueuedMsg = 1024
)

// MockStockService is the package-level stock service instance, created by
// newStockService when the package is initialized. Constructing it runs the
// service's init, which also launches its background MQ goroutines.
var MockStockService = newStockService()

NotifySaleOrderCreated.go

用于监听订单创建消息的http回调处理器

package stock

import (
    "encoding/json"
    "github.com/gin-gonic/gin"
    "io/ioutil"
    "learning/gooop/saga/mqs/logger"
    "learning/gooop/saga/mqs/models"
    "learning/gooop/saga/order"
    "net/http"
)

// NotifySaleOrderCreated is the gin handler that receives sale-order-created
// messages.
//
// It reads the request body, decodes it as a models.TxMsg, decodes the
// message's Content as an order.SaleOrder, and passes the order to
// MockStockService.HandleSaleOrderCreated. A read or decode failure is logged
// and answered with 400 and {"ok": false, "error": ...}. Once the order has
// been decoded the handler always answers 200 {"ok": true}; the result of
// handling the order is deliberately not reflected in the response.
func NotifySaleOrderCreated(c *gin.Context) {
    reqBody := c.Request.Body
    defer reqBody.Close()

    // reject answers the request with a 400 carrying the error text.
    reject := func(cause error) {
        c.JSON(http.StatusBadRequest, gin.H{"ok": false, "error": cause.Error()})
    }

    raw, err := ioutil.ReadAll(reqBody)
    if err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed ioutil.ReadAll")
        reject(err)
        return
    }

    envelope := new(models.TxMsg)
    if err = json.Unmarshal(raw, envelope); err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal msg")
        reject(err)
        return
    }

    // The order itself travels as a JSON string inside the message content.
    saleOrder := new(order.SaleOrder)
    if err = json.Unmarshal([]byte(envelope.Content), saleOrder); err != nil {
        logger.Logf("stock.NotifySaleOrderCreated, failed json.Unmarshal order")
        reject(err)
        return
    }
    logger.Logf("stock.NotifySaleOrderCreated, order=%s/%v", saleOrder.OrderID, saleOrder)

    // Hand the order to the stock service; its error is intentionally ignored
    // here and the delivery is acknowledged regardless.
    _ = MockStockService.HandleSaleOrderCreated(saleOrder)
    c.JSON(http.StatusOK, gin.H{"ok": true})
}

// PathOfNotifySaleOrderCreated is the URL path of the sale-order-created
// notification callback. fnSubscribeMQ appends it to gMQServerURL to build
// the notify URL sent to the MQ server.
var PathOfNotifySaleOrderCreated = "/notify/sale-order.created"

(未完待续)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK