Flow: асинхронный «поток» в Kotlin

Статья будет полезнее всего тем, кто уже знаком с платформой Android, Kotlin и корутинами
3 минуты614

В корутинах Flow — это класс, который может возвращать несколько объектов по очереди или сразу. Ключевое слово тут «несколько»: это главное его отличие от suspend function, которая возвращает один объект и завершается. Для примера, Flow гораздо удобнее, если вы подписываетесь на постоянные уведомления от вашего GPS или на получение сообщений в чате. Flow работает на основе корутин и представляет собой поток данных, которые можно обрабатывать асинхронно. С помощью Flow можно отправлять запросы на сервер или в базу данных без блокирования основного потока приложения. Все данные, которые возвращает Flow, должны быть, естественно, одного типа: если поток объявлен как Flow<Int>, то получать из него можно только объекты типа Int. 

В работу Flow вовлечены три объекта:

  1. Producer — производит (создает, испускает) данные в виде потока. Данные передаются в отдельном потоке благодаря корутинам.
  2. Intermediary (Посредник) — класс или классы, которые могут модифицировать или изменять данные, произведенные Producer’ом. Обычно это какие-то вспомогательные классы или так называемые мапперы. Наличие посредников не обязательно, если данные не нужно модифицировать или переводить их из одного типа в другой.
  3. Consumer — получатель данных, произведённых Producer’ом.

На простом примере посмотрим, как можно использовать Flow в приложении. Для построения приложения мы будем использовать упрощённый аналог чистой архитектуры:

  • у нас будут данные, которые хранятся в условной базе данных: Data;
  • будет источник данных, который будет получать данные из БД: DataSource;
  • будет репозиторий, который работает с нашим источником данных: Repository;
  • репозиторий будет использоваться в нашей ViewModel, и в итоге данные будут отображаться в Activity. 

Для начала создадим простой класс для передачи наших данных. В нашем случае это data class, который содержит некое значение в виде String. Именно Data будет получать наша Activity:

internal data class Data(val data: String)

Теперь опишем источник наших данных. Пусть это будет умозрительная БД, на изменения данных в которой мы и хотим подписаться. Так как данные в ней будут изменяться постоянно, то Flow идеально подходит для наших целей. База данных:

internal object DataBase {
   fun fetchData() = Random.nextInt()
}

У нашей базы данных есть метод, который возвращает нужную информацию в виде какого-то случайного числа. Таким образом мы имитируем постоянное изменение данных в БД.

Пришло время DataSource и Flow. Класс DataSource принимает в конструктор два аргумента: базу данных и период обновления данных. Период равен одной секунде, указанной в миллисекундах, и содержит одну переменную типа Flow<String>, которая содержит в себе данные из нашей БД, переведённые из Int в String. Чтобы создать простой поток, нужно воспользоваться flow builder. В нашем случае это простая функция Flow, в которой всё и происходит:

internal class DataSource(
   private val dataBase: DataBase = DataBase,
   private val refreshIntervalMs: Long = 1000
) {
   val data: Flow<String> = flow {
       while (true) {
           val dataFromDataBase = dataBase.fetchData()
           emit(dataFromDataBase.toString())
           delay(refreshIntervalMs)
       }
   }
       .flowOn(Dispatchers.Default)
       .catch { e ->
           println(e.message)//Error!
       }
}

В бесконечном цикле мы обращаемся к БД, получаем случайное число и «испускаем» (функция emit) это число уже в виде String для всех, кто «подписан» на поток (вспоминаем Producer и Consumer). После этого мы делаем паузу на одну секунду в цикле с помощью функции delay. Функции flowOn и catch опциональны: код будет прекрасно работать и без них. Во flowOn можно явно указать, в каком потоке будет выполняться работа, а catch отловит ошибку, если такие появятся в процессе работы.

Пришло время репозитория. В него мы передаём наш DataSource. У репозитория тоже всего одна переменная типа Flow<Data>. Обратите внимание, что DataSource возвращает тип данных String, а Репозиторий передает дальше уже Data. Репозиторий в данном случае является Посредником:

internal class Repository(dataSource: DataSource = DataSource()) {
 
   val userData: Flow<Data> =
       dataSource.data.map { data -> Data(data) }
   //.onEach { saveInCache(it) }
}

Тут мы видим, как у переменной класса DataSource data (это и есть наш поток Flow<String>) вызывается функция map, которая позволяет сохранить полученное значение String в класс Data. Функция onEach опциональна и показывает, что значение, возвращаемое нашим DataSource, можно сохранить для дальнейшего использования или сделать с ним ещё неограниченное количество операций.

    Осталось описать последний класс нашей бизнес-логики — ViewModel. ViewModel содержит в себе LiveData, на которую подписана наша Активити. Всё, что нам нужно сделать, — передать в конструктор ViewModel наш Репозиторий и запустить процесс получения данных из Базы данных:

internal class MainViewModel(
   repository: Repository = Repository()
) : ViewModel() {
 
   val liveData: MutableLiveData<Data> = MutableLiveData()
 
   init {
       viewModelScope.launch {
           repository.userData.flowOn(Dispatchers.Main)
               .collect { data ->
                   liveData.value = data
               }
       }
   }
}

    Делается это в момент создания ViewModel (блок инициализации init). Чтобы подписаться на Flow, нужно запустить процесс через корутины (помним, что поток выполняется асинхронно). Для этого у нас есть viewModelScope.launch, который мы запускаем в блоке инициализации (также его можно запускать и внутри suspend функции). Далее у userData вызываем функцию flowOn, где указываем, что все данные у нас будут отображаться уже в основном потоке приложения. Функция collect непосредственно запускает поток. Как только нам приходит очередная порция данных (раз в секунду), мы обновляем LiveData. 

    На самом деле всё это можно запустить в одну строку, так как у класса Flow есть для этого специальные функции:

val liveData: LiveData<Data> = repository.userData.asLiveData()
 
/*val liveData: MutableLiveData<Data> = MutableLiveData()
 
init {
   viewModelScope.launch {
       repository.userData.flowOn(Dispatchers.Main)
           .collect { data ->
               liveData.value = data
           }
   }
}*/

Все классы бизнес-логики полностью:

internal class MainViewModel(
   repository: Repository = Repository()
) : ViewModel() {
   val liveData: LiveData<Data> = repository.userData.asLiveData()
 
   /*val liveData: MutableLiveData<Data> = MutableLiveData()
 
   init {
       viewModelScope.launch {
           repository.userData.flowOn(Dispatchers.Main)
               .collect { data ->
                   liveData.value = data
               }
       }
   }*/
}
 
internal class Repository(dataSource: DataSource = DataSource()) {
 
   val userData: Flow<Data> =
       dataSource.data.map { data -> Data(data) }
   //.onEach { saveInCache(it) }
}
 
internal class DataSource(
   private val dataBase: DataBase = DataBase,
   private val refreshIntervalMs: Long = 1000
) {
   val data: Flow<String> = flow {
       while (true) {
           val dataFromDataBase = dataBase.fetchData()
           emit(dataFromDataBase.toString())
           delay(refreshIntervalMs)
       }
   }
   /*.flowOn(Dispatchers.Default)
   .catch { e ->
       println(e.message)//Error!
   }*/
}
 
internal object DataBase {
   fun fetchData() = Random.nextInt()
}
 
internal data class Data(val data: String)

Осталось отобразить весь нехитрый процесс на экране. Макет: 

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
   xmlns:app="http://schemas.android.com/apk/res-auto"
   android:id="@+id/main"
   android:layout_width="match_parent"
   android:layout_height="match_parent">
 
   <TextView
       android:id="@+id/message"
       android:layout_width="wrap_content"
       android:layout_height="wrap_content"
       app:layout_constraintBottom_toBottomOf="parent"
       app:layout_constraintEnd_toEndOf="parent"
       app:layout_constraintStart_toStartOf="parent"
       app:layout_constraintTop_toTopOf="parent" />
</androidx.constraintlayout.widget.ConstraintLayout>

В Activity мы создаем ViewModel, подписываемся на изменение данных, и как только данные меняются, отображаем их на экране. Всё остальное происходит внутри ViewModel:

class MainActivity : AppCompatActivity() {
 
   override fun onCreate(savedInstanceState: Bundle?) {
       super.onCreate(savedInstanceState)
       setContentView(R.layout.main_activity)
       val textView = findViewById<TextView>(R.id.message)
       ViewModelProvider(this).get(MainViewModel::class.java).liveData.observe(
           this,
           { dataFromDataBase ->
               textView.text = dataFromDataBase.data
           })
   }
}

Запускаем и наслаждаемся потоком данных:

Более развернутые и продвинутые работы с Flow можно посмотреть в этом репозитории.

программированиеandroidтуториал
Нашли ошибку в тексте? Напишите нам.
Спасибо,
что читаете наш блог!