[Part 2] Testing Coroutines and Kotlin Flows

In Part 1 we looked at how to create a testing setup for Kotlin Coroutines. In this part, we will build on that base and look at how we can test Kotlin Flows.

Here's the link to Part 1 if you missed it: blog.shounakmulay.dev/part-1-testing-corout..

At its base, flow is a type that can return multiple values sequentially. If you are not familiar with flows, you can read more about them here.

Starter project

In Part 1 we built on top of the starter project and created a coroutines testing setup. If you want to code along with the article, you can clone the repo from https://github.com/shounakmulay/KotlinFlowTest and check out the part2-start branch.

Open the project in Android Studio. The branch part2-start has all the code implemented in Part 1 as well.

Even though we are looking at an Android project, these concepts will apply to any Kotlin based project.

Testing a simple Flow

A flow that only emits a few values and has no further operators applied to it is the easiest to test.

In the MainViewModel we have a countFlow that points to a flow returned by the repository. This is a simple flow that counts from 1 to 3.

val countFlow = mainRepository.count3Flow()

In MainViewModelTest we have mocked the response of count3Flow with getCount3Flow. Since countFlow is a val on the view model, we need to mock the response before we initialize the MainViewModel. Otherwise the value is read before the mock is registered, which leads to a NullPointerException.

The setup function now looks like this:

@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(mainRepository, coroutineScope.dispatcherProvider)
}

And the getCount3Flow function just emits 3 numbers from the flow builder.

private fun getCount3Flow() = flow {
    (1..3).forEach {
        emit(it)
    }
}
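
The ordering requirement in setUp comes from how Kotlin initializes properties: a val with an initializer is evaluated once, while the constructor runs. Here is a minimal sketch of that behavior without Mockito, assuming kotlinx-coroutines-core is on the classpath. CountRepository, SwappableRepository and CountViewModel are hypothetical stand-ins, not classes from the project.

```kotlin
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for MainRepository.
interface CountRepository {
    fun count3Flow(): Flow<Int>
}

// Hypothetical fake whose behavior can be changed after creation.
class SwappableRepository(var source: () -> Flow<Int>) : CountRepository {
    override fun count3Flow(): Flow<Int> = source()
}

// Hypothetical stand-in for MainViewModel.
class CountViewModel(repository: CountRepository) {
    // This initializer runs once, while the constructor executes.
    val countFlow: Flow<Int> = repository.count3Flow()
}

fun valuesSeenByViewModel(): List<Int> = runBlocking {
    val repository = SwappableRepository { flowOf(1, 2, 3) }
    val viewModel = CountViewModel(repository)
    // Changing the stub now is too late: countFlow was already captured.
    repository.source = { flowOf(7, 8, 9) }
    viewModel.countFlow.toList()
}

fun main() {
    println(valuesSeenByViewModel()) // prints [1, 2, 3]
}
```

The view model keeps whatever the repository returned at construction time, which is why the stub has to be in place before MainViewModel is created.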

The easiest way to test a flow is to convert it to a list. This internally collects the flow and returns all the collected values as a list. Let's run the test named Given no error occurs, When count3Flow is called, Then it should emit all values correctly.

@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() = runBlocking {

    val countFlow = mainViewModel.countFlow

    assertEquals(listOf(1, 2, 3), countFlow.toList())
}

It passes!

Testing Complex Flows

Often we need to move heavy computation off a particular thread, typically the main thread.

To do that with flows, we use flowOn.

  • Let's modify the flow in the MainViewModel to use the Default dispatcher. Notice that we did not use the dispatcher provider that we created in Part 1 here.
val countFlow = mainRepository
        .count3Flow()
        .flowOn(Dispatchers.Default)
  • In the MainViewModel add one more flow that maps over the countFlow and doubles each number. Let's use the IO dispatcher for this. Let's also use a delay to simulate a long-running operation, or to represent a flow that emits values at a certain frequency.
val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(Dispatchers.IO)
  • Now let's zip these two flows into a flow that returns count-to-double pairs. The zip operator only emits a value once each flow has emitted its next value.

Let's look at this with an example. Suppose countFlow has emitted two values, 1 and 2, while doubleCountFlow has emitted only one value so far, which is 2. In this case our new countWithDoubleFlow will emit only once, with the value (1, 2). As soon as doubleCountFlow emits its second value, which is 4, countWithDoubleFlow has the second value from both flows and emits its own second value: (2, 4).

Let's use the Default dispatcher for this zipped flow as well.

val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(Dispatchers.Default)
  • Also update the test to use the new combined flow.
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
  runBlocking {

      val countFlow = mainViewModel.countWithDoubleFlow

      assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
  }
  • Now if we run the test, it will pass, but it takes more than 6 seconds to run because each of the three values waits for the 2-second delay.

  • That is a very long time for such a simple test.

    As we saw in Part 1, we can use runBlockingTest to skip past delays and make the test run as fast as possible. Let's use that:

@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    runBlockingTest {

        val countFlow = mainViewModel.countWithDoubleFlow

        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
  • Running the test now fails with the error "This job has not completed yet". The reason is that runBlockingTest skips delays by advancing virtual time on its test dispatcher. Since we have not injected the dispatchers, our flows still run on the real Default and IO dispatchers, and time can only be advanced on a test dispatcher.

  • Replace the dispatchers with the dispatchers from the CoroutineDispatcherProvider for all of the flows:
val countFlow = mainRepository
    .count3Flow()
    .flowOn(dispatcherProvider.default)

val doubleCountFlow = countFlow.map {
    delay(2000)
    it * 2
}.flowOn(dispatcherProvider.io)

val countWithDoubleFlow = countFlow.zip(doubleCountFlow) { count, double ->
    count to double
}.flowOn(dispatcherProvider.default)
  • We also need to tell the runBlockingTest to use the test dispatcher that we are injecting in tests. To do that first make the test dispatcher from CoroutineScopeRule public.
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider()
): TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
  • Then replace the runBlockingTest with coroutineScope.dispatcher.runBlockingTest so that we are using the correct dispatcher with runBlockingTest
@Test
fun `Given no error occurs, When count3Flow is called, Then it should emit all values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {

        val countFlow = mainViewModel.countWithDoubleFlow

        assertEquals(listOf(1 to 2, 2 to 4, 3 to 6), countFlow.toList())
    }
  • Run the test and you will see that it passes and also takes much less time to execute: from approximately 6.7 seconds before to only around 600-700 milliseconds.
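
To see the pairing behavior of zip from earlier in this section in isolation, here is a small sketch that runs outside the view model. It assumes kotlinx-coroutines-core is on the classpath. The function names are made up for illustration, and the delay is shortened so the example finishes quickly.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking

// Pairs each count with its double, like countWithDoubleFlow does.
fun zipCountWithDouble(): List<Pair<Int, Int>> = runBlocking {
    val counts = flowOf(1, 2, 3)
    val doubles = counts.map {
        delay(20) // stands in for the 2000 ms delay in the article
        it * 2
    }
    // zip waits for the slower flow before emitting each pair.
    counts.zip(doubles) { count, double -> count to double }.toList()
}

// zip completes as soon as either flow completes.
fun zipStopsWithShorterFlow(): List<Pair<Int, String>> = runBlocking {
    flowOf(1, 2, 3).zip(flowOf("a", "b")) { number, letter -> number to letter }.toList()
}

fun main() {
    println(zipCountWithDouble())      // prints [(1, 2), (2, 4), (3, 6)]
    println(zipStopsWithShorterFlow()) // prints [(1, a), (2, b)]
}
```

The second function shows a detail worth keeping in mind when asserting on zipped flows: the result is only as long as the shorter of the two flows.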

Cold vs Hot Flows

Cold flows are flows that do not produce values until they have at least 1 collector registered. As soon as the last collector leaves, cold flows stop producing.

Hot flows on the other hand do not wait for a collector to be registered and start producing right away. Also, hot flows do not stop producing once all the collectors leave.

Another major difference is that hot flows never complete. A call to collect on a hot flow therefore never returns normally and stays suspended until the collecting coroutine is cancelled. Likewise, functions like toList will never complete when called on a hot flow.
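
For contrast, here is a minimal sketch of the cold side, assuming kotlinx-coroutines-core is on the classpath. The function name is made up for illustration. It counts how often the flow body starts: nothing runs before collection, and each call to toList triggers a fresh run that completes on its own.

```kotlin
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

fun coldFlowStartCounts(): List<Int> = runBlocking {
    var starts = 0
    val cold = flow {
        starts++ // runs every time a collector starts collecting
        emit(1)
        emit(2)
    }
    val startsBeforeCollecting = starts // still 0, nothing has run yet
    cold.toList() // first collector, first run
    cold.toList() // second collector, second run
    listOf(startsBeforeCollecting, starts)
}

fun main() {
    println(coldFlowStartCounts()) // prints [0, 2]
}
```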

Since hot flows never complete, we cannot test them by converting the values to a list. To test hot flows, and to make testing any flow easier in general, we can use a library called Turbine.

You can read more about turbine here.

Testing State Flow

State flow is a type of hot flow. It emits updates to a value, i.e. duplicate consecutive values are not emitted by a state flow.

For example, if we try to emit [10, 20, 20, 30] from a state flow, the collector of the flow would only receive [10, 20, 30].

If you are familiar with Android's LiveData, state flow is very similar to it. You can read more about state flow here.
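
The duplicate-skipping behavior can be observed with a plain MutableStateFlow. This is a sketch under a few assumptions: kotlinx-coroutines-core is on the classpath, the function name is made up, and the collector is given a chance to run after every update (via yield) so that no value is overwritten before it is seen.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun collectDistinctStates(): List<Int> = runBlocking {
    val state = MutableStateFlow(10)
    val received = mutableListOf<Int>()
    // UNDISPATCHED: the collector runs right away and receives the initial value.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { received += it }
    }
    for (value in listOf(20, 20, 30)) {
        state.value = value
        yield() // let the collector process the update
    }
    collector.cancel() // a state flow never completes, so we stop collecting manually
    received.toList()
}

fun main() {
    println(collectDistinctStates()) // prints [10, 20, 30]
}
```

Note that the collector has to be cancelled explicitly. Without that, runBlocking would wait forever, which is the same problem we are about to run into in the test.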

  • Add a getStateFlow function to the MainViewModel
fun getStateFlow(): MutableStateFlow<Int> {
    return MutableStateFlow(10)
}

Here we create a MutableStateFlow directly, with 10 as its initial value.

  • Add a test for getStateFlow in MainViewModelTest
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()

    assertEquals(listOf(10), stateFlow.toList())
}
  • If you run this test, you will notice that it keeps running forever. This is because the state flow is a hot flow, and hot flows never complete.

To overcome this, we can use a library called Turbine. It is a small testing library for Kotlin flows. Let's add the Turbine dependency in the app's build.gradle file.

dependencies {
  testImplementation 'app.cash.turbine:turbine:0.6.1'
}

The Turbine library comes with a handy test extension function on flow that lets us test all types of flows with ease. It internally uses a Channel to collect the values from the flow and gives us useful functions to receive items one by one from the flow. Channels are out of scope for this article, but you can read more about them here.
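
To get a feel for the channel idea, here is a simplified sketch that forwards a flow into a Channel and receives items one by one. It only illustrates the general approach and is not Turbine's actual implementation. It assumes kotlinx-coroutines-core is on the classpath, and the function name is made up.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

fun awaitItemsOneByOne(): List<Int> = runBlocking {
    val state = MutableStateFlow(10)
    val events = Channel<Int>(Channel.UNLIMITED)
    // A background collector forwards every emission into the channel.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        state.collect { events.send(it) }
    }

    val first = withTimeout(1_000) { events.receive() }  // initial value
    state.value = 20
    val second = withTimeout(1_000) { events.receive() } // the update
    state.value = 20                                     // duplicate, nothing is emitted
    val third = withTimeoutOrNull(100) { events.receive() }

    collector.cancel() // stop collecting the never-completing flow
    listOf(first, second) + listOfNotNull(third)
}

fun main() {
    println(awaitItemsOneByOne()) // prints [10, 20]
}
```

Receiving with a timeout is what lets a test fail quickly instead of hanging when an expected item never arrives.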

  • Let's update our test case to use this test function.
@Test
fun `When getStateFlow is called, it should emit values correctly`() = runBlocking {
    val stateFlow = mainViewModel.getStateFlow()

    stateFlow.test {
        val firstItem = awaitItem()
        assertEquals(10, firstItem)

        stateFlow.emit(20)
        val secondItem = awaitItem()
        assertEquals(20, secondItem)

        stateFlow.emit(20)
        expectNoEvents()
    }
}
  • The test function lets us await the next item in the flow and also expect various events. We can run assertions on each emit individually. This also makes the test code very clean and precise.

    Also notice that the last call to emit is with the same value, thus the state flow does not actually emit again. Therefore we can call expectNoEvents after that and the test passes.

    At the time of writing, the Turbine library internally uses the Kotlin time library, which is still experimental. So we need to add the @ExperimentalTime annotation to the test class for it to compile.

@ExperimentalTime
class MainViewModelTest : BaseTest()
  • Now if we run the test, it passes!

Testing Shared Flow

Shared flow is another type of a hot flow. Unlike a state flow, it will emit all the values, even if the values are the same.

For example, if we emit [10, 20, 20, 30] from a shared flow, the collector would receive all the values [10, 20, 20, 30].
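
Here is the same experiment as a runnable sketch with a plain MutableSharedFlow, assuming kotlinx-coroutines-core is on the classpath. The function name is made up for illustration. The collector subscribes before anything is emitted, so it sees every value, duplicates included.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.yield

fun collectSharedValues(): List<Int> = runBlocking {
    val shared = MutableSharedFlow<Int>()
    val received = mutableListOf<Int>()
    // UNDISPATCHED: the collector is subscribed before the first emit.
    val collector = launch(start = CoroutineStart.UNDISPATCHED) {
        shared.collect { received += it }
    }
    // With no buffer, emit suspends until the subscriber has taken the value.
    listOf(10, 20, 20, 30).forEach { shared.emit(it) }
    yield()
    collector.cancel() // a shared flow never completes on its own
    received.toList()
}

fun main() {
    println(collectSharedValues()) // prints [10, 20, 20, 30]
}
```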

  • Let's create a shared flow in the MainViewModel from the doubleCountFlow. We can use the shareIn extension function to create a shared flow from an existing flow.
val doubleCountSharedFlow = doubleCountFlow
        .shareIn(viewModelScope, SharingStarted.Lazily)
  • We pass the viewModelScope as the scope in which the flow will be shared. The second argument is the SharingStarted class. It indicates the strategy by which the sharing should be started.

    For example, Lazily means sharing starts when the first subscriber appears and never stops, while Eagerly means sharing starts immediately.

    Similar to a state flow, calls to terminal functions like toList never complete on a shared flow and keep running forever.

  • Add a test for this shared flow in MainViewModelTest. We will use turbine for this test as well. We are also using runBlockingTest here because the doubleCountFlow uses a delay block.

@Test
fun `When doubleCountSharedFlow is collected, it should emit values correctly`() =
    coroutineScope.dispatcher.runBlockingTest {
        val sharedFlow = mainViewModel.doubleCountSharedFlow

        sharedFlow.test {
            val firstItem = awaitItem()
            assertEquals(2, firstItem)

            val secondItem = awaitItem()
            assertEquals(4, secondItem)

            val thirdItem = awaitItem()
            assertEquals(6, thirdItem)
        }
    }
  • This test passes as expected. Testing a lazily started shared flow is not much different from testing any other hot flow.

Always inject SharingStarted strategy

There are situations where you would want your shared flow to start eagerly. Eagerly started shared flows can pose some challenges in terms of testing.

  • Let's change the SharingStarted strategy to Eagerly
val doubleCountSharedFlow =
        doubleCountFlow.shareIn(viewModelScope, SharingStarted.Eagerly)
  • If we run the test now, it will fail with the error Timed out waiting for 1000 ms. One second is the default timeout period for Turbine tests.

  • Why does the test fail? Since we are using Eagerly, the sharing of this shared flow starts immediately. That means by the time we start collecting the flow in our test, it has already emitted all the values.

    To fix this we need the sharing to be started Eagerly in real application but Lazily in tests. We can create a class similar to CoroutineDispatcherProvider that will provide us the sharing strategy. Then we can inject the sharing strategy wherever we need it.

  • Create a class SharingStrategyProvider

data class SharingStrategyProvider(
    val lazily: SharingStarted = SharingStarted.Lazily,
    val eagerly: SharingStarted = SharingStarted.Eagerly,
    val whileSubscribed: SharingStarted = SharingStarted.WhileSubscribed()
)
  • Add it as a dependency to the MainViewModel
class MainViewModel(
    private val mainRepository: MainRepository,
    private val dispatcherProvider: CoroutineDispatcherProvider,
    private val sharingStrategyProvider: SharingStrategyProvider
) : ViewModel()
  • In app/di/viewModelModule update the view model constructor
viewModel { MainViewModel(get(), CoroutineDispatcherProvider(), SharingStrategyProvider()) }
  • For tests, we can add SharingStrategyProvider to the already created CoroutineScopeRule
@ExperimentalCoroutinesApi
class CoroutineScopeRule(
    val dispatcher: TestCoroutineDispatcher = TestCoroutineDispatcher(),
    var dispatcherProvider: CoroutineDispatcherProvider = CoroutineDispatcherProvider(),
    var sharingStrategyProvider: SharingStrategyProvider = SharingStrategyProvider()
): TestWatcher(), TestCoroutineScope by TestCoroutineScope(dispatcher)
  • Also update the starting function and use Lazily as the strategy everywhere.
override fun starting(description: Description?) {
    super.starting(description)
    Dispatchers.setMain(dispatcher)
    dispatcherProvider = CoroutineDispatcherProvider(
        main = dispatcher,
        default = dispatcher,
        io = dispatcher
    )
    sharingStrategyProvider = SharingStrategyProvider(
        lazily = SharingStarted.Lazily,
        eagerly = SharingStarted.Lazily,
        whileSubscribed = SharingStarted.Lazily
    )
}
  • Update the setUp function of the test class
@Before
fun setUp() {
    mainRepository = mock()
    whenever(mainRepository.count3Flow()).doReturn(getCount3Flow())
    mainViewModel = MainViewModel(
        mainRepository,
        coroutineScope.dispatcherProvider,
        coroutineScope.sharingStrategyProvider
    )
}
  • Lastly use the injected sharing strategy in the view model
val doubleCountSharedFlow =
        doubleCountFlow.shareIn(viewModelScope, sharingStrategyProvider.eagerly)
  • That's it. If you run the test now, it will pass as expected!
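
To see in isolation why the strategy matters, here is a small sketch that shares a finite flow and then subscribes late. It assumes kotlinx-coroutines-core is on the classpath. The function name and the timings are made up for illustration, and the sharing scope here stands in for viewModelScope.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.SharingStarted
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.shareIn
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns the first value a late subscriber sees, or null if it sees nothing in time.
fun firstValueSeenByLateCollector(started: SharingStarted): Int? = runBlocking {
    val sharingJob = Job()
    val sharingScope = CoroutineScope(coroutineContext + sharingJob)
    val shared = flowOf(1, 2, 3).shareIn(sharingScope, started)

    delay(50) // the subscriber shows up late, like the collector in our test

    val first = withTimeoutOrNull(300) { shared.first() }
    sharingJob.cancelAndJoin() // stop sharing so nothing keeps running
    first
}

fun main() {
    println(firstValueSeenByLateCollector(SharingStarted.Eagerly)) // prints null
    println(firstValueSeenByLateCollector(SharingStarted.Lazily))  // prints 1
}
```

With Eagerly, all three values are emitted before anyone subscribes and are dropped, because the shared flow has no replay cache. With Lazily, the upstream only starts once the subscriber is present. Passing the strategy in as a parameter is the same idea that SharingStrategyProvider applies to the view model.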

Where to go from here

Between Part 1 and Part 2, we have looked at how to deal with various situations that arise while testing coroutines and flows. You can now write tests for all types of coroutines and flows!

You can look at the complete code for this article at https://github.com/shounakmulay/KotlinFlowTest/tree/part2-end