Non-Blocking Parallelism for Services in Go
source link: http://www.goldsborough.me/go/2020/12/06/12-24-24-non-blocking_parallelism_for_services_in_go/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
Non-Blocking Parallelism for Services in Go
Dec 6, 2020 10 minute readGo has plenty of useful builtin functionality for safe, concurrent and parallel code. However neat those features may be, they cannot write your program for you. As is the case for many languagges, the most important morsels of knowledge are not in the features of the language, but in the well-known patterns that compose those features into solutions that can address frequently reoccurring problems. I’m relatively new to using Go as my daily bread-and-butter language and recently encountered a useful pattern that I thought worth sharing. I’m told that at Palantir it is called the tickler pattern.
The Tickler Pattern
The tickler pattern addresses a very particular scenario:
- You have a service processing requests of some kind and looping forever,
- Calling the service should not block the caller,
- You optionally want to bound the amount of parallelism in the service, i.e. how many requests are processed at a time.
The crux of the pattern will look as follows:
- A
list.List
will represent the service’s internal queue, - We’ll have the service loop forever, waiting for a signal to reconsider the contents of the queue,
- When a new request is enqueued, this signal is sent,
- When the service completes the work for a request, this signal is also sent,
- A semaphore puts a limit on the number of in-flight requests.
Let’s look at the code and discuss these steps in more detail. We begin with our types:
// Dummy request -- usually some Protobuf/Thrift struct
type Request int
type Service struct {
mu sync.Mutex
queue *list.List
sema chan int
loopSignal chan struct{}
}
We’ll use a channel to represent a bounded semaphore. This is perfectly idiomatic in Go, although you could also use sync.Semaphore
for this if you really wanted to. One of our explicit requirements for the kind of solution we wanted was that the caller shall never block when making a request. This requirement is met by the fact that we will always first enqueue a request, leaving its processing to be done asynchronously. Note that my solution will not provide a way to retrieve a response for the request.
func (s *Service) EnqueueRequest(request Request) error {
s.mu.Lock()
defer s.mu.Unlock()
s.queue.PushBack(request)
log.Printf("Added request to queue with length %d\n", s.queue.Len())
s.tickleLoop()
return nil
}
We acquire a mutex for our queue since we will have multiple goroutines accessing it concurrently. We enqueue the request and then notify the loop. The last part is explained in a bit. Now that we have a way of adding requests, we can work on the code to handle the requests. Since this is a service, we’ll have it loop forever waiting for new work to be done. This is also where the loop signal comes into play:
func (s *Service) loop(ctx context.Context) {
log.Println("Starting service loop")
for {
select {
case <-s.loopSignal:
s.tryDequeue()
case <-ctx.Done():
log.Printf("Loop context cancelled")
return
}
}
}
Our loop
method will have the service loop until the Done()
channel on a context that we pass in is fulfilled. This will allow us to shut down the service gracefully at the end of our program, or in case an error occurs. More importantly, we wait for the loopSignal
to tell us to reconsider the queue. This signal is sent on one of two occasions:
- When a new request is enqueued,
- When we finish processing a request.
You can convince yourself that these are the only two events in the lifetime of our service at which it is necessary to consider the queue. If the service’s queue is empty, enqueuing a new request tells the service to look at this new request. If a request is enqueued but it cannot be handled right away because we’ve reached the limit on the parallelism we configured, the only time this will change is when a request completes processing, i.e. when the semaphore is replenished by one token.
Let’s now look at tryDequeue
, which is called whenever the loop is “tickled”. Its unconfident naming stems from the fact that when tryDequeue
is called, we:
- Don’t know whether the queue has any requests at all, since the signal could have originated from having finished processing the last in-flight request,
- Don’t know whether we have sufficient resources if there is a request in the queue.
func (s *Service) tryDequeue() {
s.mu.Lock()
defer s.mu.Unlock()
if s.queue.Len() == 0 {
return
}
select {
case s.sema <- 1:
request := s.dequeue()
log.Printf("Dequeued request %v\n", request)
go s.process(request)
default:
log.Printf("Received loop signal, but request limit is reached")
}
}
func (s *Service) dequeue() Request {
element := s.queue.Front()
s.queue.Remove(element)
return element.Value.(Request)
}
We check for the first case of uncertainty by testing the length of the queue. If we know we have work to do, we need to check if the semaphore has sufficient slots to allow for the request to be handled. Because our semaphore sema
will be a buffered channel, we know that if the semaphore has no slots left, sending it a new token (the integer 1
) will block, or in this case trigger the default case. If we’ve reached our limit on parallelism, we simply punt on handling this request until the loop is tickled anew. The dequeue
function is a little helper that takes care of the nasty business of popping an element from the queue, which is made to involve type casting because of Go’s lack of generics.
If a request passes the demanding tests of the tryDequeue
function and is elected for processing, we take care of it in the process
method:
func (s *Service) process(request Request) {
defer s.replenish()
log.Printf("Processing request %v\n", request)
// Simulate work
<-time.After(time.Duration(rand.Intn(500)) * time.Millisecond)
}
func (s *Service) replenish() {
<-s.sema
log.Printf("Replenishing semaphore, now %d/%d slots in use\n", len(s.sema), cap(s.sema))
s.tickleLoop()
}
What’s really more interesting is the replenish
function. This function has two responsibilities. First, it frees a slot in the semaphore so that a new request may be handled. Second, it tickles the loop to inform it that it’s a good time to consider the request queue again, in case there are requests that were blocked on other requests completing first. If you recall, this is the second of two cases in which the request queue must be reconsidered.
Let’s also lift the veil of mystery on the tickleLoop
function. It simply makes a non-blocking send to the loopSignal
channel:
func (s *Service) tickleLoop() {
select {
case s.loopSignal <- struct{}{}:
default:
}
}
Finally, we tie everything together in the constructor of our service:
func NewService(ctx context.Context, requestLimit int) *Service {
service := &Service{
queue: list.New(),
sema: make(chan int, requestLimit),
loopSignal: make(chan struct{}, 1),
}
go service.loop(ctx)
return service
}
We construct loopSignal
as a buffered channel with one free slot. If the service’s loop is currently waiting for a value from the channel, a single send will tell it to reconsider the queue. If it is currently considering the queue but a request completes processing, we need to “enqueue” a signal so that when it exits tryDequeue
and re-enters the context of the select
statement, it knows to immediately reconsider the queue. The sema
semaphore is a channel with sufficient buffering for the number of concurrent processing slots we wish to allow for the service. My NewService
constructor starts the loop right away, but you could also make Loop
a public method and do this outside of the constructor.
A brief demonstration of our service in action is due:
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
service := NewService(ctx, 3)
for i := 0; i < 10; i++ {
if err := service.EnqueueRequest(Request(i)); err != nil {
log.Fatalf("error sending request: %v", err)
break
}
<-time.After(time.Duration(rand.Intn(100)) * time.Millisecond)
}
for {
time.Sleep(time.Second)
}
}
We create a cancellable context for the service that we use to signal the service to terminate its loop. For this demonstration, I bound the parallelism to three requests. The infinite loop at the end simulates running the service forever. Running this code produces the following output (out of many possible variations):
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Starting service loop
2020/12/06 12:09:43 Dequeued request 0
2020/12/06 12:09:43 Processing request 0
2020/12/06 12:09:43 Replenishing semaphore, now 0/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 1
2020/12/06 12:09:43 Processing request 1
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 2
2020/12/06 12:09:43 Processing request 2
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 3
2020/12/06 12:09:43 Processing request 3
2020/12/06 12:09:43 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 4
2020/12/06 12:09:43 Processing request 4
2020/12/06 12:09:43 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:09:43 Replenishing semaphore, now 1/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 5
2020/12/06 12:09:43 Processing request 5
2020/12/06 12:09:43 Replenishing semaphore, now 1/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 6
2020/12/06 12:09:43 Processing request 6
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 7
2020/12/06 12:09:43 Processing request 7
2020/12/06 12:09:43 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 8
2020/12/06 12:09:43 Processing request 8
2020/12/06 12:09:43 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:09:43 Added request to queue with length 1
2020/12/06 12:09:43 Dequeued request 9
2020/12/06 12:09:43 Processing request 9
2020/12/06 12:09:44 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:09:44 Replenishing semaphore, now 1/3 slots in use
2020/12/06 12:09:44 Replenishing semaphore, now 0/3 slots in use
You can observe the dance between requests being enqueued and the semaphore being replenished as requests complete. If we increase the rate at which requests are being sent by reducing the delay between calls to service.EnqueueRequest
, we see that the queue grows quite a bit before the semaphore is replenished by a completing request:
2020/12/06 12:17:58 Added request to queue with length 1
2020/12/06 12:17:58 Starting service loop
2020/12/06 12:17:58 Dequeued request 0
2020/12/06 12:17:58 Processing request 0
2020/12/06 12:17:58 Added request to queue with length 1
2020/12/06 12:17:58 Dequeued request 1
2020/12/06 12:17:58 Processing request 1
2020/12/06 12:17:58 Added request to queue with length 1
2020/12/06 12:17:58 Dequeued request 2
2020/12/06 12:17:58 Processing request 2
2020/12/06 12:17:58 Added request to queue with length 1
2020/12/06 12:17:58 Received loop signal, but request limit is reached
2020/12/06 12:17:58 Added request to queue with length 2
2020/12/06 12:17:58 Added request to queue with length 3
2020/12/06 12:17:58 Received loop signal, but request limit is reached
2020/12/06 12:17:58 Received loop signal, but request limit is reached
2020/12/06 12:17:58 Added request to queue with length 4
2020/12/06 12:17:58 Received loop signal, but request limit is reached
2020/12/06 12:17:58 Added request to queue with length 5
2020/12/06 12:17:58 Received loop signal, but request limit is reached
2020/12/06 12:17:58 Replenishing semaphore, now 2/3 slots in use
2020/12/06 12:17:58 Dequeued request 3
2020/12/06 12:17:58 Processing request 3
Conclusion
And there we are – the tickler pattern! Not the only pattern for bounded parallelism in Go, but one that addresses the constraints we set out in the beginning very well. The only scalability flaw I see with it right away is that the service could run out of memory if the rate of new requests is much greater than the rate at which requests are processed, which could grow the queue to significant lengths. This could be addressed by rejecting requests out right if the queue has reached a certain length, or rate limiting clients whose volume and frequency of requests is too demannding. If you can think of other flaws, let me know!
Recommend
-
101
ReactPHP: Event-driven, non-blocking I/O with PHP Event-dri...
-
41
README.md Pi-Hole and PiVPN on Google Compute Engine Free Tier with Full Tunnel and Split Tunnel OpenVPN Configurations
-
2
Failure Detectors, and Non-Blocking Atomic Commit Non-blocking atomic commit is harder than uniform consensus. Why would that be? Many of the most interesting results in distributed systems have come from looking at problem...
-
2
MoarTLS: Non-Secure Download Blocking With little fanfare, an important security change has arrived on the web. Now, all major browsers (except Safari) block non-secure downloads from a secure page. Browser Ve...
-
5
a reactive (or non-blocking, or asynchronous) JSON parser « Michel KrämerActson: a reactive (or non-blocking, or asynchronous) JSON parserToday I’m happy to announce the very first version of Actson, a reactive JSON parser (some...
-
2
V2EX › 问与答 关于 Non-Blocking 的终极一问 ojh · 1...
-
6
Sponsor: Do you build complex software systems? See how NServiceBus makes it easier to design, build, and manage software systems that use message queues to achieve loose coupling.
-
6
Sandeep Malhotra Sep 21 at 12:39 PM Async /Non Blocking Mode - SAP CAP 347 Views Last edit Sep 21 a...
-
2
...
-
4
Non Blocking Server in Java NIO
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK