Akka vs. ZIO vs. Monix, part 2: communication
source link: https://www.tuicool.com/articles/hit/ZF7vEnI
In part 1, we’ve explored how to implement a process which manages some non-trivial state using Akka, Akka Typed, Monix and ZIO. However, as a popular saying by Carl Hewitt goes, “one actor is no actor, they come in systems”. Hence, let’s explore examples which use multiple communicating actors, and see if it’s still possible and practical to implement them using ZIO or Monix.
Crawler
Our first example will be an implementation of the popular master-worker pattern, where we have a single master process distributing work to a number of worker processes. When a worker finishes a work unit, it sends the results to the master process, which gathers them and includes them in the overall result of the computation.
More concretely, the task will be to create a web crawler. Starting from a given URL, it should traverse all the links, counting which hosts are most popular. HTTP requests should be executed in parallel, with one additional restriction: we don’t want our crawling to look suspicious, so at any given time at most one request to any host should be executing (requests to different hosts can still run in parallel).
As we are interested in the way a process is defined, not in the actual crawling, we’ll use a stub HTTP service, along with stub functions which extract interesting links (which we want to crawl) from the site’s content:
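As a rough illustration (not the article’s exact code), such stubs might look as follows. Only Http[F] and parseLinks are named in the text; Host, Url, StubHttp, the site map and the whitespace-separated "host/path" page format are assumptions made for this sketch.

```scala
import scala.concurrent.Future

object CrawlerStubs {
  type Host = String
  final case class Url(host: Host, path: String)

  // Abstracts over the effect type: Future in Akka-land, a lazy IO/Task elsewhere.
  trait Http[F[_]] {
    def get(url: Url): F[String]
  }

  // Hypothetical in-memory "web": each body is a whitespace-separated list of host/path links.
  val site: Map[Url, String] = Map(
    Url("a.com", "/") -> "b.com/1 b.com/2",
    Url("b.com", "/1") -> "a.com/",
    Url("b.com", "/2") -> ""
  )

  // Stub HTTP service: answers from the map, fails for unknown URLs.
  object StubHttp extends Http[Future] {
    def get(url: Url): Future[String] =
      site.get(url) match {
        case Some(body) => Future.successful(body)
        case None       => Future.failed(new NoSuchElementException(url.toString))
      }
  }

  // Stub link extraction; assumes every link contains a '/' separating host and path.
  def parseLinks(body: String): List[Url] =
    body.split("\\s+").toList.filter(_.nonEmpty).map { link =>
      val idx = link.indexOf('/')
      Url(link.take(idx), link.drop(idx))
    }
}
```

Because Http is parameterised by the effect type, the same interface can later be instantiated with a lazy effect instead of Future.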
Traditional Akka
Like in part 1, let’s start with a “traditional” Akka solution, and then move to other implementations. Again, only crucial snippets will be included, but the full source code is available on GitHub.
We’ll have to define two actors: Crawler and Worker. To construct a Crawler actor, we need an interface for executing HTTP requests (http: Http[Future], as we are in Akka-land and everything is Future-based), a way to parse links (parseLinks), and a Promise waiting to be completed with the final result (once all pages have been crawled; we’re assuming that it’s a finite process, and parseLinks gives only “interesting” links, for example from a set of “interesting” hosts):
The internal state of the actor consists of:
- referenceCount: the current host popularity
- visitedLinks: which URLs have already been processed or are being processed, to avoid processing them again
- inProgress: the set of URLs which are currently being processed. Once this becomes empty (after starting the process), the crawling is done and the result promise can be completed with referenceCount
- workers: each host will have a dedicated worker actor, which will ensure that at most one request is done for each host at any time
There are two messages that the crawler actor can receive:
The Start message should be sent only once, to kickstart the whole process. CrawlResult messages will be sent by worker actors, once they have completed crawling the given URL and parsing the links.
Let’s start by looking at the crawlUrl method in the actor:
The method checks if the URL has already been visited; if not, the visitedLinks and inProgress structures are updated. We create or look up a worker actor using actorFor, and tell it to Crawl the given address.
Notice that when creating a new worker, we’re passing the self: ActorRef reference so that the worker can send messages back to the crawler.
As we mentioned before, the actor can receive two types of messages:
The worker actors are expected to reply to the crawler actor with the CrawlResult message. Once this message is received, the inProgress and referenceCount structures are updated again, and all the linked URLs are crawled. If at the end nothing is being crawled, we are done!
The worker isn’t complicated either. It’s parametrised with a reference (ActorRef) to the master actor, which allows sending back messages, as well as the Http[Future] interface and a way to parse links:
The internal state of the actor consists of a list of URLs that should be crawled (urlsPending), and a flag indicating if there’s a request in progress (getInProgress). This is needed to ensure that there’s at most one request to a given domain executing at any time.
There are also two messages which the worker will receive:
The first one is sent, as we’ve seen, by the crawler actor:
Once we get a new URL to crawl, we add it to the list of pending requests (urlsPending). If possible (that is, if there are no requests in progress), the startHttpGetIfPossible method starts executing a new HTTP request. Once this completes, we send an HttpGetResult message to ourselves (the worker actor). Note that this is an asynchronous operation, and you always have to be cautious not to access or mutate the actor’s state from within such callbacks.
Once the worker actor receives the HttpGetResult message, it sends a notification to the master with the results (CrawlResult), and starts another request, if there’s one pending.
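The worker’s bookkeeping can be sketched without any Akka dependency as a small state machine. This is only an illustration under assumptions: the actual actor keeps urlsPending and getInProgress as mutable fields and performs the HTTP call itself, while here the state is an immutable (hypothetical) WorkerState and each step merely reports which URL, if any, should be requested next.

```scala
object WorkerStateSketch {
  final case class Url(host: String, path: String)

  // Mirrors the two pieces of worker state named in the text.
  final case class WorkerState(urlsPending: Vector[Url], getInProgress: Boolean)

  // Starts the next request only if none is running; returns the URL to GET, if any.
  def startHttpGetIfPossible(s: WorkerState): (WorkerState, Option[Url]) =
    s.urlsPending match {
      case url +: rest if !s.getInProgress =>
        (WorkerState(rest, getInProgress = true), Some(url))
      case _ =>
        (s, None)
    }

  // Handling Crawl(url): enqueue, then try to start a request.
  def onCrawl(s: WorkerState, url: Url): (WorkerState, Option[Url]) =
    startHttpGetIfPossible(s.copy(urlsPending = s.urlsPending :+ url))

  // Handling HttpGetResult: the request is finished, so the next one may start.
  def onHttpGetResult(s: WorkerState): (WorkerState, Option[Url]) =
    startHttpGetIfPossible(s.copy(getInProgress = false))
}
```

The getInProgress flag is what enforces the "at most one request per host" rule: a second Crawl arriving while a request is running only grows urlsPending.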
Overall, it’s not a complicated process, but there’s some communication happening, both from the master to the workers and the other way round. There are tests for the implementation (see AkkaCrawlerTest), which verify that we indeed get correct answers.
Akka Typed
With Akka Typed, instead of writing actors directly, we’ll be defining actor behaviors. The messages sent between the actors/behaviors will be exactly the same; however, we’ll additionally encapsulate the whole state in a case class:
The behaviors are parametrised with an interface for executing HTTP requests, a function to parse the links and an actor to which the reply with the results should be sent once available (this used to be a Promise in the previous example, but this way is more natural here):
The crawler behavior that we’ll define will use the actor’s context, so we’ll wrap the method which defines the message-processing behavior with a factory method which obtains the context. The context, like everything in Akka Typed, is parameterised with the type of the messages that the actor handles. That’s needed for example to obtain a well-typed self actor reference, which needs to know what kind of messages it accepts:
Let’s start from the end, with the method for looking up a worker actor for a given host. Since in this implementation we’re not using mutable state, but instead returning modified actor behaviors which wrap the state (CrawlerData), all methods will take the current CrawlerData as a parameter and return an updated CrawlerData as (part of) the result.
If there’s no worker for the given domain yet, we’re spawning a new child actor, using the worker behavior, and returning an updated actor state, together with the created actor reference.
As the type of the behavior which we are passing to spawn is Behavior[WorkerMessage], the result of this method will be ActorRef[WorkerMessage].
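The lookup-or-create step can be sketched in isolation. In this illustration WorkerRef is a hypothetical stand-in for ActorRef[WorkerMessage], and the spawn function passed in stands for spawning a child actor from the context; neither is the Akka Typed API, and CrawlerData is reduced to the workers map.

```scala
object WorkerLookupSketch {
  type Host = String

  // Hypothetical stand-in for ActorRef[WorkerMessage].
  final case class WorkerRef(host: Host)

  // Only the part of the crawler state relevant to this step.
  final case class CrawlerData(workers: Map[Host, WorkerRef])

  // Returns the (possibly updated) state together with the worker for the host.
  def workerFor(data: CrawlerData, host: Host)(spawn: Host => WorkerRef): (CrawlerData, WorkerRef) =
    data.workers.get(host) match {
      case Some(ref) =>
        (data, ref) // already known: state unchanged
      case None =>
        val ref = spawn(host) // assumption: spawning is the only side effect
        (data.copy(workers = data.workers + (host -> ref)), ref)
    }
}
```

Returning the state alongside the reference is what lets the caller keep threading the updated CrawlerData through subsequent steps.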
The crawl and receive methods are quite similar to the “traditional” Akka implementation, with the significant difference being that we need to thread through the modified actor state; sometimes there are a couple of modifications, hence we get a chain of data, data2, data3 references:
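To make the state threading concrete, here is a library-free sketch of the two crawler steps as pure functions. It is an approximation under assumptions: workers are left out, and instead of telling a worker to Crawl, each function returns the URLs that would have to be dispatched. The names crawlUrl and onCrawlResult and the returned tuples are illustrative, not the article’s signatures.

```scala
object ThreadedStateSketch {
  type Host = String
  final case class Url(host: Host, path: String)

  final case class CrawlerData(
      referenceCount: Map[Host, Int],
      visitedLinks: Set[Url],
      inProgress: Set[Url]
  )

  // Marks the URL as visited and in progress, unless it was seen before.
  def crawlUrl(data: CrawlerData, url: Url): (CrawlerData, Option[Url]) =
    if (data.visitedLinks.contains(url)) (data, None)
    else {
      val data2 = data.copy(
        visitedLinks = data.visitedLinks + url,
        inProgress = data.inProgress + url
      )
      (data2, Some(url))
    }

  // Handles a crawl result: updates the counts, then crawls every linked URL.
  def onCrawlResult(data: CrawlerData, url: Url, links: List[Url]): (CrawlerData, List[Url]) = {
    val data2 = data.copy(
      inProgress = data.inProgress - url,
      referenceCount = links.foldLeft(data.referenceCount) { (counts, link) =>
        counts.updated(link.host, counts.getOrElse(link.host, 0) + 1)
      }
    )
    // Thread the state through each crawlUrl call, collecting URLs to dispatch.
    links.foldLeft((data2, List.empty[Url])) { case ((d, toSend), link) =>
      val (d2, send) = crawlUrl(d, link)
      (d2, toSend ++ send)
    }
  }
}
```

Crawling is finished when the returned state has an empty inProgress set.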
Communication in both Akka variants looks the same: we use the ! (tell) method to send a message to a (typed) actor. Don’t be mistaken, though: here everything is well-typed. You won’t be able to send a message of an incorrect type to an actor.
The worker behavior also corresponds closely to what we’ve seen before, again with the exception that we’re not using mutable state (and hence there’s no possibility of accidentally modifying it within callbacks):
ZIO
Once again, let’s leave the eager scala.concurrent.Future world, and venture into the lazy land of IO. In the example from the previous article we’ve been using an IOQueue to communicate with the process from the outside world. Here, we’ll be using multiple IOQueues.
The Crawler process will also use a CrawlerData case class for storing the current state, but instead of a map from the domain to the worker’s ActorRef, it will contain an IOQueue:
Instead of actor classes, we’ll be defining methods, which will return IO instances: descriptions of how to compute the host popularity counts. The method will take an Http[IO] interface, but this time when executing the request, we won’t get a Future[String], but as we’re in ZIO-world, an IO[String]. That is, we’ll get back a description of how to execute a GET request to the given address:
In Akka Typed we had to define two behaviors, for the crawler and the worker; here we’ll be defining two process descriptions. The first one, the crawler, contains the same parts as in the previous implementation:
Let’s again start from the bottom, with the description of how to obtain a worker for a given host. Even though we’ve travelled from Akka to Scalaz, we still need a way to ensure that there’s at most one request to a given host done at any given time. A separate asynchronous process which makes sure that’s the case is a good fit:
Here of course we also don’t have any mutable state, so we need to take in the CrawlerData as a parameter, and return an updated copy. If there’s no worker for a given address yet, we first create a (bounded) queue which will be used to communicate with that worker, then create the worker process (we’ll get to the definition of worker soon), and finally store the queue in our data structure. Again, that is not that different from the Akka Typed implementation.
The crawlUrl method should look familiar as well:
The major difference is that sending a message to a worker isn’t a side-effecting operation as before. Instead, we use the workerQueue.offer method, which returns a description of how to send a message to the queue. We need to combine this description with the overall description of how our code should run, or it will never be executed. Hence the need for the flatMap / map.
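The consequence of laziness can be shown with a toy effect type. The IO class below is a hypothetical, minimal stand-in written for this sketch (it is not ZIO’s IO, and it is not stack-safe); offer plays the role of workerQueue.offer and records deliveries in a buffer.

```scala
import scala.collection.mutable.ListBuffer

object LazyOfferSketch {
  // Hypothetical minimal IO: a description of a computation; nothing runs until unsafeRun().
  final class IO[A](val unsafeRun: () => A) {
    def map[B](f: A => B): IO[B] = new IO(() => f(unsafeRun()))
    def flatMap[B](f: A => IO[B]): IO[B] = new IO(() => f(unsafeRun()).unsafeRun())
  }
  object IO {
    def apply[A](a: => A): IO[A] = new IO(() => a)
  }

  val delivered = ListBuffer.empty[String]

  // Describes sending a message; calling offer alone sends nothing.
  def offer(msg: String): IO[Unit] = IO { delivered += msg; () }

  // The offer description is created and then thrown away, so it can never run.
  val discarded: IO[Unit] = {
    offer("lost")
    IO(())
  }

  // Here both descriptions are combined (flatMap/map), so both run, in order.
  val combined: IO[Unit] = for {
    _ <- offer("first")
    _ <- offer("second")
  } yield ()
}
```

Running discarded delivers nothing, while running combined delivers "first" and then "second"; with an eager Future-style API both variants would have sent their messages at construction time.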
The handleMessage method corresponds to receive from the Akka Typed implementation and should return the crawler data modified after handling a single, given message:
Before, when handling the CrawlResult message, we did a simple foldLeft on the resulting links, updating the data structure and running the side-effecting crawlUrl method. Here we need to combine all the IOs returned by every crawlUrl invocation into one big description. That’s what the foldlM method does: def foldlM[G[_], B](z: B)(f: B => A => G[B])(implicit M: Monad[G]): G[B], giving us the final IO[CrawlerData] which composes all side-effects into a single description.
But that’s not the end! We have helper methods to handle the messages, but what about the main loop? Unlike an actor, which, as we’ve summarized before, is a pre-defined recipe for an asynchronous process reading messages from its inbox in a loop, here we need to create the loop by hand:
The loop takes the form of recursive invocations of the main crawler method, with the updated data. Unless, of course, there are no more requests in progress: then we simply return the result.
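The shape of such a hand-written loop can be sketched with the standard library alone. This is a simplification under assumptions: a blocking java.util.concurrent queue stands in for the IOQueue, the loop is eager rather than a lazy IO description, and the messages (Started, Finished) and State are hypothetical reductions of the crawler’s protocol.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.annotation.tailrec

object ManualLoopSketch {
  sealed trait Msg
  final case class Started(url: String) extends Msg
  final case class Finished(url: String) extends Msg

  final case class State(inProgress: Set[String], done: List[String])

  // Handles a single message, returning the updated state.
  def handle(s: State, m: Msg): State = m match {
    case Started(u)  => s.copy(inProgress = s.inProgress + u)
    case Finished(u) => State(s.inProgress - u, u :: s.done)
  }

  // The loop written by hand: take one message, handle it, recurse with the new state.
  @tailrec
  def loop(queue: LinkedBlockingQueue[Msg], s: State): State = {
    val s2 = handle(s, queue.take()) // blocks until a message is available
    if (s2.inProgress.isEmpty) s2    // nothing in progress: return the result
    else loop(queue, s2)
  }
}
```

Messages still in the queue when inProgress becomes empty are simply never read, which mirrors the crawler returning its result as soon as nothing is in progress.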
Having the crawler ready, let’s look at the worker process. It can in fact be simpler than in the Akka implementations. The key observation is that we are in full control over when we take a new message from the queue. An actor has the mailbox-read-loop baked in: we cannot wait with receiving the next message until some condition is satisfied (it is possible to stash messages, but that requires additional logic). Here, however, we have that possibility.
Hence the worker, after getting a new request to crawl a URL from a queue, can simply execute the request and only take the next URL after the request completes:
The worker process is an infinite loop (created with forever), which takes a message from the queue and handles it. It is also forked into a fiber, so that it runs asynchronously in the background. The fiber instance is returned, but it’s never used by the crawler process.
There’s a very important detail here, however. Notice that when we send the crawl result to the message queue, we fork the operation into a fiber (crawlerQueue.offer(…).fork). Why is that?
Recall that unlike the mailboxes of actors, the IOQueue that we are using in ZIO is bounded, and when the queue is full, the offer operation blocks. That’s good on one hand: it gives a bound on memory usage, and also provides back-pressure. However, it can also lead to deadlocks.
In our example, imagine that there are a lot of links from one page to a single host (but with different paths), so we’ll be sending a lot of messages from the crawler process to a single worker process. If the number of links (URLs) is higher than the queue capacity, then at some point the crawler will become blocked and won’t be able to send any more URLs, as the queue will be full. The worker will slowly work through the requests, replying with results and taking messages from its queue, but the queue can immediately fill up again with new Crawl messages.
If the total number of URLs sent from the crawler to one worker during a single crawlUrl invocation exceeds the combined capacities of the crawler and worker queues, at some point the crawler’s queue will fill up as well: the crawler will still be sending Crawl messages, and won’t get a chance to process the CrawlResult messages it receives. Now the worker will block as well. Hence the deadlock.
However, if we send the replies from a background fiber, the worker will be able to continue working through the Crawl requests. All of the spawned offer(CrawlResult(...)) fibers might wait, blocked, until the crawler finishes enqueueing all Crawl requests, but that’s not a problem.
That way our memory usage is still bounded (by the total size of the queues), and we won’t get a deadlock. However, we need to carefully design the way the processes interact to avoid that situation.
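The deadlock scenario, and the effect of replying in the background, can be reproduced with plain threads. This is an analogy under stated assumptions, not the ZIO code: ArrayBlockingQueue stands in for the bounded IOQueue, daemon threads stand in for fibers, URLs are plain Ints, and the crawler detects being stuck through a 500 ms timeout on offer instead of blocking forever.

```scala
import java.util.concurrent.{ArrayBlockingQueue, TimeUnit}

object BoundedQueueSketch {
  private def daemon(body: => Unit): Thread = {
    val t = new Thread(new Runnable {
      def run(): Unit = try body catch { case _: InterruptedException => () }
    })
    t.setDaemon(true)
    t.start()
    t
  }

  // Some(number of results received), or None if the crawler got stuck while sending.
  def run(urls: Int, capacity: Int, forkReplies: Boolean): Option[Int] = {
    val workerQueue = new ArrayBlockingQueue[Int](capacity)  // crawler -> worker
    val crawlerQueue = new ArrayBlockingQueue[Int](capacity) // worker -> crawler

    val worker = daemon {
      while (true) {
        val url = workerQueue.take()
        if (forkReplies) { daemon(crawlerQueue.put(url)); () } // reply in the background
        else crawlerQueue.put(url)                             // inline reply may block the worker
      }
    }

    try {
      // The crawler sends everything before reading any reply, as within one crawlUrl fold.
      val allSent = (1 to urls).forall(u => workerQueue.offer(u, 500, TimeUnit.MILLISECONDS))
      if (allSent) Some((1 to urls).map(_ => crawlerQueue.take()).size) else None
    } finally worker.interrupt()
  }
}
```

With 10 URLs and queues of capacity 2, the inline variant gets stuck once both queues are full and the worker is blocked on its reply, while the forked variant lets the crawler finish sending and then collect all results.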
If the processes form a hierarchy, as here, where there’s a parent process (the crawler) and a number of child processes (the workers), a good rule might be to directly send messages only from parent processes to child processes (down the hierarchy tree). Any replies, going up the hierarchy tree, should be sent in the background, using a forked fiber.
Finally, we need to bootstrap the whole process: create the queue to communicate with the crawler, enqueue the initial message, and create the IO which describes the crawling process:
There’s one small but important feature here: the IO.supervise call which wraps the whole process. What this method does is instruct the interpreter that when the wrapped computation (crawl) completes, all fibers created by it should be interrupted (and terminated). And that’s exactly what we want: any forked worker fibers should be terminated once we have the final result, as they will never be used again.
This closely resembles a hierarchy of actors in Akka: once a parent actor is stopped, all child actors are stopped as well. In ZIO it’s not the default, but the option is there. When defining a computation which spawns multiple fibers, it’s very handy not to have to worry about the cleanup, but delegate the task to supervise.
Monix
Finally, let’s move to Monix. As we noted in the previous installment of the series, Monix and ZIO solutions are very closely related. Here the situation is the same. There are two important differences however.
First of all, we cannot use MVars (which behave like bounded queues of size 1) to communicate between the crawler and the worker. As putting a value into a full MVar is a blocking operation, it could very quickly lead to a deadlock (as described above).
That’s why we need a proper queue. Monix does have an unbounded async queue implementation, monix.execution.misc.AsyncQueue, but it’s Future-based, so we’ll create a thin Task-wrapper around it:
The interface to our MQueue is the same as to Scalaz’s IOQueue, but with an important difference: IOQueue is bounded, and when the queue is full, IOQueue.offer will (asynchronously) block. Here we have an unbounded queue, which corresponds to unbounded actor mailboxes in Akka. Hence, we won’t have problems with deadlocks (but we also don’t get a bound on memory usage).
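To illustrate what an unbounded, Future-based queue offers, here is a small standard-library sketch. UnboundedAsyncQueue is a hypothetical class written for this example; it is not Monix’s AsyncQueue and makes no claim about how that class is implemented.

```scala
import scala.collection.immutable.Queue
import scala.concurrent.{Future, Promise}

// Hypothetical unbounded queue: offer never blocks, poll returns a Future.
final class UnboundedAsyncQueue[T] {
  private var items = Queue.empty[T]            // values waiting for a consumer
  private var waiters = Queue.empty[Promise[T]] // consumers waiting for a value

  // Always succeeds immediately; memory use grows with the number of queued items.
  def offer(t: T): Unit = synchronized {
    waiters.dequeueOption match {
      case Some((waiter, rest)) =>
        waiters = rest
        waiter.success(t)
      case None =>
        items = items.enqueue(t)
    }
  }

  // Completed at once if a value is queued, otherwise when the next offer arrives.
  def poll(): Future[T] = synchronized {
    items.dequeueOption match {
      case Some((t, rest)) =>
        items = rest
        Future.successful(t)
      case None =>
        val p = Promise[T]()
        waiters = waiters.enqueue(p)
        p.future
    }
  }
}
```

Since poll() registers a waiter as soon as it is called, a lazy wrapper has to defer that call until the resulting task is actually run.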
The second difference is that there’s no construct analogous to IO.supervise in Monix, so we have to manage fibers manually. That means that we are storing the fibers in the CrawlerData data structure, next to the worker queues:
When a new worker process is created, we have to store the fiber on which it is running:
And once the computation is done, all fibers need to be cancelled. This manual fiber management slightly complicates the Task construction when we know that we are done with the crawling and want to return the result:
The data2.workers.values.map(_.fiber.cancel).toList.sequence_ expression creates a Task description which cancels all the fibers (Fiber.cancel: Task[Unit]) in sequence, and then returns the final result.
Otherwise the code is very similar to the ZIO implementation. Here’s the full source for you to browse.
Both the Scalaz and Monix implementations come with tests which simulate deep and wide chains of crawled links. This way we can verify that the solutions are not only correct, but also stack-safe.
Sockets example
The repository also contains another example, called sockets. It shows how to deal with two common problems:
- interfacing with a legacy, blocking API. Here, we have a server socket (Socket) with a blocking and exception-throwing accept method, and client sockets (ConnectedSocket) with blocking send/receive methods.
- broadcasting messages to a large number of clients. This is a common requirement, e.g. when dealing with websockets.
All examples use several processes:
- the router process (Actor/Behavior/Task/IO) manages the server socket and broadcasts messages received from any connected client socket to all other connected client sockets
- the socket process accepts new client connections, which result in new instances of a ConnectedSocket
- the client send/receive processes are created for each client ConnectedSocket and send messages or listen for new ones
If at any time a SocketTerminatedException is thrown by a client socket send/receive operation, the client socket needs to be closed and removed from the router.
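One way to wrap such a blocking call in the Future-based variant might look like the sketch below. The ConnectedSocket trait shown here, its method signatures and the receiveAsync helper are assumptions for illustration and need not match the repository; removing the socket from the router is left to the caller, who sees None.

```scala
import scala.concurrent.{ExecutionContext, Future, blocking}

object BlockingSocketSketch {
  final class SocketTerminatedException extends RuntimeException("socket terminated")

  // Hypothetical shape of the legacy client socket: receive() blocks and may throw.
  trait ConnectedSocket {
    def receive(): String
    def close(): Unit
  }

  // Runs the blocking call off the caller's thread; `blocking` hints the pool to compensate.
  // A terminated socket is closed and reported as None instead of a failed Future.
  def receiveAsync(socket: ConnectedSocket)(implicit ec: ExecutionContext): Future[Option[String]] =
    Future[Option[String]](blocking(Some(socket.receive()))).recover {
      case _: SocketTerminatedException =>
        socket.close()
        None
    }
}
```

Other exceptions are deliberately not recovered here, so they still surface as a failed Future.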
The code is constructed in the same way as before; no significant new ideas are introduced. Still, it might be educational to explore the code on your own. As in the other examples, there’s also a test suite which might be useful for verifying that the code actually works.
Summary
In this part we’ve built on the ideas presented in the introductory article, adding communication to our asynchronous processes. As in the last part, the overall structure of the code for all of the different implementations isn’t that different. There are significant differences in type safety and in the exact semantics of the constructed objects, but the way communication is performed, via asynchronous message passing, is the same.
It’s quite easy to identify how concepts from actors can be mirrored in the ZIO/Monix worlds. In Akka, each actor is associated with a mailbox, which is a queue of incoming messages. In Monix/ZIO, if we need to model communication, we need to create a queue.
While in Akka we pass around (typed or untyped) ActorRefs, so that one actor can send messages to another actor, in ZIO we pass around (typed) IOQueues and in Monix MQueues or MVars, depending on the use-case. Again, this is not that different.
This gives us another piece of the answer to the questions stated in the first part of the series: can ZIO/Monix offer an alternative to Akka/Akka Typed? As far as state management and communication are concerned: yes. Keep in mind, however, that we are looking at a small portion of what Akka is: while things like remoting, clustering or persistence could be implemented using the ZIO/Monix approach as well, there are no libraries which implement these functionalities (at least not yet).
In the final part, we’ll look at failure management, supervision and cancellation. Stay tuned!