7

Concurrent observer pattern in Go

 1 year ago
source link: https://rebelsource.dev/blog/concurrent-observer-pattern-in-go/MqTnjdkcE
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Concurrent observer pattern in Go

Image provided by Unsplash from Getty Images . Download the image and support the author!

Introduction

I will not try to explain what the observer pattern is and how it is implemented. A great explanation can be found refactoring.guru. Instead, I will implement the sync version and then the concurrent version without changing the interface of both Subject and Observer

Also, mind you that, for our particular use case that we will be examining, a simple single worker goroutine might be a better option. We will get to that later. 

Synchronous implementation

The most simple implementation, as described in the article above, is first creating our types. Then, our concrete implementation. This is not complicated, so I will just leave it for you to study it in the code block below. 

Go v1.18
package main

import "fmt"

type observer interface {
	ID() string
	Observe(value interface{})
}

type subject interface {
	Register(observer observer)
	Unregister(observer observer)
	Notify()
}

type product struct {
	id   string
	name string
}

func newProduct(id string, name string) product {
	return product{id: id, name: name}
}

/**
Our concrete subject in the form of a product
*/

type productSubject struct {
	observers []observer
	product
}

func newProductSubject(p product) subject {
	return &productSubject{
		observers: make([]observer, 0),
		product:   p,
	}
}

func (p *productSubject) Register(concreteObserver observer) {
	for _, v := range p.observers {
		if v.ID() == concreteObserver.ID() {
			return
		}
	}

	p.observers = append(p.observers, concreteObserver)
}

func (p *productSubject) Unregister(concreteObserver observer) {
	newObservers := make([]observer, 0)
	for _, v := range p.observers {
		if v.ID() != concreteObserver.ID() {
			newObservers = append(newObservers, concreteObserver)
		}
	}

	p.observers = newObservers
}

func (p *productSubject) Notify() {
	for _, o := range p.observers {
		o.Observe(p.product)
	}
}

/**
Our product observer in the form of a customer. 
Customer will be notified every time a product is
changed. 
*/

func newCustomer(id string) observer {
	return customer{id: id}
}

type customer struct {
	id string
}

func (s customer) Observe(value interface{}) {
	p := value.(product)

	fmt.Printf("Observing product %s with ID %s\n", p.name, p.id)
}

func (s customer) ID() string {
	return s.id
}

func main() {
    notifier := newProductSubject(newProduct("1", "Red shoe"))
	for i := 0; i < 10; i++ {
		notifier.Register(newCustomer(fmt.Sprintf("customer%[email protected]", i)))
	}

	notifier.Notify()
}


We create our product Subject with a single product. In a real world scenario, that product could change availability or it could be broken so every customer that has anything to do with this product must be notified. The problem with this implementation is that every time we want to notify our customers, we would have to wait for all the customer instances to finish what they are doing. This might be sending an email, working with the database or calling some external API. These operations might be time intensive and we might not want to wait until they are finished. What we want is to change the implementation to not blog our main thread. 

Let's do this with a few simple tweaks. 

First, we will modify our Notify() method. This method will spawn a goroutine that will send the product to all observers. 

Go v1.18 Readonly
func (p *asyncProductSubject) Notify() {
	go func() {
		for _, o := range p.observers {
			o.Observe(p.product)
		}
	}()
}

In our customer, we will add a listener channel that will listen to product changes without changing our observer interface. 

Go v1.18 Readonly
package main

import "fmt"

func newAsyncCustomer(id string) observer {
	a := asyncCustomer{id: id}
	a.spawn()

	return a
}

type asyncCustomer struct {
	id       string
	listener chan product
}

func (s asyncCustomer) Observe(value interface{}) {
	p := value.(product)

	s.listener <- p
}

func (s asyncCustomer) ID() string {
	return s.id
}

func (s asyncCustomer) spawn() {
	go func() {
		for l := range s.listener {
			fmt.Printf("Observing product %s with ID %s\n", l.name, l.id)
		}
	}()
}

All we are doing here is creating a spawn() method that listens to changes that the subject has made. Nothing less, nothing more. You can see the full implementation below:

Go v1.18
package main
import (
    "fmt"
    "time"
)

type observer interface {
	ID() string
	Observe(value interface{})
}

type subject interface {
	Register(observer observer)
	Unregister(observer observer)
	Notify()
}

type product struct {
	id   string
	name string
}

func newProduct(id string, name string) product {
	return product{id: id, name: name}
}

type productSubject struct {
	observers []observer
	product
}

func newProductSubject(p product) subject {
	return &productSubject{
		observers: make([]observer, 0),
		product:   p,
	}
}

func (p *productSubject) Register(concreteObserver observer) {
	for _, v := range p.observers {
		if v.ID() == concreteObserver.ID() {
			return
		}
	}

	p.observers = append(p.observers, concreteObserver)
}

func (p *productSubject) Unregister(concreteObserver observer) {
	newObservers := make([]observer, 0)
	for _, v := range p.observers {
		if v.ID() != concreteObserver.ID() {
			newObservers = append(newObservers, concreteObserver)
		}
	}

	p.observers = newObservers
}

func (p *productSubject) Notify() {
	go func() {
		for _, o := range p.observers {
			o.Observe(p.product)
		}
	}()
}

func newCustomer(id string) observer {
	a := customer{id: id, listener: make(chan product)}
	a.spawn()

	return a
}

type customer struct {
	id string
    listener chan product
}

func (s customer) Observe(value interface{}) {
	p := value.(product)

	s.listener <- p
}

func (s customer) ID() string {
	return s.id
}

func (s customer) spawn() {
	go func() {
		for l := range s.listener {
			fmt.Printf("Observing product %s with ID %s\n", l.name, l.id)
		}
	}()
}

func main() {
    notifier := newProductSubject(newProduct("1", "Red shoe"))
	for i := 0; i < 5; i++ {
		notifier.Register(newCustomer(fmt.Sprintf("customer%[email protected]", i)))
	}

	notifier.Notify()

    time.Sleep(1 * time.Second)
}


Conclusion

This is a pretty straightforward and a simple implementation. The problem is, what if we have thousands of products that need to be notified to thousands of customers? In my opinion, this pattern does not solve that problem. A worker pool might be a good idea for this type of issue. An even better solution is to use something like RabbitMQ or Kafka for many-to-many publish/subscribe issues. 

Tweet
Share
Copy link

Hello, visitor.

This blogging platform is created specifically for software developers. We aim to support many more programming languages and development environments but for that, we need your support. If you like this blogging platform, consider using it to write your blogs.

We tried to make your experience of creating blog as painless as possible soSign in and give it a try.

Sign in to post comments


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK