微服务之间通过RabbitMQ通信
source link: https://studygolang.com/articles/13637?amp%3Butm_medium=referral
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
微服务之间通过RabbitMQ通信
微服务之间是相互独立的,不像单个工程一样各个模块之间可以直接通过方法调用实现通信,相互独立的服务直接一般的通信方式是使用 HTTP协议
、 rpc协议
或者使用消息中间件如 RabbitMQ``Kafka
等
image
在这篇文章 使用Golang和MongoDB构建微服务 已经实现了一个微服务的应用,在文章中已经实现了各个服务直接的通信,是使用的 HTTP
的形式 ,那各个服务之间如何通过 RabbitMQ
进行消息通信呢,我们现在要实现一个功能,就是一个用户预订电影票的接口,需要服务 User Service(port 8000) 和 服务 Booking Service(port 8003) 之间通信,用户预订之后,把预订信息写入到 booking 的数据库中
安装 RabbitMQ
安装 RabbitMQ
之前需要先安装 Erlang 的环境 ,然后下载安装 RabbitMQ ,请选择对应的版本,安装完成之后,RabbitMQ在Windows上是作为一个服务在后台运行,关于 RabbitMQ
的接口如何使用,请参考官网的 教程 ,有各个主流语言的实现我们使用的是 Go
版本,请下载对应的实现接口 go get github.com/streadway/amqp
对 RabbitMQ
的接口做一下简单的封装
- 定义一个接口
messaging/message.go
type IMessageClient interface { ConnectToBroker(connectionStr string) error PublishToQueue(data []byte, queueName string) error SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error Close() } type MessageClient struct { conn *amqp.Connection }
- 连接接口
func (m *MessageClient) ConnectToBroker(connectionStr string) error { if connectionStr == "" { panic("the connection str mustnt be null") } var err error m.conn, err = amqp.Dial(connectionStr) return err }
- 发布消息接口
func (m *MessageClient) PublishToQueue(body []byte, queueName string) error { if m.conn == nil { panic("before publish you must connect the RabbitMQ first") } ch, err := m.conn.Channel() defer ch.Close() failOnError(err, "Failed to open a channel") q, err := ch.QueueDeclare( queueName, false, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") err = ch.Publish( "", q.Name, false, false, amqp.Publishing{ ContentType: "application/json", Body: body, }, ) failOnError(err, "Failed to publish a message") return nil }
- 订阅消息接口
func (m *MessageClient) SubscribeToQueue(queueName string, handlerFunc func(amqp.Delivery)) error { ch, err := m.conn.Channel() //defer ch.Close() failOnError(err, "Failed to open a channel") q, err := ch.QueueDeclare( queueName, false, false, false, false, nil, ) failOnError(err, "Failed to declare a queue") msgs, err := ch.Consume( q.Name, "", true, false, false, false, nil, ) failOnError(err, "Failed to register a consumer") go consumeLoop(msgs, handlerFunc) return nil }
实现通信
在 User Service 中定义一个新的 POST
接口 /user/{name}/booking
,实现用户的预订功能,预订之后,通过 RabbitMQ
发布一个消息给
Booking Service, Booking Service 接收到消息之后,做相应的处理(写入数据库)
User Service
- 初始化
MessageClient
users/controllers/user.go
var client messaging.IMessageClient func init() { client = &messaging.MessageClient{} err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/") if err != nil { fmt.Println("connect to rabbitmq error", err) } }
- 添加新的路由和实现
routes.go
register("POST", "/user/{name}/booking", controllers.NewBooking, nil)
users/controllers/user.go
func NewBooking(w http.ResponseWriter, r *http.Request) { params := mux.Vars(r) user_name := params["name"] defer r.Body.Close() var bookings models.Booking body, _ := ioutil.ReadAll(r.Body) err := json.Unmarshal(body, &bookings) if err != nil { fmt.Println("the format body error ", err) } fmt.Println("user name:", user_name, bookings) go notifyMsg(body) }
- 用一个协程实现消息的发布
func notifyMsg(body []byte) { err := client.PublishToQueue(body, "new_booking") if err != nil { fmt.Println("Failed to publis message", err) } }
Booking Service
- 初始化MessageClient
var client messaging.IMessageClient func initMessage() { client = &messaging.MessageClient{} err := client.ConnectToBroker("amqp://guest:guest@localhost:5672/") if err != nil { fmt.Println("Failed to connect to RabbitMQ", err) } err = client.SubscribeToQueue("new_booking", getBooking) if err != nil { fmt.Println("Failed to comsuer the msg", err) } }
在 web服务之前启动
func main() { initMessage() r := routes.NewRouter() http.ListenAndServe(":8003", r) }
- 接收后的消息处理
func getBooking(delivery amqp.Delivery) { var booking models.Booking json.Unmarshal(delivery.Body, &booking) booking.Id = bson.NewObjectId().Hex() dao.Insert("Booking", "BookModel", booking) fmt.Println("the booking msg", booking) }
验证,需要启动 User Service 和 Booking Service
使用 Postman
发送对应的数据
post 127.0.0.1:8000/user/kevin_woo/booking { "name":"kevin_woo", "books":[ { "date":"20180727", "movies":["5b4c45d49d5e3e33c4a5b97a"] }, { "date":"20180810", "movies":["5b4c45ea9d5e3e33c4a5b97b"] } ] }
可以看到数据库已经有了一条新的预订信息
说明,我这里POST的数据就是booking数据库中的结构,实际情况需要对数据进行封装处理,在POST数据时,没有对数据进行验证,
在实际开发过程中需要对各个数据做相应的验证,这里主要是看一下 RabbitMQ的消息传递处理的过程
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK