Testing Coroutines and Flows in Depth — runTest, TestDispatcher, Virtual Time, Turbine Patterns, and Production Recipes
Your test passed locally. Your CI pipeline shows green. Then production breaks because of a race condition you couldn’t reproduce. Sound familiar? Testing async code is hard — coroutines run on dispatchers, Flows emit over time, delay() calls take real seconds, and one missing advanceUntilIdle() can make a passing test silently skip half its work. kotlinx-coroutines-test and Turbine exist to make this reliable. They give you a virtual clock, predictable dispatchers, and clean APIs for asserting Flow emissions. This guide covers testing coroutines and Flows in depth — from runTest internals to production recipes that catch real bugs.
The Mental Model — Virtual Time
// The biggest lie in coroutine testing: time isn't real.
//
// In production:
// delay(5000) — suspends the coroutine for 5 actual seconds
//
// In a test using runTest:
// delay(5000) — suspends on a VIRTUAL clock
// — the test framework can SKIP that 5 seconds
// — runTest finishes in milliseconds
//
// Why? Because real-time tests are SLOW and FLAKY:
// - 100 tests that each wait 5 s add up to 500 s, more than 8 minutes of waiting
// - "Sometimes" the network mock takes 3s instead of 2s → flaky
//
// Virtual time fixes both:
// - Tests run in milliseconds even with delay(60_000)
// - Time moves ONLY when the test scheduler moves it (on your command, or when
//   the test body itself suspends), never because a real timer fired
//
// Mental picture:
// ───────────────
// Real coroutines: [run]──[delay 5s]══════════════[resume]──[done]
// └─ 5 actual seconds pass
//
// Test coroutines: [run]──[delay 5s]══[advance]──[resume]──[done]
// └─ NO real time passes; test controls the clock
//
// runTest is the entry point that gives you this virtual clock
Setup
// build.gradle.kts
dependencies {
// The core testing library — runTest, TestDispatcher, virtual time
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.9.0")
// kotlinx-coroutines-test is a LIBRARY — provides:
// runTest, TestScope, StandardTestDispatcher, UnconfinedTestDispatcher,
// TestCoroutineScheduler, advanceUntilIdle, advanceTimeBy, runCurrent
// Turbine — testing Flow emissions
testImplementation("app.cash.turbine:turbine:1.1.0")
// turbine is a LIBRARY — .test {}, awaitItem(), expectNoEvents()
// JUnit 5 (or 4 — both work)
testImplementation("org.junit.jupiter:junit-jupiter:5.10.2")
// MockK — for mocking suspend functions with coEvery/coVerify
testImplementation("io.mockk:mockk:1.13.10")
}
// Most tests will look like this skeleton:
//
// @Test
// fun `something happens`() = runTest {
// // ... given/when/then with virtual time ...
// }
//
// runTest is a TOP-LEVEL FUNCTION from kotlinx-coroutines-test
// It creates a TestScope with a virtual clock and runs your test inside it
runTest — What It Actually Does
@Test
fun `delay does not block the test`() = runTest {
val start = currentTime
// currentTime is a PROPERTY on TestScope — the virtual clock in milliseconds
delay(10_000) // 10 seconds — on the virtual clock
val end = currentTime
assertEquals(10_000, end - start) // virtual time advanced by 10s
// Test completes in ~milliseconds of real time
}
// What runTest does under the hood:
// 1. Creates a TestScope (a CoroutineScope with a TestCoroutineScheduler)
// 2. Runs your test block as a coroutine inside that scope
// 3. After your block returns, calls advanceUntilIdle() automatically
// → runs any unfinished child coroutines to completion
// 4. Fails the test if any child coroutine threw an unhandled exception
// 5. Fails the test if any child coroutine is still running after timeout
// Why "auto advanceUntilIdle at the end"?
//
// @Test
// fun test() = runTest {
// launch { delay(1000); println("done") }
// // test block returns here, BUT the child launch is still running
// // runTest auto-advances time so the child finishes → "done" prints
// }
//
// Without this auto-advance, child coroutines would silently leak
// and tests would pass without testing anything
// runTest takes a timeout (60 s by default in kotlinx-coroutines-test 1.8 and later).
// The timeout is measured in REAL time: if the test never finishes
// (e.g., infinite collect on a hot Flow), it fails with a clear error
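To see the virtual clock and the timeout working together, here is a minimal sketch. It assumes a kotlinx-coroutines-test version where runTest accepts a timeout: Duration argument (1.7 or later); the helper name virtualVersusRealMillis is made up for illustration and is not part of any library.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlin.system.measureTimeMillis
import kotlin.time.Duration.Companion.seconds

// Hypothetical helper: returns (virtual ms elapsed, real ms elapsed).
@OptIn(ExperimentalCoroutinesApi::class)
fun virtualVersusRealMillis(): Pair<Long, Long> {
    var virtualMillis = 0L
    val realMillis = measureTimeMillis {
        // The timeout is wall-clock time: a hung test would fail after 5 real seconds.
        runTest(timeout = 5.seconds) {
            delay(60_000)               // skipped on the virtual clock
            virtualMillis = currentTime // the virtual clock now reads 60_000
        }
    }
    return virtualMillis to realMillis
}
```

The virtual clock reports a full minute while the real elapsed time stays far below the timeout, which is why a short explicit timeout is a reasonable guard for tests that might hang.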
TestDispatcher — Standard vs Unconfined
// kotlinx-coroutines-test ships TWO TestDispatcher implementations.
// They behave VERY differently — pick the right one for your test.
// ═══ StandardTestDispatcher — the QUEUEING dispatcher ═════════════════
//
// Coroutines launched on it are QUEUED, not started immediately.
// They only run when YOU advance the clock or call runCurrent().
//
// Use when: you want fine-grained control over WHEN coroutines run.
@Test
fun `standard dispatcher queues launches`() = runTest {
var value = 0
launch { value = 1 } // QUEUED — not run yet
assertEquals(0, value) // still 0!
runCurrent() // run currently scheduled coroutines (no time advance)
// runCurrent() is a FUNCTION on TestScope — runs pending tasks at current time
assertEquals(1, value)
}
// ═══ UnconfinedTestDispatcher — the EAGER dispatcher ═════════════════
//
// Coroutines run IMMEDIATELY on the calling thread until they hit a delay/suspend.
// More like "fire and continue" — predictable for simple tests.
//
// Use when: you want straightforward, no-surprises behaviour for ViewModel tests.
@Test
fun `unconfined dispatcher runs launches eagerly`() = runTest(UnconfinedTestDispatcher()) {
var value = 0
launch { value = 1 } // runs IMMEDIATELY
assertEquals(1, value) // already 1!
}
// ═══ DEFAULT in runTest ═════════════════════════════════════════════════
// Without arguments, runTest uses StandardTestDispatcher
// You can override it: runTest(UnconfinedTestDispatcher()) { ... }
// ═══ WHICH ONE SHOULD YOU USE? ══════════════════════════════════════════
//
// UnconfinedTestDispatcher → ViewModel tests, simple async logic,
// when you don't care about precise scheduling
//
// StandardTestDispatcher → testing race conditions, ordering,
// when you need to assert state BETWEEN steps
//
// Rule of thumb: start with Unconfined; switch to Standard if you need
// to test "this should happen BEFORE that"
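One detail that is easy to miss: "eager" only lasts until the first suspension point. The sketch below (the function name is hypothetical) launches a coroutine on UnconfinedTestDispatcher and records what the test body observes before and after the virtual clock passes the delay.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.runCurrent
import kotlinx.coroutines.test.runTest

// Hypothetical demo: returns the values the test body saw, in order.
@OptIn(ExperimentalCoroutinesApi::class)
fun eagerOnlyUntilFirstSuspension(): List<Int> {
    val observed = mutableListOf<Int>()
    runTest(UnconfinedTestDispatcher()) {
        var value = 0
        launch {
            value = 1   // runs eagerly, inside the launch call
            delay(100)  // suspends: control returns to the test body
            value = 2   // runs only once the virtual clock reaches 100
        }
        observed += value   // the eager part has already run
        advanceTimeBy(100)
        runCurrent()        // run the task scheduled exactly at t = 100
        observed += value
    }
    return observed         // [1, 2]
}
```

So even with the eager dispatcher you still need the clock helpers whenever the code under test calls delay() or another time-based operator.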
advanceUntilIdle, advanceTimeBy, runCurrent
// THREE functions to control the virtual clock — each does something different.
// ═══ runCurrent() ═══════════════════════════════════════════════════════
// Runs tasks scheduled at the CURRENT virtual time. Does NOT advance time.
@Test
fun `runCurrent runs scheduled tasks without advancing clock`() = runTest {
var ran = false
launch { ran = true } // scheduled at currentTime = 0
runCurrent() // runs it — clock still at 0
assertEquals(true, ran)
assertEquals(0, currentTime)
}
// ═══ advanceTimeBy(millis) ═══════════════════════════════════════════════
// Advances the virtual clock by N ms and runs any tasks scheduled in that range.
// Tasks scheduled AT EXACTLY the new time are NOT run (off-by-one).
@Test
fun `advanceTimeBy moves the virtual clock`() = runTest {
var ran = false
launch {
delay(1000)
ran = true
}
advanceTimeBy(999)
assertEquals(false, ran) // 1ms short
advanceTimeBy(1)
runCurrent() // tasks at the boundary need a runCurrent
assertEquals(true, ran)
}
// ═══ advanceUntilIdle() ══════════════════════════════════════════════════
// Advances time as far as needed to run ALL pending coroutines to completion.
// The most commonly used helper — "run everything, I don't care how long it takes."
@Test
fun `advanceUntilIdle runs everything to completion`() = runTest {
var result = 0
launch {
delay(500)
result += 1
delay(2000)
result += 10
delay(10_000)
result += 100
}
advanceUntilIdle()
assertEquals(111, result) // all delays skipped, all increments ran
}
// ═══ MENTAL MODEL ════════════════════════════════════════════════════════
// runCurrent → "run what's ready RIGHT NOW"
// advanceTimeBy(N) → "fast-forward N ms, run anything that wakes up in that window"
// advanceUntilIdle → "skip ahead until nothing is left to do"
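If the three helpers still feel abstract, a toy model can help. The class below is NOT how kotlinx-coroutines-test is implemented; it is a deliberately tiny sketch (all names are invented) that follows the same observable rules: runCurrent does not move the clock, advanceTimeBy runs only tasks scheduled strictly before the new time, and advanceUntilIdle drains the queue.

```kotlin
import java.util.PriorityQueue

// A toy virtual clock. It mimics the rules described above, nothing more.
class ToyScheduler {
    private data class Task(val time: Long, val seq: Long, val block: () -> Unit)

    // Ordered by due time, then by insertion order.
    private val queue = PriorityQueue<Task>(compareBy<Task>({ it.time }, { it.seq }))
    private var nextSeq = 0L
    var currentTime = 0L
        private set

    fun schedule(delayMillis: Long, block: () -> Unit) {
        queue += Task(currentTime + delayMillis, nextSeq++, block)
    }

    // Run everything due at or before the current time; the clock does not move.
    fun runCurrent() {
        while (queue.peek()?.let { it.time <= currentTime } == true) queue.poll().block()
    }

    // Move the clock forward, running tasks scheduled STRICTLY before the new time.
    fun advanceTimeBy(millis: Long) {
        val target = currentTime + millis
        while (queue.peek()?.let { it.time < target } == true) {
            val task = queue.poll()
            currentTime = maxOf(currentTime, task.time)
            task.block()
        }
        currentTime = target
    }

    // Keep jumping to the next task until nothing is left.
    fun advanceUntilIdle() {
        while (queue.isNotEmpty()) {
            val task = queue.poll()
            currentTime = maxOf(currentTime, task.time)
            task.block()
        }
    }
}

fun toySchedulerDemo(): List<String> {
    val log = mutableListOf<String>()
    val scheduler = ToyScheduler()
    scheduler.schedule(1_000) { log += "task ran at ${scheduler.currentTime}" }
    scheduler.advanceTimeBy(1_000) // the task is due exactly at 1_000, so it does not run yet
    log += "clock=${scheduler.currentTime}, tasks run=${log.size}"
    scheduler.runCurrent()         // now the boundary task runs
    return log
}
```

The demo shows the boundary rule from the advanceTimeBy example: the clock reaches 1_000 with nothing run, and the task only fires once runCurrent() is called.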
Replacing Dispatchers.Main
// viewModelScope, lifecycleScope, and most Android UI code use Dispatchers.Main.
// In a JVM unit test, there is no Android Main thread — using it crashes.
//
// You MUST swap Dispatchers.Main with a TestDispatcher before each test.
class ArticleViewModelTest {
@OptIn(ExperimentalCoroutinesApi::class)
private val testDispatcher = UnconfinedTestDispatcher()
@BeforeEach
fun setup() {
Dispatchers.setMain(testDispatcher)
// setMain() is a FUNCTION from kotlinx-coroutines-test
// Replaces Dispatchers.Main with the given dispatcher
}
@AfterEach
fun tearDown() {
Dispatchers.resetMain()
// resetMain() — restores the original Main dispatcher
// Critical — without it, state leaks between test classes
}
@Test
fun `loadArticles updates state`() = runTest {
val viewModel = ArticleViewModel(fakeRepo)
viewModel.loadArticles()
advanceUntilIdle()
assertTrue(viewModel.uiState.value is UiState.Success)
}
}
// ═══ MainDispatcherRule — reusable JUnit rule for the boilerplate ════
class MainDispatcherRule(
val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) {
Dispatchers.setMain(testDispatcher)
}
override fun finished(description: Description) {
Dispatchers.resetMain()
}
}
// Usage:
class ArticleViewModelTest {
@get:Rule
val mainDispatcherRule = MainDispatcherRule()
// No more @BeforeEach/@AfterEach boilerplate
}
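// @get:Rule is a JUnit 4 mechanism. JUnit 5 has no rules: write an equivalent
// extension yourself (BeforeEachCallback + AfterEachCallback) and register it with
// @ExtendWith(MainDispatcherExtension::class). No library ships this class.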
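For JUnit 5, a hand-written extension could look like the sketch below. The class name MainDispatcherExtension comes from the comment above; the class itself is an assumption, since neither kotlinx-coroutines-test nor JUnit provides it. BeforeEachCallback and AfterEachCallback are the standard JUnit Jupiter extension interfaces.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.TestDispatcher
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.resetMain
import kotlinx.coroutines.test.setMain
import org.junit.jupiter.api.extension.AfterEachCallback
import org.junit.jupiter.api.extension.BeforeEachCallback
import org.junit.jupiter.api.extension.ExtensionContext

// Hypothetical JUnit 5 counterpart of MainDispatcherRule.
// All constructor parameters have defaults, so JUnit can instantiate it reflectively.
@OptIn(ExperimentalCoroutinesApi::class)
class MainDispatcherExtension(
    val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : BeforeEachCallback, AfterEachCallback {

    // Runs before every test method: swap Dispatchers.Main for the test dispatcher.
    override fun beforeEach(context: ExtensionContext) {
        Dispatchers.setMain(testDispatcher)
    }

    // Runs after every test method: restore the original Main dispatcher.
    override fun afterEach(context: ExtensionContext) {
        Dispatchers.resetMain()
    }
}

// Usage (sketch):
// @ExtendWith(MainDispatcherExtension::class)
// class ArticleViewModelTest { /* tests using runTest { ... } */ }
```

If you need access to the dispatcher inside the test, registering the extension through a @RegisterExtension field instead of @ExtendWith is one option.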
Sharing the Scheduler — Inject the TestDispatcher
// COMMON BUG: ViewModel uses one dispatcher; test uses another.
// Result: advanceUntilIdle() in the test does NOT advance the ViewModel's coroutine.
// ❌ Bug — ViewModel hard-codes Dispatchers.IO
class ArticleViewModel : ViewModel() {
fun load() = viewModelScope.launch(Dispatchers.IO) { /* ... */ }
}
@Test
fun test() = runTest {
val vm = ArticleViewModel()
vm.load()
advanceUntilIdle() // doesn't affect the IO coroutine!
// assertion fails — load hasn't finished
}
// ✅ Fix — INJECT the dispatcher
class ArticleViewModel(
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : ViewModel() {
fun load() = viewModelScope.launch(ioDispatcher) { /* ... */ }
}
// In the test, pass the test scheduler:
@Test
fun test() = runTest {
val vm = ArticleViewModel(ioDispatcher = StandardTestDispatcher(testScheduler))
// testScheduler is a PROPERTY on TestScope — the shared virtual clock
vm.load()
advanceUntilIdle() // NOW it controls the IO coroutine too
}
// ═══ THE KEY INSIGHT ═══════════════════════════════════════════════════
// All TestDispatchers built with the SAME testScheduler share the SAME virtual time.
// advanceUntilIdle() on the TestScope advances ALL of them at once.
//
// Production code rule: NEVER hard-code Dispatchers.IO/Default in your classes.
// ALWAYS inject them — you'll thank yourself when writing tests.
// ═══ DispatcherProvider pattern (for larger codebases) ════════════════
interface DispatcherProvider {
val main: CoroutineDispatcher
val io: CoroutineDispatcher
val default: CoroutineDispatcher
}
class DefaultDispatcherProvider : DispatcherProvider {
override val main = Dispatchers.Main
override val io = Dispatchers.IO
override val default = Dispatchers.Default
}
class TestDispatcherProvider(scheduler: TestCoroutineScheduler) : DispatcherProvider {
private val testDispatcher = StandardTestDispatcher(scheduler)
override val main = testDispatcher
override val io = testDispatcher
override val default = testDispatcher
}
// Inject DispatcherProvider into ViewModels — one swap covers all dispatchers
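To show the provider in use, here is a small end-to-end sketch. ReportGenerator and generateReportOnVirtualClock are hypothetical names invented for this example; the provider types are repeated so the block compiles on its own. A plain class is used instead of a ViewModel to keep Android out of the picture.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.StandardTestDispatcher
import kotlinx.coroutines.test.TestCoroutineScheduler
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withContext

interface DispatcherProvider {
    val main: CoroutineDispatcher
    val io: CoroutineDispatcher
    val default: CoroutineDispatcher
}

class TestDispatcherProvider(scheduler: TestCoroutineScheduler) : DispatcherProvider {
    private val testDispatcher = StandardTestDispatcher(scheduler)
    override val main: CoroutineDispatcher = testDispatcher
    override val io: CoroutineDispatcher = testDispatcher
    override val default: CoroutineDispatcher = testDispatcher
}

// Hypothetical class under test: hops to the injected IO dispatcher.
class ReportGenerator(private val dispatchers: DispatcherProvider) {
    suspend fun generate(): String = withContext(dispatchers.io) {
        delay(2_000) // stands in for slow I/O
        "report"
    }
}

// Returns the generated report and the virtual time it took.
@OptIn(ExperimentalCoroutinesApi::class)
fun generateReportOnVirtualClock(): Pair<String, Long> {
    var outcome = "" to 0L
    runTest {
        // testScheduler is shared, so the "IO" work runs on the same virtual clock
        val generator = ReportGenerator(TestDispatcherProvider(testScheduler))
        val report = generator.generate()
        outcome = report to currentTime
    }
    return outcome
}
```

Because the provider is built from testScheduler, the two-second delay behind dispatchers.io is skipped like any other virtual delay, and the test can still assert that exactly 2_000 virtual milliseconds passed.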
Testing Suspend Functions
// Suspend functions are tested INSIDE runTest { }.
class ArticleRepositoryTest {
private val fakeApi = FakeArticleApi()
private val fakeDao = FakeArticleDao()
private val repository = ArticleRepositoryImpl(fakeApi, fakeDao)
@Test
fun `refreshArticles saves API response to database`() = runTest {
fakeApi.articlesToReturn = listOf(
ArticleDto(id = "1", title = "Kotlin", content = "...", publishedAt = 0)
)
repository.refreshArticles()
assertEquals(1, fakeDao.insertedArticles.size)
assertEquals("Kotlin", fakeDao.insertedArticles[0].title)
}
@Test
fun `refreshArticles throws on network error`() = runTest {
fakeApi.shouldThrow = true
val exception = assertThrows<IOException> {
repository.refreshArticles()
}
assertEquals("Fake network error", exception.message)
}
// ═══ Testing timeout behaviour ═════════════════════════════════════
@Test
fun `getArticle times out after 5 seconds`() = runTest {
fakeApi.delayMillis = 10_000 // API takes 10s
assertThrows<TimeoutCancellationException> {
withTimeout(5_000) {
repository.getArticle("1")
}
}
// The 10s delay is virtual — test still finishes fast
}
}
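The tests above lean on a FakeArticleApi that is never shown. A fake along these lines would support them; the property names (articlesToReturn, shouldThrow, delayMillis) are taken from the tests, while the ArticleApi interface, the fetchArticles method, and the ArticleDto field types are assumptions made for this sketch.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.test.runTest
import kotlinx.coroutines.withTimeout
import java.io.IOException

// Hypothetical shapes: the real types are not shown in the article.
data class ArticleDto(val id: String, val title: String, val content: String, val publishedAt: Long)

interface ArticleApi {
    suspend fun fetchArticles(): List<ArticleDto>
}

// A hand-written fake: configurable result, failure, and latency.
class FakeArticleApi : ArticleApi {
    var articlesToReturn: List<ArticleDto> = emptyList()
    var shouldThrow = false
    var delayMillis = 0L

    override suspend fun fetchArticles(): List<ArticleDto> {
        delay(delayMillis) // virtual under runTest, so slow responses cost no real time
        if (shouldThrow) throw IOException("Fake network error")
        return articlesToReturn
    }
}

// Returns true if a 10 s response is cut off by a 5 s timeout.
fun slowFakeTimesOut(): Boolean {
    var timedOut = false
    runTest {
        val api = FakeArticleApi().apply { delayMillis = 10_000 }
        timedOut = try {
            withTimeout(5_000) { api.fetchArticles() }
            false
        } catch (e: TimeoutCancellationException) {
            true
        }
    }
    return timedOut
}
```

Using delay() inside the fake, rather than Thread.sleep(), is what lets runTest skip the latency and lets withTimeout fire on the virtual clock.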
Testing Cold Flows with Turbine
// Cold Flows produce values only when collected. Turbine collects and lets you
// assert each emission one by one.
class SearchRepositoryTest {
@Test
fun `searchFlow emits results after typing`() = runTest {
val repository = SearchRepository(fakeApi)
repository.searchFlow("kotlin").test {
// .test {} is an EXTENSION FUNCTION on Flow from Turbine
// It collects the Flow and exposes awaitItem()/awaitComplete()/awaitError()
assertEquals(emptyList<Article>(), awaitItem())
// awaitItem() — suspend until the next emission, return it
assertEquals(listOf(article1, article2), awaitItem())
awaitComplete()
// awaitComplete() — suspend until the Flow completes
}
}
@Test
fun `searchFlow emits error on api failure`() = runTest {
fakeApi.shouldThrow = true
repository.searchFlow("kotlin").test {
val error = awaitError()
// awaitError() — suspend until the Flow errors
assertTrue(error is IOException)
}
}
@Test
fun `searchFlow emits no items for empty query`() = runTest {
repository.searchFlow("").test {
expectNoEvents()
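// expectNoEvents() asserts that no item, error, or completion has been received so far.
// It doesn't wait: it checks RIGHT NOW. A flow that completes immediately would fail
// here and needs awaitComplete() instead.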
cancelAndIgnoreRemainingEvents()
}
}
}
// ═══ Turbine API cheat sheet ═══════════════════════════════════════════
// awaitItem() → next value, fails if Flow ended/errored
// awaitComplete() → assert Flow completed normally
// awaitError() → assert Flow errored, returns Throwable
// expectNoEvents() → assert no emissions right now
// expectMostRecentItem() → skip to last emission, return it
// skipItems(n) → ignore n emissions
// cancel() → cancel collection
// cancelAndIgnoreRemainingEvents() → clean up without asserting
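The repository tests above depend on classes that are not shown, so here is a fully self-contained Turbine sketch you can paste into a scratch test. The function name collectWithTurbine is hypothetical; the Turbine calls (test, awaitItem, awaitComplete) are the ones from the cheat sheet.

```kotlin
import app.cash.turbine.test
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.test.runTest

// Hypothetical demo: collects a cold flow with Turbine and returns what it saw.
fun collectWithTurbine(): List<String> {
    val seen = mutableListOf<String>()
    runTest {
        val ticks = flow {
            emit("first")
            delay(1_000) // virtual: skipped while the test waits in awaitItem()
            emit("second")
        }
        ticks.test {
            seen += awaitItem() // "first"
            seen += awaitItem() // "second", after the virtual delay
            awaitComplete()     // the flow builder finished normally
        }
    }
    return seen
}
```

Every event has to be consumed or explicitly ignored before the test block ends; leaving out awaitComplete() here would make Turbine report an unconsumed completion event.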
Testing Hot Flows — StateFlow and SharedFlow
// Hot Flows (StateFlow, SharedFlow) are different beasts:
// - They emit even with no collectors
// - StateFlow ALWAYS replays the latest value to new collectors
// - SharedFlow has configurable replay (0 by default)
//
// This changes how you test them.
// ═══ StateFlow — always has a current value ═════════════════════════
class CounterViewModel {
private val _count = MutableStateFlow(0)
val count: StateFlow<Int> = _count.asStateFlow()
fun increment() { _count.value++ }
}
@Test
fun `count increments`() = runTest {
val vm = CounterViewModel()
vm.count.test {
assertEquals(0, awaitItem()) // initial value — ALWAYS emitted to new collectors
vm.increment()
assertEquals(1, awaitItem())
vm.increment()
vm.increment()
// Two updates — but conflation may collapse them
// StateFlow CONFLATES: only the latest value is guaranteed
assertEquals(3, expectMostRecentItem())
cancelAndIgnoreRemainingEvents()
}
}
// ⚠️ STATEFLOW GOTCHA — conflation
// StateFlow conflates rapid updates. If you do .value = 1, .value = 2, .value = 3
// quickly, a slow collector may only see 3 (not 1, 2, 3).
// Don't assert exact intermediate values — use expectMostRecentItem().
// ═══ SharedFlow — configurable replay ════════════════════════════════
class EventBus {
private val _events = MutableSharedFlow<Event>(replay = 0)
val events: SharedFlow<Event> = _events.asSharedFlow()
suspend fun emit(event: Event) = _events.emit(event)
}
@Test
fun `events flow emits to subscribers`() = runTest {
val bus = EventBus()
bus.events.test {
// ⚠️ With replay = 0, no initial item arrives
expectNoEvents()
// Turbine subscribed when .test {} started, so this emit reaches a live collector
bus.emit(Event.Click)
assertEquals(Event.Click, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
// ⚠️ SHAREDFLOW GOTCHA — subscription timing
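// With NO subscribers and replay = 0, MutableSharedFlow.emit() returns immediately
// and the value is dropped. emit() only suspends when existing subscribers are slow
// and the buffer is full.
// If you emit BEFORE Turbine's .test {} subscribes, the event is LOST.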
//
// Fix: subscribe first (.test {}), then emit inside the test block.
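A quick way to check what emit() does when nobody is listening is to look at replayCache afterwards. The sketch below uses only kotlinx.coroutines APIs; the function name emitWithoutSubscribers is made up for illustration.

```kotlin
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.test.runTest

// Hypothetical demo: returns the replay caches of a replay = 0 and a replay = 1 flow
// after emitting one value to each with no subscribers attached.
fun emitWithoutSubscribers(): Pair<List<Int>, List<Int>> {
    var noReplay = emptyList<Int>()
    var withReplay = emptyList<Int>()
    runTest {
        val dropped = MutableSharedFlow<Int>(replay = 0)
        dropped.emit(1)                 // returns immediately; the value is dropped
        noReplay = dropped.replayCache  // nothing was kept

        val cached = MutableSharedFlow<Int>(replay = 1)
        cached.emit(1)                  // also returns immediately
        withReplay = cached.replayCache // the value is kept for late subscribers
    }
    return noReplay to withReplay
}
```

If losing early events is not acceptable for your use case, a replay of 1 (or a Channel exposed with receiveAsFlow()) is worth considering, since both keep the value until a collector arrives.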
// ═══ Testing stateIn / shareIn ═════════════════════════════════════════
@Test
fun `stateIn caches the latest value`() = runTest {
val flow = flowOf(1, 2, 3).stateIn(
scope = backgroundScope,
// backgroundScope is a PROPERTY on TestScope — auto-cancelled at test end
// Use it for stateIn/shareIn so the test cleans up properly
started = SharingStarted.Eagerly,
initialValue = 0
)
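runCurrent() // backgroundScope uses the queueing dispatcher: let the eager upstream run first
flow.test {
assertEquals(3, expectMostRecentItem()) // upstream already finished, so a new collector sees only the latest value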
cancelAndIgnoreRemainingEvents()
}
}
Testing Flow Operators — debounce, combine, flatMapLatest
// Operators that involve TIME need careful virtual-clock handling.
// ═══ debounce ══════════════════════════════════════════════════════════
@Test
fun `search query debounces by 300ms`() = runTest {
val viewModel = SearchViewModel(fakeRepo)
viewModel.results.test {
skipItems(1) // skip initial empty state
viewModel.onQueryChanged("k")
viewModel.onQueryChanged("ko")
viewModel.onQueryChanged("kot")
// Three rapid changes — debounce should collapse them
expectNoEvents() // nothing emitted yet — still inside debounce window
advanceTimeBy(299)
expectNoEvents() // still 1ms short
advanceTimeBy(1)
runCurrent()
// Now debounce fires — only the last query "kot" hits the API
val results = awaitItem()
assertTrue(results.isNotEmpty())
cancelAndIgnoreRemainingEvents()
}
}
// ═══ combine ═══════════════════════════════════════════════════════════
@Test
fun `combine emits when both flows have a value`() = runTest {
val a = MutableStateFlow(0)
val b = MutableStateFlow("")
val combined = combine(a, b) { i, s -> "$s-$i" }
combined.test {
assertEquals("-0", awaitItem()) // initial values from both StateFlows
a.value = 1
assertEquals("-1", awaitItem())
b.value = "x"
assertEquals("x-1", awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
// ═══ flatMapLatest ═════════════════════════════════════════════════════
@Test
fun `flatMapLatest cancels previous flow`() = runTest {
val trigger = MutableStateFlow(0)
var slowEmissions = 0
var fastEmissions = 0
val result = trigger.flatMapLatest { i ->
flow {
if (i == 0) {
delay(1000) // slow
slowEmissions++
emit("slow-$i")
} else {
delay(10) // fast
fastEmissions++
emit("fast-$i")
}
}
}
result.test {
advanceTimeBy(500)
trigger.value = 1 // cancels the slow flow before it can emit
advanceUntilIdle()
assertEquals("fast-1", awaitItem())
assertEquals(0, slowEmissions) // slow was cancelled
assertEquals(1, fastEmissions)
cancelAndIgnoreRemainingEvents()
}
}
Testing Cancellation
// Cancellation bugs are subtle. Test them explicitly.
@Test
fun `cancelled search never delivers a result`() = runTest {
// The test cancels firstJob itself to simulate a caller replacing an in-flight request
val api = FakeArticleApi(delayMillis = 5000)
val repo = ArticleRepository(api)
var firstResult: List<Article>? = null
var secondResult: List<Article>? = null
val firstJob = launch { firstResult = repo.search("kotlin") }
advanceTimeBy(1000)
val secondJob = launch { secondResult = repo.search("compose") }
firstJob.cancel()
advanceUntilIdle()
assertNull(firstResult) // never completed
assertNotNull(secondResult) // completed normally
assertTrue(firstJob.isCancelled)
}
// ═══ The CancellationException trap ════════════════════════════════════
// If your production code catches Exception (which also catches CancellationException),
// the cancellation signal is SWALLOWED: the coroutine runs its error path instead of
// stopping quietly. Tests should catch this!
class BuggyViewModel(private val repo: ArticleRepository) {
private val _state = MutableStateFlow<UiState>(UiState.Loading)
val state = _state.asStateFlow()
fun load(scope: CoroutineScope) = scope.launch {
try {
_state.value = UiState.Success(repo.getArticles())
} catch (e: Exception) { // ❌ catches CancellationException too!
_state.value = UiState.Error(e.message ?: "")
}
}
}
@Test
fun `viewModel respects cancellation`() = runTest {
val vm = BuggyViewModel(slowRepo)
val job = vm.load(this)
advanceTimeBy(100)
job.cancel()
advanceUntilIdle()
// ❌ With the bug, state ends up as Error("...")
// ✅ Correct behaviour: state stays Loading (cancelled before completion)
assertEquals(UiState.Loading, vm.state.value)
assertTrue(job.isCancelled)
}
// Production fix:
// catch (e: CancellationException) { throw e }
// catch (e: Exception) { /* handle */ }
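Here is a runnable sketch of the bug and the fix side by side. Loader is a hypothetical stand-in for the ViewModel (a plain string replaces UiState, and delay() replaces the repository call), and the rethrowCancellation flag exists only so one class can demonstrate both behaviours.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.test.advanceTimeBy
import kotlinx.coroutines.test.advanceUntilIdle
import kotlinx.coroutines.test.runTest

// Hypothetical loader with a string state, standing in for the ViewModel above.
class Loader(private val rethrowCancellation: Boolean) {
    var state = "Loading"
        private set

    fun load(scope: CoroutineScope) = scope.launch {
        try {
            delay(1_000) // stands in for a slow repository call
            state = "Success"
        } catch (e: CancellationException) {
            if (rethrowCancellation) throw e // the fix: let cancellation propagate
            state = "Error"                  // the bug: cancellation treated as a failure
        } catch (e: Exception) {
            state = "Error"
        }
    }
}

// Cancels the load after 100 virtual ms and returns the final state.
@OptIn(ExperimentalCoroutinesApi::class)
fun stateAfterCancel(rethrowCancellation: Boolean): String {
    val loader = Loader(rethrowCancellation)
    runTest {
        val job = loader.load(this)
        advanceTimeBy(100) // the coroutine starts and suspends in delay()
        job.cancel()
        advanceUntilIdle() // lets the cancelled coroutine run its catch block
    }
    return loader.state
}
```

With the rethrow in place the state stays "Loading"; without it the cancelled load ends up as "Error", which is exactly the symptom the test above is written to detect.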
Testing Exception Handling
// runTest fails the test if an unhandled exception escapes.
// SupervisorJob and exception handlers change this.
// ═══ Unhandled exceptions FAIL the test ═══════════════════════════════
@Test
fun `unhandled exception fails the test`() = runTest {
launch {
throw IllegalStateException("boom")
}
// runTest's auto-advance runs the launch → exception propagates → test fails
}
// ═══ Use assertThrows for expected failures ═══════════════════════════
@Test
fun `getArticle throws on bad id`() = runTest {
assertThrows<IllegalArgumentException> {
repository.getArticle("")
}
}
// ═══ Testing CoroutineExceptionHandler ════════════════════════════════
@Test
fun `exception handler catches errors in supervised scope`() = runTest {
val caught = mutableListOf<Throwable>()
val handler = CoroutineExceptionHandler { _, e -> caught.add(e) }
val scope = CoroutineScope(SupervisorJob() + handler + StandardTestDispatcher(testScheduler))
scope.launch { throw RuntimeException("boom") }
advanceUntilIdle()
assertEquals(1, caught.size)
assertEquals("boom", caught[0].message)
}
// ═══ SupervisorJob — failure isolation ═════════════════════════════
@Test
fun `child failure does not cancel siblings under SupervisorJob`() = runTest {
var sibling1Done = false
var sibling2Done = false
val supervisor = SupervisorJob()
val scope = CoroutineScope(supervisor + StandardTestDispatcher(testScheduler))
scope.launch(CoroutineExceptionHandler { _, _ -> }) {
throw RuntimeException("boom")
}
scope.launch {
delay(100)
sibling1Done = true
}
scope.launch {
delay(200)
sibling2Done = true
}
advanceUntilIdle()
assertTrue(sibling1Done) // siblings unaffected
assertTrue(sibling2Done)
}
Common Mistakes to Avoid
Mistake 1: Forgetting advanceUntilIdle()
// ❌ With StandardTestDispatcher, the launched coroutine has NOT run yet when the assertion executes!
@Test
fun test() = runTest { // default = StandardTestDispatcher
val vm = ArticleViewModel(fakeRepo)
vm.loadArticles()
assertTrue(vm.uiState.value is UiState.Success) // 💥 still Loading!
}
// ✅ Advance the clock so the coroutine actually runs
@Test
fun test() = runTest {
val vm = ArticleViewModel(fakeRepo)
vm.loadArticles()
advanceUntilIdle()
assertTrue(vm.uiState.value is UiState.Success)
}
Mistake 2: Using runBlocking instead of runTest
// ❌ runBlocking uses REAL time — delay(60_000) waits 60 actual seconds
@Test
fun test() = runBlocking {
delay(60_000) // test takes a full minute!
}
// ✅ runTest uses virtual time — finishes in milliseconds
@Test
fun test() = runTest {
delay(60_000) // virtual — instant
}
// runBlocking has its place (rarely — testing real-time behaviour),
// but for almost every test, use runTest.
Mistake 3: Mismatched dispatchers
// ❌ ViewModel uses Dispatchers.IO, test uses TestScope
class ArticleViewModel : ViewModel() {
fun load() = viewModelScope.launch(Dispatchers.IO) { /* ... */ }
}
@Test
fun test() = runTest {
val vm = ArticleViewModel()
vm.load()
advanceUntilIdle() // doesn't reach the IO coroutine!
}
// ✅ Inject the dispatcher; pass a TestDispatcher in tests
class ArticleViewModel(
private val ioDispatcher: CoroutineDispatcher = Dispatchers.IO
) : ViewModel()
Mistake 4: Asserting Flow emissions without Turbine
// ❌ Manual collect with toList() — brittle and unclear
@Test
fun test() = runTest {
val emissions = mutableListOf<UiState>()
val job = launch { vm.uiState.toList(emissions) } // collects forever!
vm.load()
advanceUntilIdle()
job.cancel()
assertEquals(2, emissions.size)
}
// ✅ Turbine — clean and explicit
@Test
fun test() = runTest {
vm.uiState.test {
assertEquals(UiState.Loading, awaitItem())
vm.load()
assertTrue(awaitItem() is UiState.Success)
cancelAndIgnoreRemainingEvents()
}
}
Mistake 5: Emitting to SharedFlow before subscribers exist
// ❌ Event is lost — emit happens before .test {} subscribes
@Test
fun test() = runTest {
val bus = EventBus()
bus.emit(Event.Click) // no subscribers — lost (replay = 0)
bus.events.test {
expectNoEvents() // event was already lost!
}
}
// ✅ Subscribe first, then emit
@Test
fun test() = runTest {
val bus = EventBus()
bus.events.test {
bus.emit(Event.Click)
assertEquals(Event.Click, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
Mistake 6: Leaking coroutines in stateIn / shareIn tests
// ❌ Using a regular CoroutineScope — leaks past test end
@Test
fun test() = runTest {
val scope = CoroutineScope(SupervisorJob()) // not auto-cancelled!
val state = flow.stateIn(scope, SharingStarted.Eagerly, 0)
// ...
} // the scope is never cancelled and runs on Dispatchers.Default, outside the virtual clock: the collector leaks
// ✅ Use backgroundScope — auto-cancelled when runTest finishes
@Test
fun test() = runTest {
val state = flow.stateIn(backgroundScope, SharingStarted.Eagerly, 0)
// backgroundScope is a PROPERTY on TestScope — built for exactly this
}
Production Recipes
// ═══ Recipe 1: Reusable MainDispatcherRule ═════════════════════════════
// One rule, drop into any test class. No more setMain/resetMain boilerplate.
class MainDispatcherRule(
val testDispatcher: TestDispatcher = UnconfinedTestDispatcher()
) : TestWatcher() {
override fun starting(description: Description) = Dispatchers.setMain(testDispatcher)
override fun finished(description: Description) = Dispatchers.resetMain()
}
// ═══ Recipe 2: Helper for collecting StateFlow values ═════════════════
// When you don't need Turbine's full API and just want the latest value.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T> StateFlow<T>.collectInBackground(scope: CoroutineScope) {
scope.launch(UnconfinedTestDispatcher(scope.coroutineContext[TestCoroutineScheduler]!!)) {
collect {}
}
// Forces the StateFlow to be "active" so it computes derived values
// Useful for testing stateIn(scope, WhileSubscribed(...), ...)
}
// Usage:
@Test
fun test() = runTest {
vm.derivedState.collectInBackground(backgroundScope)
vm.load()
advanceUntilIdle()
assertEquals(expected, vm.derivedState.value)
}
// ═══ Recipe 3: Testing retry logic ═════════════════════════════════════
@Test
fun `retries 3 times before giving up`() = runTest {
var attempts = 0
val flowWithRetry = flow<Int> {
attempts++
throw IOException("network")
}.retry(3)
flowWithRetry.test {
awaitError()
assertEquals(4, attempts) // 1 original + 3 retries
}
}
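Real retry logic usually waits between attempts, and virtual time lets you assert the total backoff exactly. The sketch below swaps retry(3) for retryWhen with an exponential delay; the function name and the 100 ms base delay are assumptions chosen for the example.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.retryWhen
import kotlinx.coroutines.test.currentTime
import kotlinx.coroutines.test.runTest
import java.io.IOException

// Hypothetical demo: returns (value, attempts made, virtual ms spent backing off).
@OptIn(ExperimentalCoroutinesApi::class)
fun retryWithBackoff(): Triple<String, Int, Long> {
    var outcome = Triple("", 0, 0L)
    runTest {
        var attempts = 0
        val flaky = flow {
            attempts++
            if (attempts < 3) throw IOException("network") // fail twice, then succeed
            emit("ok")
        }.retryWhen { cause, attempt ->
            if (cause is IOException && attempt < 3) {
                delay(100L shl attempt.toInt()) // 100 ms, then 200 ms, then 400 ms
                true
            } else {
                false
            }
        }
        outcome = Triple(flaky.first(), attempts, currentTime)
    }
    return outcome
}
```

Two failures mean two waits (100 ms and 200 ms), so the virtual clock ends at 300 while the test itself runs in real milliseconds.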
// ═══ Recipe 4: Testing UI events as a Channel/SharedFlow ══════════════
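class MyViewModel : ViewModel() { // viewModelScope is an extension on ViewModel; the test also needs Dispatchers.setMain(...)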
private val _events = Channel<Event>(Channel.BUFFERED)
val events = _events.receiveAsFlow()
fun onClick() = viewModelScope.launch { _events.send(Event.Clicked) }
}
@Test
fun `click sends Clicked event`() = runTest {
val vm = MyViewModel()
vm.events.test {
vm.onClick()
assertEquals(Event.Clicked, awaitItem())
cancelAndIgnoreRemainingEvents()
}
}
// ═══ Recipe 5: Testing combined operators with debounce ═══════════════
@Test
fun `search combines query and filter with debounce`() = runTest {
val query = MutableStateFlow("")
val filter = MutableStateFlow(Filter.All)
val results = combine(query.debounce(300), filter) { q, f ->
repo.search(q, f)
}
results.test {
skipItems(1) // initial combine emission; it arrives once the first 300 ms debounce window (virtual) has passed
query.value = "kotlin"
advanceTimeBy(299)
expectNoEvents()
advanceTimeBy(1)
runCurrent()
assertTrue(awaitItem().isNotEmpty())
cancelAndIgnoreRemainingEvents()
}
}
Summary
- runTest (top-level function from kotlinx-coroutines-test) is the entry point: it gives you a virtual clock and auto-runs pending child coroutines at the end
- Virtual time means delay(60_000) finishes instantly, so tests are fast and deterministic
- StandardTestDispatcher queues launches; you advance the clock manually. Use it for testing ordering and races
- UnconfinedTestDispatcher runs launches eagerly. Use it for simple ViewModel tests where you just want things to run
- Always call Dispatchers.setMain(testDispatcher) in @Before and Dispatchers.resetMain() in @After, or use a MainDispatcherRule
- Inject dispatchers into ViewModels/Repositories; never hard-code Dispatchers.IO, or your tests can’t control them
- Build TestDispatchers with the shared testScheduler so advanceUntilIdle() controls everything
- runCurrent() runs ready tasks without advancing time; advanceTimeBy(N) advances by N ms; advanceUntilIdle() runs everything
- Use Turbine for Flow assertions: .test { awaitItem() }, awaitError(), expectNoEvents(), cancelAndIgnoreRemainingEvents()
- StateFlow conflates rapid updates: use expectMostRecentItem(), don’t assert exact intermediate values
- SharedFlow with replay = 0 drops emissions if no subscribers: subscribe via .test {} first, then emit
- Use backgroundScope (property on TestScope) for stateIn/shareIn; it is auto-cancelled at test end
- Test cancellation explicitly: the catch (e: Exception) bug silently swallows CancellationException and breaks coroutine cooperation
- For time-based operators (debounce, throttle), advance the clock past the threshold and call runCurrent() to fire the boundary tick
- Use flatMapLatest tests to verify previous flows are cancelled: assert side-effect counters on the cancelled branch
- Prefer runTest over runBlocking; you almost never want real-time tests
Async testing has the steepest learning curve in Android development — but once you internalize virtual time, dispatcher injection, and Turbine, every coroutine bug becomes catchable in a unit test. Start by adding a MainDispatcherRule to your test classes, inject dispatchers into every ViewModel, and use Turbine for every Flow assertion. Your tests will be faster, your bugs will be louder, and your refactors will stop breaking production.
Happy coding!