

broker and event in go-micro
source link: https://ioridy.github.io/2022/01/22/go-micro-broker-and-event/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

在调研broker的具体实现时,发现之前项目中的消息发送,是使用的Client(publish)/Server(subscribe)的方式,并没有直接使用broker,于是决定调研下这两者是什么关系。
Broker
broker是go-micro自身定义的异步Pub/Sub interface, 不同的机制(kafka、mqtt、nats…)最终只需要实现对应的接口,即可支持go-micro的异步消息发布/订阅。
type Broker interface {
Init(...Option) error
Options() Options
Address() string
Connect() error
Disconnect() error
Publish(topic string, m *Message, opts ...PublishOption) error
Subscribe(topic string, h Handler, opts ...SubscribeOption) (Subscriber, error)
String() string
}
Event
event是go-micro基于broker的interface封装的一个基于protobuf的消息发送/订阅模块, 即最终还是依赖broker的实现(go-micro默认提供一个点对点http代理),所以只需要使用plugin的方式,修改了broker的实现, event即可应用。
- Event只定义了Publish接口 (
micro.go
)// Event is used to publish messages to a topic
type Event interface {
// Publish publishes a message to the event topic
Publish(ctx context.Context, msg interface{}, opts ...client.PublishOption) error
}
// Type alias to satisfy the deprecation
type Publisher = Event - Client中Publish (
client/client.go
)// Client is the interface used to make requests to services.
// It supports Request/Response via Transport and Publishing via the Broker.
// It also supports bidirectional streaming of requests.
type Client interface {
Init(...Option) error
Options() Options
NewMessage(topic string, msg interface{}, opts ...MessageOption) Message
NewRequest(service, endpoint string, req interface{}, reqOpts ...RequestOption) Request
Call(ctx context.Context, req Request, rsp interface{}, opts ...CallOption) error
Stream(ctx context.Context, req Request, opts ...CallOption) (Stream, error)
Publish(ctx context.Context, msg Message, opts ...PublishOption) error
String() string
} - Server中Subscribe (
server/server.go
)// Server is a simple micro server abstraction
type Server interface {
Options() Options
Init(...Option) error
Handle(Handler) error
NewHandler(interface{}, ...HandlerOption) Handler
NewSubscriber(string, interface{}, ...SubscriberOption) Subscriber
Subscribe(Subscriber) error
Start() error
Stop() error
String() string
} - micro中RegisterSubscriber (
micro.go
)// RegisterSubscriber is syntactic sugar for registering a subscriber
func RegisterSubscriber(topic string, s server.Server, h interface{}, opts ...server.SubscriberOption) error {
return s.Subscribe(s.NewSubscriber(topic, h, opts...))
} - grpc实现的Server中Subscribe (
server/grpc/grpc.go
)func newGRPCServer(opts ...server.Option) server.Server {
options := newOptions(opts...)
// create a grpc server
srv := &grpcServer{
opts: options,
rpc: &rServer{
serviceMap: make(map[string]*service),
},
handlers: make(map[string]server.Handler),
subscribers: make(map[*subscriber][]broker.Subscriber),
exit: make(chan chan error),
wg: wait(options.Context),
}
// configure the grpc server
srv.configure()
return srv
}
基于以上的分析, 直接使用Broker实现的Publish/Subscribe
和使用go-micro中封装的Event实现的Publish/Subscribe
本质是相同的,但是在使用的时候还是有一点差异:
- Event的Body可以使用proto定义的message,Broker的body只能是[]byte
- Event的Header需要通过context传递到底层, Broker直接设置header
- 同一个topic使用两种方式都可以接受到,但是通过Broker直接发布的消息, Event订阅接受后,会提示序列化错误
Recommend
-
28
ArchiveLagTarget ORA-16714
-
54
mqtt协议-broker之moqutte源码研究六之集群,moquette集群讲解
-
40
-
26
-
35
Jackdaw Simple configuration and mocking of Kafka clients. Why Jackdaw? While Kafka is an awesome message stre...
-
18
️ 阅读本文需 7 分钟 上周日,TGIP-CN 003 由郭斯杰继续为我们带来了关于 「Topic Discovery, ServiceURL and Cluster」 的分享。 首先来回顾一下 Pulsar 最近的更新特性。 ...
-
12
August 26, 2018 RabbitMQ Message Broker Patterns RabbitMQ uses variety of asynchronous architectural patterns to decouple applications. Here we’l...
-
11
之前有简单了解过go-micro的broken以及默认的http实现(参考:go-micro中的发布订阅Broker分析)),目前因为消...
-
5
JavaScript Event Loop、Micro Task、Macro TaskEvent LoopEvent Loop:事件循环、Micro Task:微任务、Macro Task:宏任务我们知道 JavaScript 是单线程语言,一心不能二用,也就是说它只能把一件事干完才能去干另一件事,但前端的一...
-
1
Protecting a broker from a failing event handler Raymond Che...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK