Kotlin Flow is how you handle streams of data in coroutines. While a regular suspending function returns a single value, a Flow emits multiple values over time — think search results arriving one by one, real-time sensor readings, or a stream of database updates. If you’ve used RxJava before, Flow is Kotlin’s native, coroutine-based replacement — simpler, lighter, and built on structured concurrency. This guide covers what Flow is, how to create and collect it, and how it fits into real Android apps.


What is a Flow?

A Flow is a cold, asynchronous stream that emits values sequentially. “Cold” means it doesn’t produce values until someone starts collecting it — just like a sequence doesn’t evaluate until you iterate over it.

// A simple Flow that emits three numbers
val numberFlow: Flow<Int> = flow {
    emit(1)          // send value downstream
    delay(1000)      // simulate async work
    emit(2)
    delay(1000)
    emit(3)
}

// Nothing happens until you collect
numberFlow.collect { value ->
    println(value)   // prints 1, then 2, then 3
}

Key points about Flow:

  • Cold — code inside flow { } doesn’t run until collect is called
  • Sequential — values are emitted one at a time, in order
  • Cancellable — collection respects coroutine cancellation automatically
  • Suspending — both emit() and collect() are suspending functions
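To make the "Cancellable" point concrete, here is a minimal sketch you can run outside Android. The names slowNumbers and collectFor are made up for this illustration; the only library calls are flow, delay, collect and withTimeoutOrNull from kotlinx.coroutines.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical slow source: one value every 100 ms, five values in total.
fun slowNumbers(): Flow<Int> = flow {
    for (i in 1..5) {
        delay(100)
        emit(i)
    }
}

// Collects until the timeout cancels the collecting coroutine.
suspend fun collectFor(timeoutMs: Long): List<Int> {
    val received = mutableListOf<Int>()
    withTimeoutOrNull(timeoutMs) {
        slowNumbers().collect { received += it }
    }
    return received
}

fun main() = runBlocking {
    // The timeout fires before the flow finishes, so only a prefix of 1..5 arrives.
    println(collectFor(250))
}
```

No explicit cancellation code is needed inside the flow: the timeout cancels the collecting coroutine, and the suspended delay or emit call inside flow { } stops with it.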

Creating Flows

flow { } builder — the most common way

// Emit values from a long-running operation
fun fetchArticles(): Flow<Article> = flow {
    val articles = api.getArticles()      // suspend call
    for (article in articles) {
        emit(article)                      // emit each article one by one
    }
}

// Emit values over time — like a ticker
fun tickerFlow(intervalMs: Long): Flow<Long> = flow {
    var count = 0L
    while (true) {
        emit(count++)
        delay(intervalMs)
    }
}

flowOf() — from fixed values

// Create a Flow from known values — like listOf() for streams
val greetings = flowOf("Hello", "Bonjour", "Hola")

greetings.collect { println(it) }
// Hello
// Bonjour
// Hola

asFlow() — convert collections and sequences

// Convert a list to a Flow
val listFlow = listOf(1, 2, 3).asFlow()

// Convert a range
val rangeFlow = (1..10).asFlow()

// Convert a sequence
val sequenceFlow = generateSequence(1) { it * 2 }
    .take(5)
    .asFlow()   // emits 1, 2, 4, 8, 16

callbackFlow — bridge callback-based APIs

// Convert a callback-based API to Flow
fun locationUpdates(): Flow<Location> = callbackFlow {
    val callback = object : LocationCallback() {
        override fun onLocationResult(result: LocationResult) {
            result.lastLocation?.let { trySend(it) }   // lastLocation is nullable; send only real fixes
        }
    }

    locationClient.requestLocationUpdates(request, callback, Looper.getMainLooper())

    // awaitClose suspends until the collector cancels or the channel closes, then runs this block
    awaitClose {
        locationClient.removeLocationUpdates(callback)   // cleanup
    }
}
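If you want to try callbackFlow without the Android location APIs, the sketch below uses a made-up TemperatureSensor class as a stand-in for any listener-based SDK. Everything about the sensor (its name, register, unregister, publish) is an assumption for illustration; callbackFlow, trySend and awaitClose are the real kotlinx.coroutines calls.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*

// Hypothetical callback-based API standing in for a real SDK.
class TemperatureSensor {
    private var listener: ((Double) -> Unit)? = null
    val hasListener: Boolean get() = listener != null
    fun register(l: (Double) -> Unit) { listener = l }
    fun unregister() { listener = null }
    fun publish(value: Double) { listener?.invoke(value) }
}

fun TemperatureSensor.readings(): Flow<Double> = callbackFlow {
    register { value -> trySend(value) }   // forward each callback into the Flow
    awaitClose { unregister() }            // runs once the collector stops
}

fun main() = runBlocking {
    val sensor = TemperatureSensor()
    val job = launch {
        sensor.readings().take(2).collect { println("Reading: $it") }
    }
    while (!sensor.hasListener) yield()    // wait until the flow has registered its listener
    sensor.publish(20.5)
    sensor.publish(21.0)
    job.join()
    println("Listener still registered: ${sensor.hasListener}")
}
```

After take(2) has its two values, the collector stops, the awaitClose block runs, and the listener is gone. That cleanup step is the reason awaitClose should always be the last call in a callbackFlow block.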

Collecting Flows

collect is the primary way to receive values from a Flow. It’s a suspending function, so it must be called from a coroutine:

// Basic collection
viewModelScope.launch {
    repository.getArticles().collect { article ->
        println(article.title)
    }
}

// collect is terminal — it suspends until the Flow completes
// Code after collect runs only after the Flow finishes emitting

Other terminal operators

val numbers = flowOf(1, 2, 3, 4, 5)

// first() — collect only the first value
val first = numbers.first()           // 1

// first { } — first value matching a condition
val firstEven = numbers.first { it % 2 == 0 }   // 2

// toList() — collect all values into a List
val list = numbers.toList()           // [1, 2, 3, 4, 5]

// toSet() — collect into a Set
val set = numbers.toSet()             // {1, 2, 3, 4, 5}

// single(): expects exactly one value; throws if the Flow is empty or emits more than one
val single = flowOf(42).single()      // 42

// reduce — accumulate values
val sum = numbers.reduce { acc, value -> acc + value }   // 15

// fold — accumulate with initial value
val sumPlus100 = numbers.fold(100) { acc, value -> acc + value }   // 115

// count
val count = numbers.count()           // 5

Flow is Cold — What That Means

Every time you call collect, the flow { } block runs from the start. Each collector gets its own independent execution:

val timestampFlow = flow {
    println("Flow started")
    emit(System.currentTimeMillis())
    delay(1000)
    emit(System.currentTimeMillis())
}

// First collection
timestampFlow.collect { println("Collector 1: $it") }

// Second collection — runs the flow block again from scratch
timestampFlow.collect { println("Collector 2: $it") }

// Output (timestamps are illustrative):
// Flow started
// Collector 1: 1700000000000
// Collector 1: 1700000001000
// Flow started                      <- runs again!
// Collector 2: 1700000001000
// Collector 2: 1700000002000

This is fundamentally different from hot streams (like StateFlow or SharedFlow) where values are emitted regardless of collectors. We’ll cover those in a later post.
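A quick way to convince yourself of the cold behaviour is to count how often the flow { } block actually runs. This is a small sketch; blockRuns and countedFlow are names invented for the example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

var blockRuns = 0   // counts how many times the flow { } body executes

val countedFlow: Flow<Int> = flow {
    blockRuns++
    emit(blockRuns)
}

fun main() = runBlocking {
    println(blockRuns)             // 0: building the flow runs nothing
    println(countedFlow.first())   // 1: the first collection runs the block
    println(countedFlow.first())   // 2: the second collection runs it again
}
```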


Context and Dispatchers

By default, a Flow runs in the coroutine context of the collector. But you can change which dispatcher the upstream (emission) code runs on using flowOn:

fun fetchArticles(): Flow<Article> = flow {
    // This runs on Dispatchers.IO because of flowOn below
    val articles = api.getArticles()
    articles.forEach { emit(it) }
}
.flowOn(Dispatchers.IO)   // upstream runs on IO dispatcher

// In ViewModel — collection happens on Main
viewModelScope.launch {
    // collect runs on Main (viewModelScope's dispatcher)
    fetchArticles().collect { article ->
        _articles.value += article   // safe to update UI state
    }
}

flowOn changes upstream context only

// Each flowOn affects only the operators above it that have no flowOn of their own
flow {
    // Runs on IO (because of the flowOn directly below)
    emit(heavyComputation())
}
.flowOn(Dispatchers.IO)        // applies to flow { }
.map { result ->
    // Runs on Default (because of the flowOn directly below)
    transform(result)
}
.flowOn(Dispatchers.Default)   // applies to map only; flow { } keeps IO
.collect { value ->
    // Runs on the collector's context (Main in viewModelScope)
    updateUi(value)
}
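To see that flowOn moves only the upstream code, you can record the thread name on both sides. This is a rough sketch with a made-up emitterThread function. The exact thread names depend on your environment, so no output is shown.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits the name of the thread that runs the flow { } block.
fun emitterThread(): Flow<String> = flow {
    emit(Thread.currentThread().name)
}.flowOn(Dispatchers.IO)   // everything above this line runs on IO

fun main() = runBlocking {
    emitterThread().collect { upstream ->
        val collector = Thread.currentThread().name
        println("emitted on $upstream, collected on $collector")
    }
}
```

The two names differ: the block runs on one of the IO worker threads, while collect stays on the thread that called runBlocking.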

Important: Never call emit from inside withContext in a flow { } block. Emitting from a different context breaks Flow's context-preservation rule and throws. Use flowOn to move the emission code to another dispatcher instead:

// ❌ WRONG — throws IllegalStateException
flow {
    withContext(Dispatchers.IO) {
        emit(fetchData())   // emission from different context = crash
    }
}

// ✅ CORRECT — use flowOn
flow {
    emit(fetchData())
}.flowOn(Dispatchers.IO)

Basic Operators

Flow supports familiar transformation operators. These are intermediate operators — they return a new Flow and don’t trigger collection:

repository.getArticlesFlow()
    .filter { it.isPublished }              // only published articles
    .map { it.toUiModel() }                 // transform to UI model
    .take(10)                                // only first 10
    .collect { article ->
        showArticle(article)
    }

map — transform each value

val names: Flow<String> = userFlow.map { user -> user.name }

filter — keep values matching a condition

val adults: Flow<User> = userFlow.filter { it.age >= 18 }

transform — flexible emit for each value

val detailed: Flow<String> = userFlow.transform { user ->
    emit("Loading ${user.name}...")        // emit a loading message
    val details = fetchDetails(user.id)    // suspend call
    emit("${user.name}: $details")         // emit the result
}

take — limit the number of values

val firstThree: Flow<Int> = numberFlow.take(3)
// Cancels the upstream Flow after 3 values

onEach — side effect without changing values

repository.getArticlesFlow()
    .onEach { println("Received: ${it.title}") }   // logging
    .collect { showArticle(it) }
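The snippets above rely on userFlow and a repository that aren't defined here, so this is a self-contained version you can paste into a scratch file. The function names are invented for the example; the operators are the same ones described above.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// filter -> map -> take over the numbers 1..10
suspend fun firstThreeEvenSquares(): List<Int> =
    (1..10).asFlow()
        .filter { it % 2 == 0 }   // 2, 4, 6, 8, 10
        .map { it * it }          // 4, 16, 36, 64, 100
        .take(3)                  // stops the upstream after three values
        .toList()

// transform can emit zero, one, or several values per input
suspend fun labelled(): List<String> =
    flowOf(1, 2)
        .transform<Int, String> { n ->
            emit("start $n")
            emit("done $n")
        }
        .toList()

fun main() = runBlocking {
    println(firstThreeEvenSquares())   // [4, 16, 36]
    println(labelled())                // [start 1, done 1, start 2, done 2]
}
```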

Exception Handling

Use catch to handle exceptions in a Flow. It catches any exception thrown upstream (above it in the chain):

repository.getArticlesFlow()
    .map { it.toUiModel() }
    .catch { e ->
        // Catches exceptions from getArticlesFlow() and map
        println("Error: ${e.message}")
        emit(emptyUiModel())               // emit a fallback value
    }
    .collect { article ->
        showArticle(article)
    }

catch only catches upstream exceptions

flow { emit(1) }
    .catch { println("Caught: $it") }     // catches upstream errors
    .collect {
        throw RuntimeException("Oops")    // ❌ NOT caught by catch
    }

// To handle collector exceptions, use try-catch around collect
try {
    flow { emit(1) }.collect {
        throw RuntimeException("Oops")
    }
} catch (e: Exception) {
    println("Caught in try-catch: ${e.message}")
}
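Here is the upstream/downstream split as a runnable sketch. failingFlow, withFallback and collectorFailure are names made up for this example.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Emits one value, then fails upstream.
fun failingFlow(): Flow<Int> = flow {
    emit(1)
    throw IllegalStateException("upstream failed")
}

// catch sees the upstream failure and substitutes a fallback value.
suspend fun withFallback(): List<Int> =
    failingFlow()
        .catch { emit(-1) }
        .toList()

// A failure inside collect is downstream of catch, so it escapes the operator.
suspend fun collectorFailure(): String =
    try {
        flowOf(1)
            .catch { emit(-1) }
            .collect { throw IllegalArgumentException("collector failed") }
        "no error"
    } catch (e: IllegalArgumentException) {
        "try-catch saw: ${e.message}"
    }

fun main() = runBlocking {
    println(withFallback())       // [1, -1]
    println(collectorFailure())   // try-catch saw: collector failed
}
```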

onCompletion — runs when Flow completes (normally or with error)

repository.getArticlesFlow()
    .onCompletion { cause ->
        if (cause == null) {
            println("Flow completed successfully")
        } else {
            println("Flow failed: ${cause.message}")
        }
        _isLoading.value = false   // cleanup — always runs
    }
    .catch { e -> _error.value = e.message }
    .collect { _articles.value = it }
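The order of onCompletion and catch matters. This small sketch (completionOrder is a made-up helper) records which callback fires when, for a flow that fails after one value:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Records the order in which the callbacks fire.
suspend fun completionOrder(): List<String> {
    val events = mutableListOf<String>()
    flow<Int> {
        emit(1)
        throw IllegalStateException("boom")
    }
        .onCompletion { cause -> events += "onCompletion(${cause?.message})" }
        .catch { e -> events += "catch(${e.message})" }
        .collect { events += "value($it)" }
    return events
}

fun main() = runBlocking {
    println(completionOrder())
    // [value(1), onCompletion(boom), catch(boom)]
}
```

Because onCompletion sits above catch, it still sees the failure as its cause. If you swap the two, catch handles the exception first and onCompletion receives null.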

Real Android Patterns

Repository returning a Flow (Room database)

// Room DAO — returns Flow automatically
@Dao
interface ArticleDao {
    @Query("SELECT * FROM articles ORDER BY date DESC")
    fun getArticles(): Flow<List<Article>>   // emits new list on every DB change
}

// Repository — expose Flow
class ArticleRepository(private val dao: ArticleDao) {
    fun getArticles(): Flow<List<Article>> = dao.getArticles()
        .map { articles -> articles.filter { it.isPublished } }
        .flowOn(Dispatchers.IO)
}

// ViewModel — convert Flow to StateFlow for UI
class ArticleViewModel(private val repository: ArticleRepository) : ViewModel() {

    val articles: StateFlow<List<Article>> = repository.getArticles()
        .catch { emit(emptyList()) }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )
}

Collecting Flow safely in a Fragment

class ArticleFragment : Fragment() {

    override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
        super.onViewCreated(view, savedInstanceState)

        // Lifecycle-aware collection — automatically stops when view is destroyed
        viewLifecycleOwner.lifecycleScope.launch {
            viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
                viewModel.articles.collect { articles ->
                    adapter.submitList(articles)
                }
            }
        }
    }
    // When fragment goes below STARTED → the collecting coroutine is cancelled
    // When fragment returns to STARTED → the block is launched again and collection restarts
    // When view destroyed → collection cancelled permanently
}

Network + cache with Flow

class ArticleRepository(
    private val api: ArticleApi,
    private val dao: ArticleDao
) {
    fun getArticles(): Flow<List<Article>> = flow {
        // Step 1: Emit cached data immediately
        val cached = dao.getArticlesList()
        if (cached.isNotEmpty()) {
            emit(cached)
        }

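        // Step 2: Fetch fresh data from network
        val fresh = try {
            api.getArticles().also { dao.insertAll(it) }   // update cache
        } catch (e: CancellationException) {
            throw e                         // never swallow cancellation
        } catch (e: Exception) {
            if (cached.isEmpty()) throw e   // no cache, propagate error
            null                            // cache already emitted, ignore the network failure
        }
        // Emit outside the try block so collector failures are not caught here
        if (fresh != null) emit(fresh)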
    }.flowOn(Dispatchers.IO)
}

Search with Flow — debounce + distinctUntilChanged

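// debounce is marked @FlowPreview and flatMapLatest @ExperimentalCoroutinesApi, so opt in explicitly
@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
class SearchViewModel(private val repository: SearchRepository) : ViewModel() {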

    private val _query = MutableStateFlow("")

    val results: StateFlow<List<SearchResult>> = _query
        .debounce(300)                         // wait 300ms after last keystroke
        .distinctUntilChanged()                // skip if query hasn't changed
        .filter { it.isNotBlank() }            // skip empty queries
        .flatMapLatest { query ->              // cancel previous search, start new
            repository.search(query)
                .catch { emit(emptyList()) }
        }
        .stateIn(
            scope = viewModelScope,
            started = SharingStarted.WhileSubscribed(5000),
            initialValue = emptyList()
        )

    fun onQueryChanged(query: String) {
        _query.value = query
    }
}
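You can try the same operator chain without a ViewModel. In this sketch, fakeSearch and keystrokes are invented stand-ins for the repository and the user's typing, and the delays are arbitrary values chosen so that the keystrokes arrive well inside the debounce window.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical search backend: pretends each lookup takes 50 ms.
fun fakeSearch(query: String): Flow<String> = flow {
    delay(50)
    emit("results for '$query'")
}

// Simulated keystrokes arriving faster than the debounce window.
fun keystrokes(): Flow<String> = flow {
    for (q in listOf("k", "ko", "kot", "kotlin")) {
        emit(q)
        delay(20)
    }
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
suspend fun searchResults(): List<String> =
    keystrokes()
        .debounce(300)                      // drop values followed by another within 300 ms
        .distinctUntilChanged()
        .filter { it.isNotBlank() }
        .flatMapLatest { fakeSearch(it) }   // a newer query cancels the older search
        .toList()

fun main() = runBlocking {
    println(searchResults())
}
```

With these timings you should normally see a single entry, for the final query "kotlin": the intermediate keystrokes never survive the debounce window, so no search is started for them.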

Common Mistakes to Avoid

Mistake 1: Using withContext inside flow { } to change emission context

// ❌ Crashes with IllegalStateException
flow {
    withContext(Dispatchers.IO) {
        emit(repository.getData())
    }
}

// ✅ Use flowOn instead
flow {
    emit(repository.getData())
}.flowOn(Dispatchers.IO)

Mistake 2: Collecting in GlobalScope or without lifecycle awareness

// ❌ Leaks — collection continues even after Activity/Fragment is destroyed
GlobalScope.launch {
    viewModel.articles.collect { updateUi(it) }
}

// ✅ Use lifecycle-aware collection
viewLifecycleOwner.lifecycleScope.launch {
    viewLifecycleOwner.repeatOnLifecycle(Lifecycle.State.STARTED) {
        viewModel.articles.collect { updateUi(it) }
    }
}

Mistake 3: Forgetting that Flow is cold — creating new Flow on every call

// ❌ Creates a new Flow every time — each collector triggers a fresh API call
class MyViewModel : ViewModel() {
    fun getArticles(): Flow<List<Article>> = flow {
        emit(repository.fetchArticles())   // called every time someone collects
    }
}

// ✅ Use stateIn to share a single Flow among multiple collectors
class MyViewModel : ViewModel() {
    val articles: StateFlow<List<Article>> = flow {
        emit(repository.fetchArticles())
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())
}
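To check that stateIn really shares one upstream execution, you can count how often the block runs while two collectors read the value. This is a sketch under a few assumptions: a plain CoroutineScope stands in for viewModelScope, and upstreamRunsWhenShared is a made-up helper.

```kotlin
import java.util.concurrent.atomic.AtomicInteger
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns how many times the upstream block ran while serving two collectors.
suspend fun upstreamRunsWhenShared(): Int {
    val runs = AtomicInteger(0)
    val scope = CoroutineScope(Job())   // stands in for viewModelScope
    val state: StateFlow<Int> = flow {
        runs.incrementAndGet()          // pretend this is the API call
        emit(42)
    }.stateIn(scope, SharingStarted.WhileSubscribed(5_000), 0)

    state.first { it == 42 }            // first collector starts the upstream
    state.first { it == 42 }            // second collector reads the cached value
    scope.cancel()                      // like viewModelScope being cancelled in onCleared()
    return runs.get()
}

fun main() = runBlocking {
    println(upstreamRunsWhenShared())   // 1
}
```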

Mistake 4: Swallowing CancellationException inside a Flow

// ❌ Swallows cancellation and breaks structured concurrency
flow {
    try {
        emit(fetchData())
    } catch (e: Exception) {
        // catches ALL exceptions, including CancellationException
        emit(fallbackData())
    }
}

// ✅ The catch operator never intercepts cancellation, so prefer it where you can.
// In a custom try-catch, re-throw CancellationException and emit outside the try:
flow {
    val data = try {
        fetchData()
    } catch (e: CancellationException) {
        throw e   // always re-throw
    } catch (e: Exception) {
        fallbackData()
    }
    emit(data)
}

Summary

  • A Flow is a cold, asynchronous stream that emits values sequentially
  • Cold means the flow { } block only runs when collect is called — each collector gets its own execution
  • Create Flows with flow { }, flowOf(), .asFlow(), or callbackFlow { }
  • collect is the primary terminal operator — others include first(), toList(), reduce(), fold()
  • Use flowOn to change the upstream dispatcher; never call emit from inside withContext in flow { }
  • Intermediate operators like map, filter, transform, take return new Flows without triggering collection
  • catch handles upstream exceptions — use try-catch around collect for downstream errors
  • onCompletion runs when a Flow finishes — whether successful, failed, or cancelled
  • In Android, convert Flow to StateFlow using stateIn for lifecycle-safe UI state
  • Always collect with repeatOnLifecycle in Fragments/Activities — never GlobalScope
  • Room returns Flow<List<T>> that auto-emits on database changes — perfect for reactive UI
  • debounce + distinctUntilChanged + flatMapLatest is the standard pattern for search

Flow is the foundation of reactive programming in Kotlin. Once you understand that it’s cold, sequential, and coroutine-based — everything from simple data transformations to complex multi-source reactive streams becomes straightforward. Master the basics here, and operators + StateFlow/SharedFlow in the next posts will build naturally on top.

Happy coding!