Cold flows, hot channels

 3 years ago
source link: https://www.tuicool.com/articles/2yMJfeN

Markus Trienke, Sunset over drift ice

Asynchronous, long-running, or remote operations can be expressed using a future type, so a function returning a Value could be implemented as:

fun fooAsync(p: Params): CompletableFuture<Value> =
    CompletableFuture.supplyAsync { bar(p) }

When you call fooAsync(p) you get a promise to deliver a value in the future, and an operation bar is running in the background to compute this value. Now you have to be careful not to lose the reference to this future, because the future is effectively a resource, like an open file. You must wait for it or cancel it if its value is no longer needed¹.

This is known as a hot data source. Unlike a regular function, which is active only during a call to it, a hot source is active even outside of the call to the corresponding function: it might have been active in the background before the function was even called, and it can still be active after the function has returned, as we see here.
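
To see the "hot" behaviour concretely, here is a minimal sketch. The Int parameter, the body of bar, and the barStarted latch are hypothetical stand-ins for the article's Params, Value and bar; the latch only exists so that we can observe the background work.

```kotlin
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

// Hypothetical helper: lets us observe that bar started running.
val barStarted = CountDownLatch(1)

// Hypothetical stand-in for the article's bar(p).
fun bar(p: Int): Int {
    barStarted.countDown() // signal that the background work has begun
    return p * 2
}

fun fooAsync(p: Int): CompletableFuture<Int> =
    CompletableFuture.supplyAsync { bar(p) }

fun main() {
    fooAsync(21) // the returned future is dropped on purpose
    // bar is still scheduled and runs in the background, although nobody holds the future.
    val ran = barStarted.await(5, TimeUnit.SECONDS)
    println("bar ran without a consumer: $ran")
}
```

Dropping the future does not stop the work. The caller has only lost the means to wait for it or cancel it.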

Suspending functions

The Kotlin programming language provides support for suspending functions. The idiomatic way in Kotlin to represent this asynchronous operation and to avoid all the pitfalls of programming with hot futures is:

suspend fun foo(p: Params): Value =
    withContext(Dispatchers.Default) { bar(p) }

A caller of foo gets suspended while the bar operation is in progress. There is no need to worry about accidentally losing a reference to a working background operation, and code written using suspending functions looks familiar, like regular, blocking, synchronous code. This foo function definition is cold: it is not doing anything before it is called and it will not do anything after returning a value.
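
A call site for such a function might look like the sketch below. The Int types and the body of bar are hypothetical stand-ins for Params, Value and bar, and runBlocking is used only to bridge from a plain main function into a coroutine.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the article's bar(p).
fun bar(p: Int): Int = p * 2

suspend fun foo(p: Int): Int =
    withContext(Dispatchers.Default) { bar(p) }

fun main() = runBlocking {
    val value = foo(21) // suspends here until bar completes; there is no future to keep track of
    println(value)      // prints 42
}
```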

Collection of values

What if an operation returns a collection of values? We can use a List type, built here with buildList²:

suspend fun foo(p: Params): List<Value> =
    buildList { while (hasMore) add(nextValue) }

This signature perfectly captures the API of a REST endpoint that returns a JSON array of values, or a similar kind of RPC/RMI endpoint. It suspends its caller while the operation is in progress and returns the whole list at once.

Stream of values

But what if we are trying to represent a streaming API? That is, one where values arrive one by one, for example, as websocket messages, as a stream of gRPC messages, or via another streaming protocol like RSocket.

For synchronous streams Kotlin provides a Sequence data type:

fun foo(p: Params): Sequence<Value> =
    sequence { while (hasMore) yield(nextValue) }

However, if you use a sequence as a return type to represent a streaming API, then waiting for incoming values must block the caller's thread. This is not good for UI apps and not good for scalable server-side code. For asynchronous programming we want to suspend a coroutine instead³.
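
The blocking can be made visible with a small sketch. The count parameter and the Thread.sleep call are assumptions that simulate waiting for a value to arrive. The sequence builder restricts suspension, so a suspending wait such as delay cannot be called inside it.

```kotlin
// Hypothetical blocking source: each value "arrives" after a blocking wait.
fun foo(count: Int): Sequence<Int> = sequence {
    for (i in 1..count) {
        Thread.sleep(10) // blocks whichever thread is iterating the sequence
        yield(i)
    }
}

fun main() {
    val values = foo(3)      // nothing runs yet: a sequence is lazy
    println(values.toList()) // prints [1, 2, 3], blocking the calling thread while it waits
}
```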

Hot channels

We can use the ReceiveChannel type from the kotlinx.coroutines library to represent an asynchronous stream of values:

fun fooProducer(p: Params): ReceiveChannel<Value> =
    GlobalScope.produce { while (hasMore) send(nextValue) }

However, we run into the same problem as with futures. The channel represents a hot stream of values. There is a coroutine on the other side of the channel that is working to produce the values, so we cannot just drop a reference to the ReceiveChannel, because the producer would be suspended forever waiting for a consumer, wasting memory, open network connections, and other resources.

Structured concurrency⁴ somewhat alleviates the problem. Observe that fooProducer launches a coroutine that works concurrently with the rest of the code. We can make this concurrency explicit⁵ by declaring the fooProducer function as an extension on CoroutineScope:

fun CoroutineScope.fooProducer(p: Params): ReceiveChannel<Value> =
    produce { while (hasMore) send(nextValue) }

However, it does not solve the problem completely — it just changes the effect of our bugs. Without structured concurrency lost channels are like lost futures — they produce silent resource leaks. With structured concurrency lost channels prevent completion of the outer coroutine scope, effectively “hanging” ongoing operations. The latter is a more obvious effect to notice during testing, but it is bad anyway. We still cannot write something like this:

val values: ReceiveChannel<Value> = fooProducer(p)
if (someCondition) return anotherResult // Oops! Leaked values
// ... do further work with values ...

All in all, working with channels is not as simple as working with single values using suspending functions or working with a synchronous Sequence of values, and it involves subtle problems and conventions due to concurrency⁶.
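
One such convention is to cancel the channel explicitly on every early exit. The sketch below assumes a recent kotlinx.coroutines version. The function sumOrEarlyExit, the Int element type and the limit parameter are hypothetical and exist only for illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.runBlocking

// produce is marked experimental in recent kotlinx.coroutines versions, hence the opt-in.
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.fooProducer(limit: Int): ReceiveChannel<Int> =
    produce { for (i in 1..limit) send(i) }

// Hypothetical consumer with an early exit that does not leak the producer.
suspend fun sumOrEarlyExit(someCondition: Boolean): Int = coroutineScope {
    val values = fooProducer(10)
    if (someCondition) {
        values.cancel() // cancels the producer; otherwise coroutineScope would wait for it forever
        return@coroutineScope -1
    }
    var sum = 0
    for (v in values) sum += v // iterates until the producer finishes and the channel closes
    sum
}

fun main() = runBlocking {
    println(sumOrEarlyExit(someCondition = true))  // prints -1 and completes instead of hanging
    println(sumOrEarlyExit(someCondition = false)) // prints 55
}
```

The fix works, but it relies on the caller remembering to cancel on every exit path. That is the kind of convention the text refers to.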

Channels are a great fit to model data sources that are intrinsically hot, that is, data sources that exist without the application's requests for them: incoming network connections, event streams, etc.

Channels, just like futures, are synchronization primitives. You should use a channel when you need to send data from one coroutine to another coroutine in the same or in a different process, because different coroutines are concurrent and you need synchronization to work with any data in the presence of concurrency⁷. However, synchronization always comes at a performance cost.
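
As an illustration of that legitimate use, here is a minimal sketch of two coroutines in the same process communicating over a channel. The function sumViaChannel and its parameter are hypothetical names chosen for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical example: one coroutine sends numbers, another receives and sums them.
fun sumViaChannel(n: Int): Int = runBlocking {
    val channel = Channel<Int>() // rendezvous channel: send suspends until a receiver is ready
    launch {
        for (i in 1..n) channel.send(i)
        channel.close() // tells the receiver that no more values will arrive
    }
    var sum = 0
    for (v in channel) sum += v // each hand-over synchronizes the two coroutines
    sum
}

fun main() {
    println(sumViaChannel(4)) // prints 10
}
```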

Cold flows

But what if we need neither concurrency nor synchronization, just non-blocking streams of data? We did not have a type for that until recently, so welcome the Kotlin Flow type, which is available for preview starting from kotlinx.coroutines version 1.2.0-alpha-2:

fun foo(p: Params): Flow<Value> =
    flow { while (hasMore) emit(nextValue) }

Just like a sequence, a flow represents a cold stream of values. The caller of foo gets a reference to the flow instance, but the code inside the flow { ... } builder is not active and no resources are bound to it yet. Similar to sequences, flows can be transformed using various common operators like map, filter, etc. Unlike a sequence, a flow is asynchronous and allows suspending functions anywhere in its builder and operators. For example, the following code defines a flow of ten integers with a 100 ms delay before each of them:

val ints: Flow<Int> = flow {
    for (i in 1..10) {
        delay(100)
        emit(i)
    }
}

Terminal operators on a flow collect all values emitted by the flow, activating the flow code only for the duration of the corresponding operation. This is what makes the flow cold: it is not active before the call to a terminal operation and not active after it, releasing all resources before returning from the call. The most basic terminal operation is called collect. It is a suspending function that suspends the calling coroutine while the flow is being collected:

ints.collect { println(it) } // takes 1 second, prints 10 ints
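
The cold behaviour can be observed directly. In the sketch below, the starts counter and the evenSquares flow are hypothetical additions, the delay is shortened to 10 ms, and toList is used as the terminal operator so that the result is easy to print.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical counter: how many times the flow body has been activated.
var starts = 0

val ints: Flow<Int> = flow {
    starts++
    for (i in 1..10) {
        delay(10)
        emit(i)
    }
}

// Operators only describe a new cold flow; nothing runs at this point.
val evenSquares: Flow<Int> = ints.filter { it % 2 == 0 }.map { it * it }

fun main() = runBlocking {
    println(starts)               // prints 0: defining the flows did not run any code
    println(evenSquares.toList()) // prints [4, 16, 36, 64, 100]
    println(evenSquares.toList()) // the flow body runs again for the second terminal call
    println(starts)               // prints 2
}
```

Each terminal call activates the builder code from scratch, and nothing keeps running between the calls.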

Conclusion

Flow is in preview and your feedback is welcome. We can still tweak the API and the implementation⁹. Unlike channels, flows do not inherently involve any concurrency. They are non-blocking, yet sequential. The goal of flows is to become for asynchronous data streams what suspending functions are for asynchronous operations: convenient, safe, easy to learn and easy to use.
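
The "non-blocking, yet sequential" property can be checked with a small trace. This is a sketch that assumes a recent kotlinx.coroutines version (1.6 or later, where collect accepts a lambda without an extra import); the trace function and its log are hypothetical.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking

// Hypothetical helper: records the order in which emitter and collector code run.
fun trace(): List<String> = runBlocking {
    val log = mutableListOf<String>()
    val numbers = flow {
        for (i in 1..2) {
            log += "emit $i"
            emit(i) // runs the collector's block for this value before continuing
        }
    }
    numbers.collect { log += "collect $it" }
    log
}

fun main() {
    println(trace()) // prints [emit 1, collect 1, emit 2, collect 2]
}
```

The emitter and the collector take turns in a single coroutine, so the shared log needs no synchronization.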

It is too early to discuss the performance of flows, since no performance optimizations have been applied to the code yet, but they promise to considerably exceed the performance of synchronization primitives. I’ll explore these aspects in future updates. Stay tuned¹⁰.

