

Concurrent observer pattern in Go
source link: https://rebelsource.dev/blog/concurrent-observer-pattern-in-go/MqTnjdkcE
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.


Image provided by Unsplash from Getty Images . Download the image and support the author!
Introduction
I will not try to explain what the observer pattern is and how it is implemented. A great explanation can be found refactoring.guru. Instead, I will implement the sync version and then the concurrent version without changing the interface of both Subject and Observer.
Also, mind you that, for our particular use case that we will be examining, a simple single worker goroutine might be a better option. We will get to that later.
Synchronous implementation
The most simple implementation, as described in the article above, is first creating our types. Then, our concrete implementation. This is not complicated, so I will just leave it for you to study it in the code block below.
package main
import "fmt"
type observer interface {
ID() string
Observe(value interface{})
}
type subject interface {
Register(observer observer)
Unregister(observer observer)
Notify()
}
type product struct {
id string
name string
}
func newProduct(id string, name string) product {
return product{id: id, name: name}
}
/**
Our concrete subject in the form of a product
*/
type productSubject struct {
observers []observer
product
}
func newProductSubject(p product) subject {
return &productSubject{
observers: make([]observer, 0),
product: p,
}
}
func (p *productSubject) Register(concreteObserver observer) {
for _, v := range p.observers {
if v.ID() == concreteObserver.ID() {
return
}
}
p.observers = append(p.observers, concreteObserver)
}
func (p *productSubject) Unregister(concreteObserver observer) {
newObservers := make([]observer, 0)
for _, v := range p.observers {
if v.ID() != concreteObserver.ID() {
newObservers = append(newObservers, concreteObserver)
}
}
p.observers = newObservers
}
func (p *productSubject) Notify() {
for _, o := range p.observers {
o.Observe(p.product)
}
}
/**
Our product observer in the form of a customer.
Customer will be notified every time a product is
changed.
*/
func newCustomer(id string) observer {
return customer{id: id}
}
type customer struct {
id string
}
func (s customer) Observe(value interface{}) {
p := value.(product)
fmt.Printf("Observing product %s with ID %s\n", p.name, p.id)
}
func (s customer) ID() string {
return s.id
}
func main() {
notifier := newProductSubject(newProduct("1", "Red shoe"))
for i := 0; i < 10; i++ {
notifier.Register(newCustomer(fmt.Sprintf("customer%[email protected]", i)))
}
notifier.Notify()
}
We create our product Subject with a single product. In a real world scenario, that product could change availability or it could be broken so every customer that has anything to do with this product must be notified. The problem with this implementation is that every time we want to notify our customers, we would have to wait for all the customer instances to finish what they are doing. This might be sending an email, working with the database or calling some external API. These operations might be time intensive and we might not want to wait until they are finished. What we want is to change the implementation to not blog our main thread.
Let's do this with a few simple tweaks.
First, we will modify our Notify() method. This method will spawn a goroutine that will send the product to all observers.
func (p *asyncProductSubject) Notify() {
go func() {
for _, o := range p.observers {
o.Observe(p.product)
}
}()
}
In our customer, we will add a listener channel that will listen to product changes without changing our observer interface.
package main
import "fmt"
func newAsyncCustomer(id string) observer {
a := asyncCustomer{id: id}
a.spawn()
return a
}
type asyncCustomer struct {
id string
listener chan product
}
func (s asyncCustomer) Observe(value interface{}) {
p := value.(product)
s.listener <- p
}
func (s asyncCustomer) ID() string {
return s.id
}
func (s asyncCustomer) spawn() {
go func() {
for l := range s.listener {
fmt.Printf("Observing product %s with ID %s\n", l.name, l.id)
}
}()
}
All we are doing here is creating a spawn() method that listens to changes that the subject has made. Nothing less, nothing more. You can see the full implementation below:
package main
import (
"fmt"
"time"
)
type observer interface {
ID() string
Observe(value interface{})
}
type subject interface {
Register(observer observer)
Unregister(observer observer)
Notify()
}
type product struct {
id string
name string
}
func newProduct(id string, name string) product {
return product{id: id, name: name}
}
type productSubject struct {
observers []observer
product
}
func newProductSubject(p product) subject {
return &productSubject{
observers: make([]observer, 0),
product: p,
}
}
func (p *productSubject) Register(concreteObserver observer) {
for _, v := range p.observers {
if v.ID() == concreteObserver.ID() {
return
}
}
p.observers = append(p.observers, concreteObserver)
}
func (p *productSubject) Unregister(concreteObserver observer) {
newObservers := make([]observer, 0)
for _, v := range p.observers {
if v.ID() != concreteObserver.ID() {
newObservers = append(newObservers, concreteObserver)
}
}
p.observers = newObservers
}
func (p *productSubject) Notify() {
go func() {
for _, o := range p.observers {
o.Observe(p.product)
}
}()
}
func newCustomer(id string) observer {
a := customer{id: id, listener: make(chan product)}
a.spawn()
return a
}
type customer struct {
id string
listener chan product
}
func (s customer) Observe(value interface{}) {
p := value.(product)
s.listener <- p
}
func (s customer) ID() string {
return s.id
}
func (s customer) spawn() {
go func() {
for l := range s.listener {
fmt.Printf("Observing product %s with ID %s\n", l.name, l.id)
}
}()
}
func main() {
notifier := newProductSubject(newProduct("1", "Red shoe"))
for i := 0; i < 5; i++ {
notifier.Register(newCustomer(fmt.Sprintf("customer%[email protected]", i)))
}
notifier.Notify()
time.Sleep(1 * time.Second)
}
Conclusion
This is a pretty straightforward and a simple implementation. The problem is, what if we have thousands of products that need to be notified to thousands of customers? In my opinion, this pattern does not solve that problem. A worker pool might be a good idea for this type of issue. An even better solution is to use something like RabbitMQ or Kafka for many-to-many publish/subscribe issues.
Hello, visitor.
This blogging platform is created specifically for software developers. We aim to support many more programming languages and development environments but for that, we need your support. If you like this blogging platform, consider using it to write your blogs.
We tried to make your experience of creating blog as painless as possible soSign in and give it a try.
Sign in to post comments
</div
Recommend
-
87
Learn new Google tools with your community. Find a DevFest near you!
-
47
The Observer pattern, also known as the Publish-Subscriber pattern, is one of the behavioral design patterns that defines a one-to-many relationship between objects. For example, when the state of one object, Subje...
-
23
Contents What is Observer Pattern? Observer Pattern Example Push And Pull Mechanism Obser...
-
8
Improving Observer Pattern APIs in Swift With Weak Collections Improving Observer Pattern APIs in Swift With Weak Collections August 25th, 2020 Even if you don't know what the Observer pat...
-
10
The Observer Pattern in JavaScript — the Key to a Reactive BehaviorA closer look at one of my favorite design patterns and why it’s so relevant in today’s reactive worldImage by
-
11
Implementing the Observer Pattern with an Event SystemFebruary 16, 2016 · 3 min · Benjamin BengfortI was looking back through some old code (hoping to find a quick post before I got back to work) when I ran across a projec...
-
7
这一节开始学习观察者模式,开始讲之前会先像第一节那样通过一个应用场景来引入该模式。具体场景为:气象站提供了一个WeatherData对象,该对象可以追踪获取天气的温度、气压、湿度信息,WeatherData对象会随即更新三个布告板的显示:目前状况(温度、湿度...
-
10
Use Database Transaction Logs to Implement Observer Pattern by@amritsinghUse Database Transaction Logs to Implement Observer Pattern
-
8
The observer pattern and binary sizes Sandor Dargo 10 hours ago2023-03-15T00:00:00+01:00 11 minIn the previous article on binary sizes, we discussed how the decorator pattern’s classic and modern...
-
10
一、模式动机 观察者模式用于描述对象之间的依赖关系,它引入了观察者和观察目标两类不同的角色,由于提供了抽象层,它使得增加新的观察者和观察目标都很方便。观察者模式广泛应用于各种编程语言的事件处理模型中,Java语言也...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK