Embracing Infinite Loops with Ruby and Polyphony
source link: https://noteflakes.com/articles/2021-10-14-embracing-infinite-loops
14·10·2021
In this article I’ll discuss the use of infinite loops as a major construct when writing concurrent apps in Ruby using Polyphony. I’ll show how infinite loops differ from normal, finite ones; how they can be used to express long-running tasks in a concurrent environment; and how they can be stopped.
Polyphony is a library for writing highly concurrent Ruby apps. Polyphony harnesses Ruby fibers and a powerful io_uring-based I/O runtime to provide a solid foundation for building high-performance concurrent Ruby apps.
In the last few months I’ve been slowly transitioning from working on Polyphony (designing APIs and adding functionality) to using it to develop actual applications, some of them open source, and others closed-source production apps for my clients.
In the process of actually using Polyphony as the basis for writing concurrent apps, I’ve discovered some patterns that I’d like to share. It’s really fascinating how the design of an API can impact the patterns that emerge in the application code. Take for example loops.
Developers who are used to asynchronous APIs will probably find the idea of writing loops in their app code anathema to asynchronous design: there’s only one loop - the main event loop - and it is that loop which drives the code. You just provide callbacks to be called at the right moment.
By contrast, with Polyphony the app code is written in a sequential style, and it is the app code that is in control. There is no event loop. Instead, you can create any number of fibers, all executing concurrently, with each of those fibers proceeding independently of the others.
But loops come into play when you want to launch autonomous long-running tasks, like listening for incoming connections on a TCP socket, pulling items from a queue and processing them, or periodically running some background task. Infinite loops are what makes it possible to “fire-and-forget” those concurrent processes.
Loops are everywhere!
Loops are one of the most useful ways to control execution. Loops are used anywhere you need to repeat an operation, and can be expressed in a variety of ways, from the lowly GOTO, through plain for and while loops, all the way to Ruby’s elegant #each and related methods, which take a block and apply it to items from some iterable object. While those don’t necessarily look like loops, they are, in fact, loops:
# this is a loop
while (item = queue.shift)
  item.process
end
# this is also a loop
queue.each { |i| i.process }
Infinite loops
Infinite loops are loops that run indefinitely. A loop can be inadvertently infinite if the loop logic is faulty, but loops can also be infinite by design. Infinite loops are made for running autonomous, long-lived tasks that can run any number of iterations, and are not meant to be stopped conditionally. Here are some examples:
# Accept incoming connections:
loop do
  socket = server.accept
  handle_client_connection(socket)
end
# Process items from a queue:
loop do
  item = queue.shift
  process(item)
end
As the examples above show, Ruby provides the very useful #loop method, which lets us express infinite loops in a clear and concise manner. Looking at the code we can immediately tell that we’re dealing with an infinite loop.
What’s important to note about infinite loops is that they can include a mechanism for breaking out of the loop if a certain condition is met. In fact, sometimes the distinction between a finite loop and an infinite one is not that clear.
Take for example a loop for handling an HTTP client connection. It needs to run for the lifetime of the connection, which can last for any duration and for any number of HTTP requests. This might look like an infinite loop, but it will include a conditional break:
# using h1p for parsing HTTP/1
def handle_client_connection(socket)
  parser = H1P::Parser.new(socket)
  loop do
    headers = parser.parse_headers # returns nil when socket is closed
    break unless headers
    body = parser.read_body
    handle_request(headers, body)
  end
end
Another way to express the same logic, which makes it look like a normal finite loop, is like this:
def handle_client_connection(socket)
  parser = H1P::Parser.new(socket)
  # parse_headers returns nil when the socket is closed, which ends the loop
  while (headers = parser.parse_headers)
    body = parser.read_body
    handle_request(headers, body)
  end
end
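The equivalence of the two forms above can be checked without a network connection. The following is a minimal sketch in plain Ruby: FakeParser is a hypothetical stand-in (it is not part of h1p) that hands out canned header hashes and then returns nil, the way the parser is described as doing once the socket is closed. The method names handle_with_loop and handle_with_while are likewise made up for this illustration.

```ruby
# Hypothetical stand-in for an HTTP parser: returns canned "headers",
# then nil once the simulated connection is closed.
class FakeParser
  def initialize(requests)
    @requests = requests.dup
  end

  def parse_headers
    @requests.shift
  end
end

# "Infinite" loop with a conditional break.
def handle_with_loop(parser)
  handled = []
  loop do
    headers = parser.parse_headers
    break unless headers
    handled << headers
  end
  handled
end

# The same logic written as an ordinary while loop.
def handle_with_while(parser)
  handled = []
  while (headers = parser.parse_headers)
    handled << headers
  end
  handled
end

requests = [{ ':path' => '/' }, { ':path' => '/about' }]
a = handle_with_loop(FakeParser.new(requests))
b = handle_with_while(FakeParser.new(requests))
puts "same result: #{a == b}"
```

Both methods stop at the first nil, so which form to use is mostly a matter of which one reads better in context.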
Concurrent infinite loops
What’s interesting about infinite loops is that once they start, theoretically they will go on forever! In Polyphony you can start any number of infinite loops, each running in its own fiber. Polyphony does the hard work of switching between all those fibers, letting each fiber proceed at its own pace once the operation it was waiting for has completed: reading from or writing to a socket, waiting for an item to become available on a queue, etc. To do this, we use the #spin global method provided by Polyphony, which spins up new fibers:
item_processor = spin do
  loop do
    item = item_queue.shift
    process(item)
  end
end
http_server = spin do
  server = TCPServer.new('0.0.0.0', 1234)
  loop do
    socket = server.accept
    # each client runs in its own fiber
    spin { handle_http_client(socket) }
  end
end
Fiber.await(item_processor, http_server)
In the above example, we start a fiber for processing items from a queue, and alongside it an HTTP server. Each of those is implemented using an infinite loop running on a separate fiber. Finally, the main fiber waits for those two fibers to terminate. While the main fiber waits, Polyphony takes care of running the item processor and the HTTP server, with each fiber proceeding at its own pace as items are pushed into the queue, and as incoming HTTP connections are accepted.
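To get a feel for how several infinite loops can share a single thread, here is a toy illustration that uses only Ruby's core Fiber class. It is not how Polyphony is implemented: there is no I/O here, Fiber.yield merely stands in for "waiting for an operation to complete", and the hand-rolled round-robin in interleave plays the part of the scheduler. The names counting_fiber and interleave are invented for this sketch.

```ruby
# Toy illustration using only core Ruby fibers (no Polyphony involved).

# Build a fiber that runs an infinite loop, logging one line per iteration.
def counting_fiber(label, log)
  Fiber.new do
    n = 0
    loop do
      n += 1
      log << "#{label} #{n}"
      Fiber.yield # stand-in for "wait for I/O": hand control back
    end
  end
end

# Hand-rolled round-robin "scheduler": resume each fiber once per round.
def interleave(rounds)
  log = []
  fibers = [counting_fiber('tick', log), counting_fiber('tock', log)]
  rounds.times { fibers.each(&:resume) }
  log
end

puts interleave(3)
```

Neither loop ever finishes; each one simply pauses where it would otherwise wait, and picks up again from that exact point when it is resumed.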
Interrupting an infinite loop
As we saw above, starting an infinite loop on a separate fiber is really easy, but how do you interrupt one? Polyphony provides us with some tools for interrupting fibers at any time. We can do that by scheduling the specific fiber with an exception, which might be a normal exception, or one of the special exceptions that Polyphony provides for controlling fibers.
In order to stop a fiber running an infinite loop, we can call Fiber#stop:
item_processor = spin do
  loop do
    item = item_queue.shift
    process(item)
  end
end
# tell the item_processor to stop
item_processor.stop
# then wait for it to terminate
item_processor.await
Under the hood, Fiber#stop schedules the fiber with a Polyphony::MoveOn exception, which means that the fiber should just terminate at the earliest occasion, without the exception bubbling further up the fiber hierarchy.
As the example above shows, telling a fiber to stop does not mean it will do so immediately. We also need to properly wait for it to terminate, which we do by calling item_processor.await or Fiber.await(item_processor). As discussed above, stopping a fiber is done by scheduling it with a special exception that tells it to terminate. The terminated fiber will then proceed to terminate any child fibers it has, and perform other cleanup. This also means that you can use normal ensure blocks in order to perform cleanup. Let’s rewrite our item processor to process items by sending them in JSON format over a TCP connection:
item_processor = spin do
  socket = TCPSocket.new(PROCESSOR_HOSTNAME, PROCESSOR_PORT)
  loop do
    item = item_queue.shift
    socket.puts item.to_json
  end
ensure
  # runs when the fiber is stopped, so the connection is always closed
  socket.close
end
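The idea of stopping a loop by injecting an exception, with ensure taking care of cleanup, can be tried out with core Ruby alone. The sketch below uses Fiber#raise (available since Ruby 2.7) on a plain fiber; it does not use Polyphony, and StopLoop is a hypothetical exception class that merely plays the role the text above assigns to Polyphony::MoveOn. The run_and_stop method name is also made up for this example.

```ruby
# Hypothetical marker exception, standing in for the role of
# Polyphony::MoveOn as described above.
class StopLoop < StandardError; end

def run_and_stop(items)
  events = []
  worker = Fiber.new do
    loop do
      item = Fiber.yield # suspended until the next item is passed in
      events << "processed #{item}"
    end
  rescue StopLoop
    events << 'stopped' # swallow the exception: just terminate quietly
  ensure
    events << 'cleanup' # runs however the loop ends
  end

  worker.resume                      # run up to the first Fiber.yield
  items.each { |item| worker.resume(item) }
  worker.raise(StopLoop)             # interrupt the suspended fiber
  events << "alive: #{worker.alive?}"
  events
end

puts run_and_stop([1, 2])
```

The exception surfaces at the point where the fiber was suspended, unwinds the infinite loop, and the ensure block runs on the way out, which is the same shape as the item processor above.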
More ways to stop a fiber
In addition to the Fiber#stop method, Polyphony has other APIs that can be used to stop a fiber in a variety of ways, including by raising an exception in the fiber’s context, and gracefully terminating a fiber. Terminating a fiber with an exception is done using Fiber#raise. This is especially useful when you need to implement your own error states:
my_fiber.raise(FooError, 'bar')
A graceful termination can be done using Fiber#terminate, which takes an optional boolean flag. This requires a bit more logic in the fiber itself:
item_processor = spin do
  socket = TCPSocket.new(PROCESSOR_HOSTNAME, PROCESSOR_PORT)
  loop do
    item = item_queue.shift
    socket.puts item.to_json
  end
ensure
  if Fiber.current.graceful_shutdown?
    # give in-flight items up to 10 seconds to complete
    move_on_after(10) do
      wait_for_inflight_items
    end
  end
  socket.close
end
# terminate gracefully
item_processor.terminate(true)
In the example above, we added logic in the ensure block that waits up to 10 seconds for all inflight items to be processed, then proceeds with closing the TCP socket.
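The "wait, but only up to a deadline" step can be approximated in plain Ruby with the standard library's Timeout module. This is only an analogy under stated assumptions: Timeout.timeout raises Timeout::Error when the deadline passes, so the sketch rescues it in order to carry on, whereas in the example above execution simply continues after the move_on_after block. The drain_with_deadline method and the polling on pending.empty? are invented for this illustration.

```ruby
require 'timeout'

# Wait until `pending` is empty, but give up after `seconds`.
# Returns a symbol saying which of the two happened.
def drain_with_deadline(pending, seconds)
  Timeout.timeout(seconds) do
    sleep 0.01 until pending.empty? # poll until nothing is in flight
  end
  :drained
rescue Timeout::Error
  :gave_up
end

p drain_with_deadline([], 1)           # nothing in flight
p drain_with_deadline([:stuck], 0.05)  # never drains, so the deadline fires
```

Either way the caller proceeds to its cleanup, which is the point of bounding the wait in the first place.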
(We’ll take a closer look at exception handling and fiber termination in a future article.)
Polyphony 💛 Loops
Polyphony not only makes it easy to start and stop concurrent infinite loops, but it also further embraces loops by providing a bunch of loop APIs, including:
- #spin_loop - used for spinning up fibers that just run a loop. That way you can be even more concise when expressing infinite loops:
  item_processor = spin_loop { process(queue.shift) }
- IO#read_loop / IO#recv_loop - used for reading from an IO instance (and provides even better performance, since it reads in a tighter loop):
  connection.read_loop { |data| process(data) }
- IO#feed_loop - used for feeding data read from an IO instance into a parser. Take for example a connection that uses MessagePack to pass messages:
  require 'msgpack'
  unpacker = MessagePack::Unpacker.new
  # #feed_loop takes a receiver and the method to call on each chunk of data
  connection.feed_loop(unpacker, :feed_each) { |msg| process(msg) }
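As a rough picture of what the read and feed loops in the list above do, here is a plain-Ruby approximation built on IO#readpartial. It is a sketch, not Polyphony's implementation: read_chunks, feed_chunks and collect_lines are hypothetical helpers written for this example, and LineSplitter is a made-up receiver that stands in for a real parser such as a MessagePack unpacker.

```ruby
# Yield each chunk read from io until EOF
# (a rough analogue of a read loop).
def read_chunks(io, maxlen = 4096)
  loop { yield io.readpartial(maxlen) }
rescue EOFError
  nil
end

# Pass each chunk to receiver.method, forwarding the caller's block
# (a rough analogue of a feed loop).
def feed_chunks(io, receiver, method, &block)
  read_chunks(io) { |chunk| receiver.__send__(method, chunk, &block) }
end

# Hypothetical receiver: buffers input and yields complete lines.
class LineSplitter
  def initialize
    @buffer = +''
  end

  def feed_each(data)
    @buffer << data
    while (line = @buffer.slice!(/\A.*\n/))
      yield line.chomp
    end
  end
end

# Write the given pieces into a pipe, then read them back line by line.
def collect_lines(pieces)
  reader, writer = IO.pipe
  pieces.each { |piece| writer.write(piece) }
  writer.close
  lines = []
  feed_chunks(reader, LineSplitter.new, :feed_each) { |line| lines << line }
  reader.close
  lines
end

p collect_lines(["one\ntwo\nthr", "ee\n"])
```

The receiver keeps whatever partial message it has between chunks, so the caller's block only ever sees complete messages, regardless of how the data happened to be split on the wire.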
In the future Polyphony will include even more #xxx_loop APIs that will provide more concise ways to express loops along with better performance.
Polyphony is just plain Ruby
Looking at all the above examples, you will have noticed how the Polyphony API really looks baked into the Ruby language, as if it were part of the Ruby core. One of my principal design goals was to minimize boilerplate code when expressing concurrent operations. There’s no instantiating of special objects, no weird mechanisms for controlling fibers or rescuing exceptions. It just looks like plain Ruby! This makes it easier to both write and read concurrent code.
Conclusion
In this article I’ve shown you how infinite loops can be used to express long-running concurrent tasks using Polyphony. Polyphony provides all the tools needed for controlling the execution of concurrent fibers. For more information about Polyphony you can go to the Polyphony website. You can also browse the examples in the Polyphony repository.