In the previous post, we covered what Flow is and how to create, collect, and handle errors in basic streams. Now it’s time to go deeper into Flow operators — the tools that let you transform, combine, filter, and flatten streams of data. These operators are what make Flow truly powerful. Instead of writing manual loops and state tracking, you chain declarative operators that handle the complexity for you. This guide covers every operator category with practical examples you’ll actually use in Android apps.


Transformation Operators

map — transform each value

// Convert domain model to UI model
val uiModels: Flow<ArticleUiModel> = repository.getArticlesFlow()
    .map { article ->
        ArticleUiModel(
            title = article.title,
            subtitle = "${article.author} · ${article.date.format()}",
            imageUrl = article.coverImage
        )
    }

mapNotNull — transform and skip nulls

// Parse JSON strings, skip failures
val validUsers: Flow<User> = jsonFlow.mapNotNull { json ->
    try {
        parseUser(json)
    } catch (e: Exception) {
        null   // returning null = this value is skipped
    }
}

transform — emit zero, one, or multiple values per input

// Emit loading state before each network call
val detailedUsers: Flow<UiState> = userIdsFlow.transform { userId ->
    emit(UiState.Loading(userId))                  // first emit: loading
    val user = repository.fetchUser(userId)        // suspend call
    emit(UiState.Success(user))                    // second emit: result
}

// Skip certain values entirely — just don't emit
val filtered: Flow<Int> = numberFlow.transform { value ->
    if (value > 0) emit(value)   // negative numbers are dropped
}

scan — running accumulation (like fold but emits every step)

// Running total — emits after every value
val runningTotal: Flow<Int> = flowOf(1, 2, 3, 4, 5)
    .scan(0) { accumulator, value ->
        accumulator + value
    }
// Emits: 0, 1, 3, 6, 10, 15
//        ^initial  ^0+1  ^1+2  ^3+3  ^6+4  ^10+5

// Track state changes over time
val stateHistory: Flow<List<Event>> = eventFlow
    .scan(emptyList<Event>()) { history, event ->
        history + event   // append each event to history
    }

Filtering Operators

filter — keep values matching a condition

val activeUsers: Flow<User> = userFlow.filter { it.isActive }

val largeFiles: Flow<File> = fileFlow.filter { it.size > 1_000_000 }

filterNot — keep values that don’t match

val nonAdmins: Flow<User> = userFlow.filterNot { it.isAdmin }

filterIsInstance — keep only values of a specific type

// Filter sealed class subtypes
val errors: Flow<UiState.Error> = uiStateFlow.filterIsInstance<UiState.Error>()

// From a mixed-type Flow
val strings: Flow<String> = mixedFlow.filterIsInstance<String>()

distinctUntilChanged — skip consecutive duplicates

// Only emit when value actually changes
val uniqueStates: Flow<ConnectionState> = connectionFlow
    .distinctUntilChanged()
// CONNECTED, CONNECTED, DISCONNECTED, DISCONNECTED, CONNECTED
// becomes: CONNECTED, DISCONNECTED, CONNECTED

// Custom equality check
val nameChanges: Flow<User> = userFlow
    .distinctUntilChanged { old, new -> old.name == new.name }
// Emits only when name changes, ignoring other field changes

take — limit the number of values

val firstFive: Flow<Article> = articleFlow.take(5)
// Collects 5 values, then cancels the upstream Flow

takeWhile — take until condition fails

// Take values while user is online
val onlineEvents: Flow<Event> = eventFlow
    .takeWhile { it.userStatus == Status.ONLINE }
// Stops collecting as soon as one event has offline status

drop — skip the first N values

// Skip initial loading states
val afterWarmup: Flow<SensorData> = sensorFlow.drop(5)

// dropWhile — skip until condition fails
val afterCalibration: Flow<SensorData> = sensorFlow
    .dropWhile { it.isCalibrating }

debounce — wait for a pause in emissions

// Search — only emit after user stops typing for 300ms
val searchQuery: Flow<String> = queryFlow
    .debounce(300)
// User types: "k" "ko" "kot" "kotl" "kotli" "kotlin"
// Only "kotlin" is emitted (300ms after last keystroke)

sample — emit the latest value at fixed intervals

// High-frequency sensor data — sample every 100ms for UI
val sampledData: Flow<SensorReading> = sensorFlow
    .sample(100)
// If sensor emits 1000 times/sec, UI only gets ~10 updates/sec

Combining Operators

combine — merge latest values from multiple Flows

// Combine search query with filter selection
val query: StateFlow<String> = _query
val filter: StateFlow<Category> = _filter

val results: Flow<List<Article>> = combine(query, filter) { q, f ->
    repository.search(q, f)
}
// Emits whenever EITHER query or filter changes
// Always uses the latest value from both

// Three-way combine
val uiState: Flow<UiState> = combine(
    userFlow,
    settingsFlow,
    networkStatusFlow
) { user, settings, networkStatus ->
    UiState(
        userName = user.name,
        darkMode = settings.darkMode,
        isOffline = !networkStatus.isConnected
    )
}

zip — pair values one-to-one from two Flows

val names = flowOf("Alice", "Bob", "Charlie")
val ages = flowOf(25, 30, 35)

val users: Flow<String> = names.zip(ages) { name, age ->
    "$name is $age years old"
}
// "Alice is 25 years old"
// "Bob is 30 years old"
// "Charlie is 35 years old"

// zip completes when EITHER Flow completes
val short = flowOf(1, 2)
val long = flowOf("a", "b", "c", "d")
short.zip(long) { num, letter -> "$num$letter" }
// Emits: "1a", "2b" — stops because short Flow is done

combine vs zip — the key difference

val flow1 = flow {
    emit("A")
    delay(100)
    emit("B")
    delay(100)
    emit("C")
}

val flow2 = flow {
    emit(1)
    delay(250)
    emit(2)
}

// combine — uses latest from each, emits on ANY change
flow1.combine(flow2) { letter, number -> "$letter$number" }
// "A1", "B1", "C1", "C2"
// ^both arrive  ^flow1 updates  ^flow1 updates  ^flow2 updates

// zip — pairs values strictly one-to-one
flow1.zip(flow2) { letter, number -> "$letter$number" }
// "A1", "B2"
// Waits for a value from EACH flow before emitting

merge — combine multiple Flows into one stream

// Merge events from multiple sources
val allEvents: Flow<Event> = merge(
    networkEvents,
    databaseEvents,
    userInputEvents
)
// Emits values from all three Flows as they arrive
// Order depends on timing, not declaration order

Flattening Operators

When a Flow emits other Flows (a Flow<Flow<T>>), you need a flattening operator to merge them into a single Flow<T>. The three options differ in how they handle new inner Flows:

flatMapConcat — process one at a time, in order

// Fetch details for each user — one at a time, strictly ordered
val allDetails: Flow<UserDetails> = userIdsFlow
    .flatMapConcat { userId ->
        repository.getUserDetails(userId)   // returns Flow<UserDetails>
    }
// Waits for each inner Flow to complete before starting the next
// User 1 details → (complete) → User 2 details → (complete) → ...

flatMapLatest — cancel previous, keep only latest

// Search — cancel previous search when query changes
val results: Flow<List<SearchResult>> = queryFlow
    .flatMapLatest { query ->
        repository.search(query)   // previous search is CANCELLED
    }
// User types "kot" → search starts
// User types "kotlin" → "kot" search cancelled, "kotlin" search starts

// This is the most common flattening operator in Android —
// perfect for any "cancel and restart" pattern

flatMapMerge — run all concurrently

// Download multiple files concurrently
val downloadResults: Flow<DownloadResult> = urlsFlow
    .flatMapMerge(concurrency = 4) { url ->    // max 4 concurrent downloads
        downloadFile(url)   // returns Flow<DownloadResult>
    }
// All inner Flows run at the same time (up to concurrency limit)
// Results arrive in whatever order they complete

When to use which

// flatMapConcat  — order matters, sequential processing
//   Example: paginated API (page 1, then page 2, then page 3)

// flatMapLatest  — only care about the latest, cancel old work
//   Example: search, user selection, live filters

// flatMapMerge   — process all concurrently for speed
//   Example: batch downloads, parallel API calls

Side Effect Operators

repository.getArticlesFlow()
    .onStart {
        // Runs BEFORE the Flow starts emitting
        _isLoading.value = true
        println("Flow collection started")
    }
    .onEach { article ->
        // Runs for EACH emitted value — before it reaches collect
        println("Received: ${article.title}")
        analyticsTracker.logArticleLoaded(article.id)
    }
    .onCompletion { cause ->
        // Runs when Flow completes (success or failure)
        _isLoading.value = false
        if (cause != null) {
            println("Flow failed: ${cause.message}")
        }
    }
    .catch { e ->
        _error.value = e.message
    }
    .collect { article ->
        _articles.value += article
    }

onStart — emit initial values

// Emit a default value before the real data arrives
val articlesWithDefault: Flow<List<Article>> = repository.getArticlesFlow()
    .onStart { emit(emptyList()) }   // collector gets empty list immediately

Buffering and Conflation

When a collector is slower than the emitter, backpressure builds up. Flow handles this sequentially by default — the emitter suspends until the collector processes each value. You can change this behaviour:

buffer — run emitter and collector concurrently

// Without buffer: total time = emission time + collection time (sequential)
flow {
    emit(1)          // emits at 0ms
    delay(100)
    emit(2)          // emits at 100ms
    delay(100)
    emit(3)          // emits at 200ms
}
.collect { value ->
    delay(300)       // slow collector
    println(value)
}
// Total: ~1200ms (each value waits for collector)

// With buffer: emitter and collector run concurrently
flow {
    emit(1)
    delay(100)
    emit(2)
    delay(100)
    emit(3)
}
.buffer()            // emitter doesn't wait for collector
.collect { value ->
    delay(300)
    println(value)
}
// Total: ~1100ms (emitter runs ahead, values queue up)

conflate — skip intermediate values if collector is slow

// Sensor data arriving fast, UI update is slow
sensorFlow
    .conflate()   // if collector is busy, drop intermediate values
    .collect { reading ->
        delay(300)   // slow UI update
        updateChart(reading)
    }
// If values 1, 2, 3, 4, 5 arrive while collector processes 1:
// Collector gets: 1, then 5 (2, 3, 4 are dropped)
// Always processes the LATEST available value

collectLatest — cancel slow processing when new value arrives

// Re-render UI with latest data — cancel if new data arrives
searchResults
    .collectLatest { results ->
        // If a new emission arrives while this is running,
        // this block is CANCELLED and restarted with the new value
        val rendered = heavyRender(results)   // cancellable work
        updateUi(rendered)
    }

Real Android Patterns

Combining multiple data sources for UI state

class DashboardViewModel(
    private val userRepo: UserRepository,
    private val articleRepo: ArticleRepository,
    private val settingsRepo: SettingsRepository
) : ViewModel() {

    val uiState: StateFlow<DashboardUiState> = combine(
        userRepo.currentUser(),              // Flow<User>
        articleRepo.getArticles(),           // Flow<List<Article>>
        settingsRepo.getSettings()           // Flow<Settings>
    ) { user, articles, settings ->
        DashboardUiState(
            userName = user.displayName,
            articles = articles.map { it.toUiModel() },
            isDarkMode = settings.darkMode
        )
    }
    .catch { e ->
        emit(DashboardUiState.Error(e.message ?: "Unknown error"))
    }
    .stateIn(
        scope = viewModelScope,
        started = SharingStarted.WhileSubscribed(5000),
        initialValue = DashboardUiState.Loading
    )
}

Filter + sort with reactive inputs

class ArticleListViewModel(private val repository: ArticleRepository) : ViewModel() {

    private val _sortOrder = MutableStateFlow(SortOrder.DATE_DESC)
    private val _categoryFilter = MutableStateFlow<Category?>(null)

    val articles: StateFlow<List<ArticleUiModel>> = combine(
        repository.getArticles(),
        _sortOrder,
        _categoryFilter
    ) { articles, sort, category ->
        articles
            .filter { article ->
                category == null || article.category == category
            }
            .sortedWith(
                when (sort) {
                    SortOrder.DATE_DESC -> compareByDescending { it.date }
                    SortOrder.DATE_ASC -> compareBy { it.date }
                    SortOrder.TITLE -> compareBy { it.title }
                }
            )
            .map { it.toUiModel() }
    }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

    fun setSortOrder(order: SortOrder) { _sortOrder.value = order }
    fun setCategory(category: Category?) { _categoryFilter.value = category }
}

Retry with exponential backoff

fun <T> Flow<T>.retryWithBackoff(
    maxRetries: Int = 3,
    initialDelayMs: Long = 1000,
    maxDelayMs: Long = 10000
): Flow<T> = this.retryWhen { cause, attempt ->
    if (cause is CancellationException) {
        false   // never retry cancellation
    } else if (attempt < maxRetries) {
        val delayMs = (initialDelayMs * 2.0.pow(attempt.toInt()))
            .toLong()
            .coerceAtMost(maxDelayMs)
        delay(delayMs)
        true    // retry
    } else {
        false   // max retries exceeded
    }
}

// Usage
repository.getArticlesFlow()
    .retryWithBackoff(maxRetries = 3)
    .catch { e -> _error.value = e.message }
    .collect { _articles.value = it }

Paginated list with flatMapConcat

class PaginatedViewModel(private val repository: ArticleRepository) : ViewModel() {

    private val _loadNextPage = MutableSharedFlow<Unit>()
    private var currentPage = 0

    val articles: StateFlow<List<Article>> = _loadNextPage
        .onStart { emit(Unit) }   // load first page automatically
        .scan(emptyList<Article>()) { accumulated, _ ->
            val nextPage = repository.getArticles(page = currentPage++)
            accumulated + nextPage   // append new page to existing list
        }
        .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

    fun loadMore() {
        viewModelScope.launch { _loadNextPage.emit(Unit) }
    }
}

Common Mistakes to Avoid

Mistake 1: Using flatMapMerge when order matters

// ❌ Concurrent execution — results arrive in random order
pagesFlow.flatMapMerge { page ->
    repository.fetchPage(page)
}

// ✅ Use flatMapConcat for ordered sequential processing
pagesFlow.flatMapConcat { page ->
    repository.fetchPage(page)
}

Mistake 2: Collecting the same cold Flow multiple times accidentally

// ❌ Each collect triggers a separate network call
val articlesFlow = flow { emit(api.getArticles()) }

viewModelScope.launch { articlesFlow.collect { /* ... */ } }
viewModelScope.launch { articlesFlow.collect { /* ... */ } }
// Two API calls!

// ✅ Share with stateIn or shareIn
val articlesFlow = flow { emit(api.getArticles()) }
    .stateIn(viewModelScope, SharingStarted.WhileSubscribed(5000), emptyList())

Mistake 3: Forgetting that combine emits on every change

// ❌ Heavy computation runs every time ANY input changes
combine(flowA, flowB, flowC) { a, b, c ->
    expensiveComputation(a, b, c)   // runs 3x if all three change quickly
}

// ✅ Add debounce or use distinctUntilChanged to reduce emissions
combine(flowA, flowB, flowC) { a, b, c -> Triple(a, b, c) }
    .debounce(100)   // batch rapid changes
    .map { (a, b, c) -> expensiveComputation(a, b, c) }

Mistake 4: Using flatMapLatest when you need all results

// ❌ Previous downloads get cancelled!
urlsFlow.flatMapLatest { url ->
    downloadFile(url)   // only the LAST url is downloaded
}

// ✅ Use flatMapMerge for concurrent execution of all items
urlsFlow.flatMapMerge { url ->
    downloadFile(url)   // all downloads run concurrently
}

// ✅ Use flatMapConcat for sequential execution of all items
urlsFlow.flatMapConcat { url ->
    downloadFile(url)   // downloads run one at a time, in order
}

Summary

  • Transformation: map transforms each value, mapNotNull skips nulls, transform gives full control over emissions, scan provides running accumulation
  • Filtering: filter, filterNot, filterIsInstance for type-based filtering, distinctUntilChanged to skip consecutive duplicates
  • Limiting: take limits count, takeWhile takes until condition fails, drop and dropWhile skip initial values
  • Timing: debounce waits for a pause in emissions, sample emits at fixed intervals
  • Combining: combine merges latest values (emits on any change), zip pairs values one-to-one, merge interleaves multiple Flows
  • Flattening: flatMapConcat for sequential, flatMapLatest for cancel-and-restart, flatMapMerge for concurrent processing
  • Side effects: onStart runs before collection, onEach runs per value, onCompletion runs when Flow ends
  • Backpressure: buffer decouples emitter from collector, conflate drops intermediate values, collectLatest cancels slow processing
  • combine is the go-to operator for merging multiple UI inputs (search + filter + sort)
  • flatMapLatest is the go-to for search and any “cancel previous, restart” pattern
  • retryWhen with exponential backoff handles transient network failures gracefully
  • Use stateIn to share a cold Flow among multiple collectors and avoid duplicate work

Flow operators are where reactive programming becomes practical. Once you know the difference between combine and zip, between flatMapLatest and flatMapConcat, and when to use buffer vs conflate — you can model almost any async data pipeline declaratively. Next up, we’ll look at StateFlow vs SharedFlow — the hot counterparts to cold Flow.

Happy coding!