Fandom Developers Wiki
Advertisement
Fandom Developers Wiki

This blogpost is a continuation of the previous post: Coroutines Flow, and goes deeper into coroutine streams. In this article, I’m going to explore hot streams called Channel. What is it? What does it do? How does it work? How to create and handle it? How does it compare with the cold streams? And finally, is there a place for both hot and cold streams? Let’s find out!

Transfering values

Transferring a single value between coroutines can be done by the Deferred object (the result of async job). But when you need to transfer a stream of values between multiple coroutines, there could be a place for channels. Channel easily allows to transfer a stream of values (very similar to BlockingQueue) without blocking a thread, by using suspending send and receive functions (or non suspending offer and poll). When a Channel is closed (by calling the close function) no more elements are coming in. To iterate a receiver through a stream of elements, a standard loop can be used.

fun sendAndReceive() = runBlocking {
    // create a channel
    // Channel class implements ReceiveChannel and SendChannel interfaces
    val channel = Channel<Int>()

    // run a coroutine
    launch {
        repeat(3) {
            // send a value to a channel
            channel.send(it) // it's suspend so wait here
        }
    }

    // 3 values have already sent to a channel, so they can be received

    repeat(3) {
        // receive a value from a channel
        val value = channel.receive() // it's suspend so wait here
    }
}

fun iterateChannel() = runBlocking {
    val channel = Channel<Int>()

    launch {
        repeat(3) {
            channel.send(it)
        }
        channel.close() // stop sending as soon as close token is received
    }

    // the channel was closed so a loop can iterate through finite number of values
    for (value in channel) {
        // do something with a value - 3 times
    }
    // without closing the channel, loop will never end
}

Producer-Consumer

The pattern where an object produces some values for another object which consumes those values is called producer-consumer. This pattern can be found quite often in concurrent code. Channels make an implementation of this pattern easy, by providing a special builder: produce (for producer side) and extension functions: consume, consumeEach (for consumer side).

fun producerConsumer() = runBlocking {
    // produce builder create a ReceiveChannel
    val producer: ReceiveChannel<Int> = produce {
        repeat(3) {
            send(it)
        }
    }
    
    // consumer
    producer.consumeEach { 
        // do something with a value - 3 times
    }
}

Pipeline

The pattern where an object produces an almost infinite stream of values and another object consumes, processes that stream and produces some result is called a pipeline. For coroutines, implementation of the pipeline pattern can be accomplished by channels.

// produce infinite (in terms of memory) stream of numbers
fun CoroutineScope.produceNumbers(
    startValue: Int = 1
) = produce<Int> {
    var counter = startValue
    while (true) {
        delay(25) // some time consuming work
        send(counter)
        counter++
    }
}

// consume, process infinite stream and resend it
fun CoroutineScope.primeNumbersFilter(
    numbers: ReceiveChannel<Int>, prime: Int
) = produce<Int> {
    for (number in numbers) {
        // filter the whole stream numbers divisible by prime
        if (number % prime != 0) {
            send(number)
        }
    }
}

fun pipeline() = runBlocking {
    // 1, 2, 3, 4, 5 ... etc to infinity
    var numbers = produceNumbers(startValue = 2)

    repeat(5) {
        val prime = numbers.receive()
        numbers = primeNumbersFilter(numbers, prime)
    }
    // filtered stream result:
    // prime = 2, filter: 3, 5, 7, 9, 11, 13 ...
    // prime = 3, filter: 5, 7, 11, 13, 17, 19 ...
    // prime = 5, filter: 7, 11, 13, 17, 19, 21 ...
    // prime = 7, filter: 11, 13, 17, 19, 23, 29 ...
    // prime = 11, filter: 13, 17, 19, 23, 29, 31 ...

    // cancel the whole coroutine to make finish possible
    coroutineContext.cancelChildren()
}

Fan

When talking about channels, it's impossible not to mention the concept of  fan. Channels don't have to be bounded as one producer to one consumer. Multiple coroutines can receive from the same channel (fan-out) and also multiple coroutines can send to the same channel (fan-in).

fun fanOut() = runBlocking<Unit> {
    // stream of numbers with 25ms delay
    val channel = produceNumbers()

    val coroutineA = launch {
        for (value in channel) {
            // receive and consume a value
            // coroutineB received: 1, 3, 5
        }
    }

    val coroutineB = launch {
        for (value in channel) {
            // receive and consume a value
            // coroutineB received: 2, 4
        }
    }

    // allow to receive five values
    delay(145)

    // cancel a producer coroutine, close its channel and stop iteration over the channel
    channel.cancel()
}

fun fanIn() = runBlocking<Unit> {
    val channel = Channel<String>()

    val coroutineA = launch {
        while (true) {
            channel.send("coroutineA")
            delay(10)
        }
    }

    val coroutineB = launch {
        while (true) {
            channel.send("coroutineB")
            delay(25)
        }
    }

    // receive some values from any sender
    repeat(6) {
        val value = channel.receive()

        // the result is:
        // 10ms - coroutineA
        // 20ms - coroutineA
        // 25ms - coroutineB
        // 30ms - coroutineA
        // 40ms - coroutineA
        // 50ms - coroutineB
    }

    coroutineContext.cancelChildren()
}

It's worth to note that channels respect the order of send and receive invocation. They follow FIFO (first in, first out) order, so the first coroutine which calls receive, gets the value.

fun fifoOrder() = runBlocking<Unit> {
    val channel = Channel<String>()

    val coroutineA = launch {
        while (true) {
            var value = channel.receive()
            value = "$value A"
            delay(10)
            channel.send(value)
        }
    }

    val coroutineB = launch {
        while (true) {
            var value = channel.receive()
            value = "$value B"
            delay(10)
            channel.send(value)
        }
    }

    channel.send("start")
    delay(50)
    val result = channel.receive()
    // result: start A B A B A

    // coroutineA was called first, so received element first
    // coroutineB was waiting, so received the second element first
    // even if coroutineA was also to allowed to do that immediately

    coroutineContext.cancelChildren()
}

Buffer

Channels as default have no buffer, which means an element is transferred when a sender and a receiver meet each other (just like a rendezvous). So, if send or receive is invoked first, then it's suspended until the opposite one is invoked. But actually, there are no objections to applying some buffering into channels. The easiest way to do that is simply pass the capacity argument to Channel factory or builder. Capacity can be a number - which means buffer size, or one of the const values like: RENDEZVOUS, UNLIMITED, CONFLATED, BUFFERED. A buffered channel allows you to send more elements before suspending.

fun differentChannels() = runBlocking<Unit> {
    // capacity = 1, RENDEZVOUS set as default, so they are the same
    val unbuffered = Channel<Unit>()
    val rendezvous = Channel<Unit>(RENDEZVOUS)

    // capacity = 2
    val explicitCapacity = Channel<Unit>(2)

    // buffers the most recent element
    val conflated = Channel<Unit>(CONFLATED)

    // unlimited buffer, in practice buffer as many as possible
    val unlimited = Channel<Unit>(UNLIMITED)

    // buffers with default capacity = 64
    val buffered = Channel<Unit>(BUFFERED)
}

fun unbufferedChannel() = runBlocking<Unit> {
    val channel = Channel<Int>()

    launch {
        repeat(4) {
            delay(10)
            channel.send(it)
            // a sender is faster than a receiver
            // so wait for the receiver to send new one
        }
    }

    repeat(4) {
        delay(25)
        val value = channel.receive()
    }

    // 10ms - send 0, 25ms receive 0
    // 35ms - send 1, 50ms receive 1
    // 60ms - send 2, 75ms receive 2
    // 85ms - send 3, 100ms receive 3

    coroutineContext.cancelChildren()
}

fun bufferedChannel() = runBlocking<Unit> {
    val channel = Channel<Int>(3)

    launch {
        repeat(4) {
            delay(10)
            channel.send(it)
            // a sender is faster than a receiver
            // so wait for the receiver to send new one
        }
    }

    repeat(4) {
        delay(25)
        val value = channel.receive()
    }

    // 10ms - send 0, 20ms - send 1
    // 25ms - receive 0
    // 30ms - send 2, 40ms - send 3
    // 50ms - receive 1
    // 75ms - receive 2
    // 100ms - receive 3

    coroutineContext.cancelChildren()
}

Hot and cold

It's not difficult to notice that Channel looks similar to Flow. So do we need both of them? What is the difference and when to use Channel or Flow? To start with, we should shed some light on hot and cold streams definitions. To see the differences between them, consider and answer for these questions: where and when values are produced, how many receivers simultaneously can read the values? 

Generally in cold streams, the elements are produced inside the stream. A cold stream has a single subscriber, which is responsible for initialization of stream emission. Every new subscriber triggers a new independent instance of stream execution. Cold streams start lazy which means the emission starts when subscription is called. For cold streams neither synchronization nor concurrency is needed, because they are sequential, non-blocking. Flows are cold!

On the other hand, in hot streams, elements are produced outside the stream, so they can come from external sources, and can live without a stream. A hot stream has zero or many subscribers, and emits elements to all of them at the same time. The same emission (and the only one) exists for each subscriber. Hot streams start immediately, it doesn't matter if any subscriber is attached. For hot streams synchronization is needed because different objects send and consume values. Channels are hot!

Fortunately, there is a way to convert Channel into Flow whenever you need it. There are some extension functions like consumeAsFlow or receiveAsFlow to help you with that.

fun consumeAsFlow() = runBlocking<Unit> {
    val channel = Channel<Int>(3)
    
    repeat(3) {
        channel.send(it)
    }
    
    channel
        .consumeAsFlow()
        .onEach {
            // consume a value from channel like you do from a flow
        }
        .launchIn(this)

    
    delay(10) // allow to start
    coroutineContext.cancelChildren()
}

Broadcast

Values sent in standard Channels can be consumed only by one receiver. After consumption, a value is removed and is not available for another receiver. If you want to allow multiple receivers to consume the same value, just create a channel by BroadcastChannel (ConflatedBroadcastChannel for single value capacity) factory and subscribe to it by openSubscription or convert it using asFlow. Buffer gets the value and helps synchronizing a sender and receivers.

fun broadcastChannel() = runBlocking {
    val broadcast = BroadcastChannel<Int>(3)
    val subscriberA = broadcast.openSubscription()
    val subscriberB = broadcast.openSubscription()
    
    repeat(3) {
        broadcast.send(it)
    }
    
    repeat(3) {
        subscriberA.receive()
        // gets: 0, 1, 2
    }
    repeat(3) {
        subscriberB.receive()
        // gets: 0, 1, 2
    }
   
    delay(100) // allow to start

    // cancel all subscribers - ReceiveChannel
    broadcast.cancel()
}

Hot Flow

If you are wondering whether Channels are the only way to use hot streams in coroutines - the answer is no! SharedFlow is an interface that extends Flow (so it’s a regular Flow and can behave like Flow). It's a kind of a representation of hot Flow, which means it exists independently of the collectors’ lifecycles. SharedFlow shares emitted values for all collectors in a broadcast way, so each collector gets all emitted values (taking into account the size of the buffer). To create SharedFlow use MutableSharedFlow builder and pass some optional parameters, e.g. replay.

fun sharedFlow() = runBlocking {
    val mutableSharedFlow = MutableSharedFlow<Int>(replay = 2) // kind of buffer
    val subscriberA = mutableSharedFlow.asSharedFlow() // SharedFlow
    val subscriberB = mutableSharedFlow.asSharedFlow()
    val subscriberC = mutableSharedFlow.asSharedFlow()

    subscriberA
        .takeWhile { it != 100 } // never completes so cancel it at some point
        .onEach {
            // gets: 1, 2
        }
        .launchIn(this)

    subscriberB
        .takeWhile { it != 200 }
        .onEach {
            // gets: 1, 2, 100, 3
        }
        .launchIn(this)

    mutableSharedFlow.tryEmit(1) // not suspending variant of emit
    mutableSharedFlow.emit(2)
    mutableSharedFlow.emit(100)
    mutableSharedFlow.emit(3)
    mutableSharedFlow.emit(200)

    subscriberC
        .takeWhile { it != 400 }
        .onEach {
            // gets: 3, 200, 4
            // 3 and 200 because of replay number
        }
        .launchIn(this)

    mutableSharedFlow.emit(4)
    mutableSharedFlow.emit(300)
}

On the other hand, there is also StateFlow that extends SharedFlow. So it has all features of StateFlow, but SharedFlow represents a read-only state with single updatable value. Just use MutableStateFlow builder to create it.

fun stateFlow() = runBlocking {
    val mutableStateFlow = MutableStateFlow(0) // initial value
    val subscriberA = mutableStateFlow.asStateFlow() // StateFlow
    val subscriberB = mutableStateFlow.asStateFlow()

    subscriberA
        .takeWhile { it != 100 }
        .onEach {
            // gets 0, 1
        }
        .launchIn(this)

    subscriberB
        .takeWhile { it != 100 }
        .onEach {
            // gets 0, 1
        }
        .launchIn(this)

    mutableStateFlow.emit(1)
    mutableStateFlow.value = 100
}

SharedFlow and StateFlow are conceptually similar to BroadcastChannel and ConflatedBroadcastChannel. However, their API is simpler and faster to use, they have read-only and mutable state separation and also cannot be closed or represent a failure. Actually, the BroadcastChannel family is obsolete and is going to be replaced by the SharedFlow family when it becomes stable.

Conclusion

In this blogpost, I have attempted to cover more details about Coroutines streams by introducing the hot streams - called Channels. I have presented how to create and manage them in cooperation with coroutines. Also, I have shed some light on the differences between the cold and hot streams, and have suggested how and when to choose between them. You should know from now on that there is a place in concurrent code for both. As you could have noticed, some parts of the Channel API are obsolete and are going to be replaced by Flow API. In fact, Coroutines are still under development so we can expect more changes in the future. This brings us to conclude that it’s always good to know what is under the hood and be familiar with the definitions of some concepts. But the most important thing is to know how to solve problems in the best possible way. So be mindful before making a decision, choose wisely and be aware of future changes.

Advertisement