해당 글은 아래 글의 번역본 입니다. 오역이 있을 수 있으니, 코멘트 남겨주시면 감사합니다.
kotlinlang.org/docs/flow.html#sequences
Flows ?
순차적으로 값을 배출해서, 정상적으로 완료하거나 에러를 던지는 비동기 데이터 스트림 입니다.
일시중단할 수 있는 함수는(Suspending function) 비동기적으로 하나의 값을 반환합니다.
하지만 비동기적으로 계산되어진 값을 여러개를 어떻게 반환해야할까? 이때 Flow 를 사용해아합니다.
여러값 표현하기
- List
다양한 값들은 코틀린의 Collections 을 사용하여 표현될 수 있습니다. 예를 들어 우리는 3개의 숫자를 반환하는 리스트 simple 함수를 만들고 forEach 를 사용하여 모든 값을 출력할 수 있습니다.
fun simple() = listOf(1, 2, 3)
fun main() {
simple().forEach(::println)
}
// 결과
1
2
3
- Sequence
만약 CPU 자원을 소모하는 작업(여기서는 Thread.sleep(100) 으로 표현하겠습니다)을 진행하는 경우에는 Sequence 를 사용해 표현할 수 있습니다.
fun simple() = sequence { // sequence builder
for (i in 1..3) {
Thread.sleep(100) // 큰 작업을 하는척
yield(i) // 다음 값 계산하기
}
}
fun main() {
simple().forEach(::println)
}
// 결과
delay 100 (MainThread 차단)
1
delay 100 (MainThread 차단)
2
delay 100 (MainThread 차단)
3
- Suspending functions
그러나 위의 Sequence 에 사용된 예시는 계산하는 동안 메인 스레드를 차단하게 됩니다. 이 값들을 비동기 적으로 실행하여 값을 가져오고 싶다면 우리는 Suspend 키워드를 사용하여 simple 함수를 만들어 볼 수 있습니다. 결과적으로 리스트에 담겨 결과가 도출될 때는 메인스레드의 차단 없이 실행됩니다.
suspend fun simple(): List<Int> {
delay(1000) // 어떤 비동기로 동작하는 작업이 진행되고 있다
return listOf(1, 2, 3)
}
fun main() = runBlocking<Unit> {
simple().forEach(::println)
}
// 결과
delay 1000 (MainThread 차단 하지 않음)
1
2
3
- Flow
List<Int> 를 반환 타입으로 사용하면, 한번에 모든 원소들만 반환할 수 있습니다. 동기적으로 계산되는 값을 처리하기위해, Sequence<Int>를 사용했다면, 비동기적으로 계산되는 값의 스트림을 처리하기위해서는, Flow<Int> 를 사용할 수 있습니다.
fun simple(): Flow<Int> = flow { // flow builder
for (i in 1..3) {
delay(100) // 무엇가 의미있는 작업을 해서 지연되는 척
emit(i) // 다음 값 배출
}
}
fun main() = runBlocking<Unit> {
// 메인 스레드를 차단하지는 확인하기 위한 용도의 코루틴 luanch
launch {
for (k in 1..3) {
println("I'm not blocked $k")
delay(100)
}
}
// Flow 결과를 수집한다
simple().collect(::println)
}
// 결과
I'm not blocked 1
1
I'm not blocked 2
2
I'm not blocked 3
3
위의 코드는 메인 스레드의 차단 없이 각각의 숫자들이 출력되기 전에 100ms 대기하게 됩니다.. 메인 스레드가 실행되면서 분리된 코루틴 스코프로부터 매 100ms "i'm not blocked $number" 가 찍힘으로써 결과 가 맞다는 것이 확인됩니다..
위의 예제와 다른 류의 Flow 코드를 보고 싶다면 Flow 에서 몇가지 예제들을 볼 수 있습니다.
- flow {...} 빌더 블록 안에서는 일시중단 가능합니다(suspen)
- suspend 함수에는 더이상 suspend 라는 키워드가 붙지 않아도 됩니다.
- emit 함수를 통해 flow 로부터 값들이 방출 되어집니다.
- collect 함수를 통해 flow 로 부터 값이 수집 되어집니다.
Flows 는 차갑다 (Rx 의 cold observable 을 생각하면 이해가 쉬울 것 같습니다 - 구독하지 않으면 아이템이 발행되지 않음)
Flows 는 Sequences 와 비슷한 차가운 스트림 입니다. flow 빌더 안에 존재하는 코드는 수집(collected) 되기 전까지 실행되지 않습니다.
아래의 몇가지 예제를 보며 확인해보겠습니다.
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
delay(100)
emit(i)
}
}
fun main() = runBlocking<Unit> {
println("Calling simple function...")
val flow = simple()
println("Calling collect...")
flow.collect(::println)
println("Calling collect again...")
flow.collect(::println)
}
// 결과
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
이것이 simple 메서드에 suspend 키워드를 사용할 필요 없는 이유입니다.. simple 메서드 자체는 기다릴 필요 없이 바로 결과가 반환됩니다. collect 함수가 호출될 때마다 flow 가 시작되기 때문에, 위의 코드에서 collect 함수를 다시 호출 했을 때, "Flow started" 가 다시 출력되며 값들이 배출됩니다.
Flow 취소 기본 사항
Flow 는 코루틴의 일반적인 협력형 취소(cooperative cancellation)를 준수합니다. 코루틴의 취소 처리와 같이 flow 콜렉션은 취소가 가능한 일시중단 함수(suspend function - ex. delay) 안에서 flow 가 일시중단 된 상태일 때, 취소 가능합니다. 아래의 예제는 withTimeoutOrNull 블록이 실행 될 떄, 타임 아웃 상태에서 어떤 방식으로 flow 가 취소되는지 보여줍니다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100)
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
withTimeoutOrNull(250) { // 250ms 뒤에 타임 아웃
simple().collect(::println)
}
println("Done")
}
// 결과
Emitting 1
1
Emitting 2
2
Done
flow 에서 2개의 숫자가 배출되었고 이후에 취소가 잘 되었다는 것을 알 수 있습니다. Flow 취소에 대해 좀더 디테일 하게 알고 싶다면 여기를 통해 공부할 수 있습니다.
Flow 빌더
이전 예제들에서 나온 flow {...} 빌더는 가장 기본이 되는 것중 하나입니다. 또한 쉽게 flow 빌더를 만들 수 있는 것들이 있습니다.
- flowOf 는 정해져 있는 값을 배출하는 flow 를 만들 수 있는 빌더입니다.
- 다양한 collections 그리고 sequences 는 .asFlow 라는 확장 함수를 사용하여 쉽게 flow 로 변경될 수 있습니다.
1에서 3 까지 의 숫자들을 flow 로 변환하여 출력하는 예제는 아래와 같이 표현될 수 있습니다.
// Convert an integer range to a flow
(1..3).asFlow().collect(::println)
// 결과
1
2
3
Flow 의 중간 연산자
Flow 는 Collections, Sequences 에서 사용하는 것 처럼 중간 연산자들을 통해 변형될 수 있습니다.
중간 연산자는 업스트림 flow 에 적용 되고, 다운 스트림에서 값을 변환합니다. 또한 flows 처럼 중간 연산자들은 cold 한 성질을 가지고 있습니다. 그리고 중간 연산자의 호출은 일시 중단 되지 않습니다. 빠르게 실행되고, 변형된 flow 로 반환됩니다.
기초가 되는 연산자로는 친근한 이름을 가진 map, filter 가 있습니다. Sequence 와 다른 중요한 점은, 해당 연산자 내부에서는 일시 중단 가능한 함수 들을 호출 가능하다는 점입니다. (WOW)
예를들어 일시 중단 함수로 구현되어진 긴 시간이 걸리는 요청을 map 연산자 안에서 실행해도, 결과를 반환해줍니다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
fun main() = runBlocking<Unit> {
(1..3).asFlow() // a flow of requests
.map(::performRequest)
.collect(::println)
}
// 결과
response 1
response 2
response 3
변형 연산자(Transform operator)
flow 의 가장 일반적으로 사용되는 연산자는 transform 입니다. map, filter 와 같은 간단한 역할을 하는 변형 연산자도 있지만, 좀 더 복잡한 변형들을 적용할 수도 있습니다. transform 연산자를 사용하면, 임의의 값을 임의의 횟수만큼 배출할 수있습니다.
예를 들어 transform 를 사용하여, 긴 시간의 작업을 비동기로 수행하기전에 문자열을 배출할 수도 있습니다.
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
// main
(1..3).asFlow() // a flow of requests
.transform { request ->
emit("Making request $request")
emit(performRequest(request))
}
.collect { response -> println(response) }
// 결과
Making request 1
response 1
Making request 2
response 2
Making request 3
response 3
크기 제한 연산자
응답 한계에 도달 했을 떄, take 같은 크기 제한 중간 연산자를 사용하여, flow 의 실행을 취소할 수 있습니다. 코루틴에서의 취소는 항상 exception 을 던짐으로써 수행이 됩니다. 그래서 try {...} finally {...} 같은 리소스 관리 함수들은 취소 될 때 활용될 수 있습니다.
fun numbers(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} finally {
println("Finally in numbers")
}
}
fun main() = runBlocking<Unit> {
numbers()
.take(2) // take only the first two
.collect { value -> println(value) }
}
// 결과
1
2
Finally in numbers
위의 코드의 flow{...} 내부에 있는 숫자들은 두번째 숫자가 배출된 후에 정지 되는 것을 명확하게 보여줍니다.
flow 종료 연산자
flow 에서 종료 연산자는 flow 가 수집(collect)을 시작할 때, 일시중단이 가능한 함수 입니다. collect 연산자는 가장 종료 연산자중에 가장 기본이 되는 연산자 입니다. 그리고 다른 종료 연산자들도 많습니다. 그중 쉽게 볼 수 있는 것들을 예시로 보겠습니다.
- toList, toSet 같은 다양한 Collections 으로 변환이 가능합니다.
- 하나의 값을 flow 에서 배출 보장하는 first 연산자도 있습니다.
- fold 나 reduce 같은 연산자를 통해 값을 줄일 수도 있습니다.
예시
val sum = (1..5).asFlow()
.map { it * it } // squares of numbers from 1 to 5
.reduce { a, b -> a + b } // sum them (terminal operator)
println(sum)
// 결과
55
flows 는 순차적으로 실행됩니다. (Flow are sequential)
각각의 flow 블록들은 복수 개의 flow 를 사용할 수 있는 특별한 연산자를 사용하지 않는 한 순차적으로 실행됩니다.
코루틴 안에서 종료 연산자를 호출하게 되면 즉시 collections 은 동작합니다. 기본 값으로 새로운 코루틴이 실행되지 않습니다.
배출되어진 각각의 값들은 중간 연산자를 통해 업스트림에서 다운스트림으로 처리되어지고(위에서 부터 아래로 즉 순차적으로), 이후 종료 연산자에게 전달 됩니다.
짝수를 제외시키고, 나머지 숫자를 문자열로 변경하는 아래 예제를 보겠습니다.
(1..5).asFlow()
.filter {
println("Filter $it")
it % 2 == 0
}
.map {
println("Map $it")
"string $it"
}.collect {
println("Collect $it")
}
// 결과
Filter 1
Filter 2
Map 2
Collect string 2
Filter 3
Filter 4
Map 4
Collect string 4
Filter 5
Flow 컨텍스트 (Flow context)
flow 의 collection 은 항상 코루틴이라고 불리는 컨텍스트 안에서만 실행됩니다. 예를 들어 simple 이라는 flow 반환 타입을 가진 합수는 아래 코드의 작성자로 제공된 특별한 컨텍스트 안에서만 동작하게 됩니다.
withContext(context) {
simple().collect { value ->
println(value) // 특별한 컨텍스트 안에서만 실행됩니다
}
}
flow 속성은 기본적으로 제공된 컨텍스트를 유지하여 실행됩니다.
This property of a flow is called context preservation.
그래서 flow {...} 안에 존재하는 코드는 기본적으로 flow 에 상응하는 collector 로 부터 제공된 컨텍스트에서 실행됩니다.
예를 들어 호출 된 스레드에서 3개의 정수를 배출후, 출력하도록 구현된 simple 함수가 있다고 가정해봅시다.
fun simple(): Flow<Int> = flow {
log("Started simple flow")
for (i in 1..3) {
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> log("Collected $value") }
}
// 결과
[main @coroutine#1] Started simple flow
[main @coroutine#1] Collected 1
[main @coroutine#1] Collected 2
[main @coroutine#1] Collected 3
메인 스레드에서 simple().collect 을 호출 했고, 결과적으로 simple 의 내부의 flow 도 메인스레드에서 호출됩니다. 위의 코드는 스레드의 차단이나 컨텍스트의 실행 등을 신경쓰지 않은, 비동기 코드 혹은 빠르게 실행되는 코드의 기본 예제라고 볼 수 있습니다. (언제든 메인스레드가 차단되거나, 오래 걸리는 작업일 경우 문제가 생길수 있는 코드이다 정도로 이해하면 될 것 같습니다.)
WithContext 를 사용한 잘못된 아이템 배출 (Worng emission withContext)
그러나 긴 시간이 걸리는 혹은 CPU 를 소비하는 코드는 메인스레드를 차단할 수 있기 때문에, Dispatcher.Default 에서 실행되어야 합니다. 그리고 UI 를 새고로침 하는 코드는 Dispatcher.Main 에서 호출되어야 합니다. 보통 withContext 에 parameter 로 Context 를 넘겨줘서 사용하게 됩니다. 그러나 flow {...} 빌더 안에 존재하는 코드는 호출하는 쪽의 context 를 존중하여 보존하기 때문에, 다른 Context 로 변경해서 아이템을 방출하는 것을 허락하지 않습니다.
fun simple(): Flow<Int> = flow {
// The WRONG way to change context for CPU-consuming code in flow builder
// flow 자체는 메인스레드에서 호출되었고, 그 안에서 Context 를 변경하는 잘못된 방법
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
emit(i) // emit next value
}
}
}
fun main() = runBlocking<Unit> {
simple().collect { value -> println(value) }
}
// 결과
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@2b3b84d2, BlockingEventLoop@35017f5d],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@79974803, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext (SafeCollector.common.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext (SafeCollector.kt:88)
at kotlinx.coroutines.flow.internal.SafeCollector.emit (SafeCollector.kt:74)
이런 경우는 flowOn 연산자를 사용하면 됩니다.
FlowOn 연산자
위의 결과중 exception 을 잘 읽어보면 flow 에서 아이템을 방출할 때, context 을 변경하고 싶은 경우에는 flow 가 아닌 flowOn 사용하는 것을 언급하고 있습니다. 올바른 방법은 아래의 예제를 보면 알 수 있습니다. 위의 예제들과 마찬가지로 현재 상응하는 스레드의 이름을 출력해보도록 하겠습니다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100) // pretend we are computing it in CPU-consuming way
log("Emitting $i")
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
fun main() = runBlocking<Unit> {
simple().collect { value ->
log("Collected $value")
}
}
// 결과
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 1
[main @coroutine#1] Collected 1
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 2
[main @coroutine#1] Collected 2
[DefaultDispatcher-worker-1 @coroutine#2] Emitting 3
[main @coroutine#1] Collected 3
위의 결과에 주목하면, flow {...} 은 background 에서 실행되는 것을 볼 수 있고, 수집(collection) 될 때는 메인스레드에서 실행되는 것을 볼수 있습니다.
추가적으로 여기서 눈여겨 봐야할 점은, flowOn 을 사용함으로써, flow 의 기본적인 순차 처리가 변경되었다는 점입니다.
일반적인 처리라면 emitting1 -> emitting2 -> emitting3-> collected1->collected2 -> collected3 으로 동작을 기대하겠지만,
지금 수집(collection) 자체는 coroutine#1 에서 진행되었고, emitting 은 coroutine#2 에서 진행되었습니다.
flowOn 연산자는 컨텍스트의 CoroutineDispatcher 가 변경이 될 때, 업스트림 flow 를 위한 새로운 코루틴을 만든다는 점을 알 수 있습니다.
버퍼링(Buffering)
오래 걸리는 작업 등이 관련이 있을 때, 다른 코루틴 안에서 flow 를 다르게 실행하는 것은 flow 가 수집 될 때, 전체적인 시간 관점에서 도움을 줄 수 있습니다. 예를 들어 아이템을 방출하는 flow 반환 타입의 simple 함수가 매우 느려서 100ms 를 주기로 아이템을 생성한다고 가정하고, 수집 하는 부분도 매우 느려 하나의 원소를 처리하는데 300 ms 가 걸린다고 가정해봅시다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(100) // pretend we are asynchronously waiting 100 ms
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
}
// 결과
1
2
3
Collected in 1222 ms
하나의 아이템을 처리하는 시간 총 400 ms : 100 ms -> 아이템 방출 -> 300 ms -> 아이템 수집 (400 ms * 3 = 1200 ms) 만약 우리가 아이템을 방출하는 simple 함수에 buffer 연산자를 사용하게되면, 순차적으로가 아닌, 아이템 방출과 아이템 수집을 동시에 실행할 수 있습니다.
val time = measureTimeMillis {
simple()
.buffer() // buffer emissions, don't wait
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
println("Collected in $time ms")
// 결과
1
2
3
Collected in 1052 ms
위 방식처럼 효율적으로 처리 파이프 라인을 만들경우, 결과적으로 숫자들을 더욱 빠르게 생산할 수 있게 됩니다. 오직 처음 숫자를 위해 100ms 기다리고 각각 숫자를 처리하는데 300 ms 가 소요됩니다. - 숫자를 수집하는 300ms 동안 100 ms 의 작업을 미리 완료하기 때문입니다. 따라서 생산하는 시간보다 이를 수집하고 처리하는 시간이 더욱 오래 걸릴 경우에 사용하는 것이 올바른 방법으로 보입니다.
병합, 합성 (Conflation)
flow 의 연산의 결과나 혹은 연산의 상태를 보여줄 때, 원소 하나하나의 값을 처리하지 않아야 되는 경우가 있을 수도 있습니다.
아이템이 처리되는데 시간이 너무 오래걸려서, 중간 값을 무시하고 수집된 최근 값만 사용하는 경우에는 conflate 연산자를 사용하면 됩니다.
이전 예제를 기반으로 작성하였습니다.
val time = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
println(value)
}
}
// 결과
println("Collected in $time ms")
1
3
Collected in 752 ms
첫 번째 숫자를 보여질 떄, 2번째 연산이 처리되고, 3번째 연산도 처리됩니다. 그래서 두번째는 병합(합쳐지고) 되고, 결과적으로 최근 값이 3이 수집됩니다. 100ms -> 첫 번째 아이템 방출 -> 300ms 처리되어 수집되는 동안 (두번째 아이템 방출(100 ms), 세번째 아이템 방출(100ms)) -> 100ms -> 1 출력 -> 300 ms 처리 -> 3 출력 // 총 100 ms + 300ms + 300ms = 700 ms 걸리게 됩니다.
Android studio 실행시 800 ms 이상이 걸렸다, kotlin playground 에서는 700 ms 이상이 걸렸다. 출력에서 오는 차이가 큰 것으로 생각됩니다.
최근 값 처리하기(Processing the lastes value)
Conflation 은 아이템 발행기와 수집기가 모두 느릴 때, 발행 되는 아이템을 버림으로써 처리를 빠르게 할 수는 방법중 하나 입니다. 다른 방법으로는 새로운 값이 배출되었을 때 느린 수집기를 취소하고, 처음 부터 다시 실행하는 방법도 있습니다. xxx 연산자가 있다면 같은 동작을 하지만 최근 값을 사용하는 xxxLatest 연산자가 존재합니다.
이전 예제에서 conflate 함수를 collectLastest 로 변경해보겠습니다.
val time = measureTimeMillis {
simple()
.collectLatest { value -> // 기존 값을 취소하고 최근 값으로 변경
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
}
// 결과
println("Collected in $time ms")
Collecting 1
Collecting 2
Collecting 3
Done 3
Collected in 692 ms
collectLastest 의 함수 부분은 300ms 가 걸립니다. 그리고 매번 100ms 마다 새로운 값들이 배출됩니다. 결과적으로 마지막 값만 완료 됩니다.
다수의 flows 구성 (Composing multiple flows)
압축 (Zip)
sequence 의 zip 과 유사합니다. flow 는 zip 연산자를 가지고 있고, 2개의 flows 를 결합합니다.
val nums = (1..3).asFlow() // numbers 1..3
val strs = flowOf("one", "two", "three") // strings
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string
.collect { println(it) } // collect and print
// 결과
1 -> one
2 -> two
3 -> three
결합(Combine)
업스트림에서 새로운 값이 배출될 때, 이때 flow 의 최근 값에 의존하여 다시 계산을 수행해야하는 경우가 있습니다.
위의 가장 일치하는 연산자는 combine 이라고 불립니다.
예를 들어, 이전 예제의 숫자가 300ms 마다 업데이트 된다고 가정하고, 문자열도 400ms 마다 업데이트 된다고 하고, 두개의 flow 를 zip 하면 결과는 400ms 마다 출력 되고 위의 예제와 같은 결과를 보여줍니다.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.zip(strs) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 결과 at xxxx ms from start 를 제외하면 위의 예제와 값이 같다
1 -> one at 429 ms from start
2 -> two at 829 ms from start
3 -> three at 1231 ms from start
onEach 중간 연산자를 사용하여, 각 요소를 지연시키고 flow 코드를 좀더 간결하게 만들어 줍니다.
그리고 zip 사용하지 않고, combine 중간 연산자를 사용하면 결과는 아래와 같습니다.
val nums = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val strs = flowOf("one", "two", "three").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
nums.combine(strs) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 결과
1 -> one at 441 ms from start
2 -> one at 641 ms from start
2 -> two at 842 ms from start
3 -> two at 942 ms from start
3 -> three at 1243 ms from start
nums 또는 strs 에서 아이템이 배출될 때마다, 새로운 결과가 나타납니다.
Flows 평탄화 (Flattening flows)
flow 는 비동기적으로 받아진 순차적인 값을 나타냅니다. 그래서 각각의 값이 다른 순차적인 값을 요청하는 상황을 쉽게 만들 수 있습니다. 예를 들어 500 ms 간격으로 문자열을 반환하는 함수를 만들 수도 있습니다.
fun requestFlow(i: Int): Flow<String> = flow {
emit("$i: First") // 첫번째 아이템 방출
delay(500) // wait 500 ms 어떤 작업을 완료한 뒤
emit("$i: Second") // 두번째 아이템 방출
}
또한 3개의 숫자가 있고 각각 숫자가 requestFlow 함수를 호출하도록 만들 수도 있습니다.
(1..3).asFlow().map { requestFlow(it) }
위의 코드의 반환 타입은 Flow<Flow<String>> 타입이기 때문에, Flow<String> 으로 만들기 위해서는 추가적으로 평탄화(flattened) 되어야 합니다. Collections 과 Sequence 는 flatten, flatMap 연산자 같은 평탄화 메서드를 제공하지만. 일반적으로 flow 는 비동기적으로 처리되기 때문에, 다른 이름으로 불리는 평탄화 메서드(flatMapConcat, flatMapMerge, flatMapLatest)를 제공합니다.
flatMapConcat (보통 Contcat 이 들어가면 순서를 보장합니다 -Rx concat 도 순서를 보장합니다)
flow 끼리의 연결(Concatenating) 은 flatMapConcat, flattenConcat 연산자를 통해서 이뤄지게 됩니다. 그리고 앞에서 말한 2개의 연산자는 순차 연산자에 와 같게 처리됩니다(순서대로 처리 된다는 의미).
아래 예제를 보면 다음 값이 수집(collect)을 시작하기 전에, 완료하기 위해서 내부 flow 에서 대기합니다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 결과
1: First at 129 ms from start
1: Second at 630 ms from start
2: First at 730 ms from start
2: Second at 1230 ms from start
3: First at 1331 ms from start
3: Second at 1831 ms from start
FlatMapMerge
평탄화의 또 다른 방법은 모든 들어오는 flow 를 동시에 수집(collect) 하고, 하나의 flow 로 합쳐서 가능한 빨리 값을 배출시키는 방법입니다. 해당 방법은 flatMapMerge, flattenMerge 연산자를 통해 구현되어집니다. 두 연산자 모두 동시에 수집 될 수 있는 flow 의 갯수를 제한하는 동시성 파라미터를 선택적으로 허용합니다. 기본 값으로는 DEFAULT_CONCURRENCY 파라미터가 들어가있고, 16개 까지 갯수를 제한합니다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 결과
1: First at 160 ms from start
2: First at 256 ms from start
3: First at 357 ms from start
1: Second at 661 ms from start
2: Second at 756 ms from start
3: Second at 858 ms from start
flatMapMerge 함수 안에서 requestFlow 함수를 호출하고 있어 순차적으로 호출되지만, 결과를 수집 할 때는 동시에 진행되고 있다. 이는 map { requestFlow(it) } 을 호출한뒤에 flattenMerge 를 호출하는 것과 결과가 같습니다.
flatMapMerge = Map + FlattenMerge
flatMapLatest
collectLatest 와 유사한 방식으로 가장 최신의 값을 보여줍니다. 가장 최신의 평탄화 라는 수식어에 알맞게 새로운 flow 가 배출되면, 이전의 flow 의 수집(collect)이 취소가 됩니다.
val startTime = System.currentTimeMillis() // remember the start time
(1..3).asFlow().onEach { delay(100) } // a number every 100 ms
.flatMapLatest { requestFlow(it) }
.collect { value -> // collect and print
println("$value at ${System.currentTimeMillis() - startTime} ms from start")
}
// 결과
1: First at 151 ms from start
2: First at 287 ms from start
3: First at 388 ms from start
3: Second at 888 ms from start
flatMapLastest 함수는 위의 예제에서 새로운 값이 있는 경우 { requestFlow(it) } 블록 내의 코드가 취소 됩니다. 만약 requestFlow 함수 자체가 매우 빠르고, 일시중단 할수 있는 함수가 아니라면 취소 가능하지 않기 때문에, 다른 예제들과 차이가 없다. 그러나 delay 같은 일시중단 가능한 함수가 같이 사용되면 차이가 나타나게 됩니다.
delay 함수를 제거하면 생각하면 인터럽트 없이 로직이 모두 실행됩니다
1: First at 184 ms from start
1: Second at 184 ms from start
2: First at 287 ms from start
2: Second at 287 ms from start
3: First at 391 ms from start
3: Second at 391 ms from start
Flow 예외처리(Flow exceptions)
Flow 의 collection 은 emitter 또는 코드 내부에서 exception 을 던지면, 에러와 함께 완료될 수 있습니다.
몇개의 에러를 다룰 수 있는 방법을 보겠습니다.
Collector try and catch
Collector 는 코틀린의 try/catch 블록을 사용하여 에러를 처리할 수 있습니다.
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value ->
println(value)
check(value <= 1) { "Collected $value" }
}
} catch (e: Throwable) {
println("Caught $e")
}
}
// 결과
Emitting 1
1
Emitting 2
2
Caught java.lang.IllegalStateException: Collected 2
위의 예제는 collect 라는 종료 연산자에서 발생하는 에러를 성공적으로 잡아냅니다. 그렇기 떄문에 이후에 배출되는 값은 없습니다.
모든 에러 잡기 (Everything is caught)
이전에 예제에서는 종료 연산자 또는 중간 연산자 또는 emitter 안에서 일어나는 exception 을 처리했었습니다.
예를 들어 배출된 값이 map 함수를 만나는 도중에 exception 을 만들어내면 어떨까요 ?
fun simple(): Flow<String> =
flow {
for (i in 1..3) {
println("Emitting $i")
emit(i) // emit next value
}
}
.map { value ->
check(value <= 1) { "Crashed on $value" }
"string $value"
}
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} catch (e: Throwable) {
println("Caught $e")
}
}
// 결과
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
여전히 exception 이 잘잡히는 것을 볼 수 있습니다
보기 좋은 에러 처리(Exeption transparency - 에러 투명성)
그러면 에러를 처리하는 행동을 try-catch 가 아닌 캡슐화 하는 방법은 없을까요 ?
flow 는 에러 처리에 대해 보기 좋게(transparency) 가져 합니다. try/catch block 안에 flow {...} 빌더를 넣어두는 것은 에러 보기 좋게 만드는 방법이 아닙니다.
이전 예제에서 try/catch 처리 하는 것 처럼 collector 도 에러를 잡을 수 있습니다.
emitter 에서 catch 연산자를 사용하여, 에러를 보기 좋게 처리 할 수 있습니다. 그리고 에러 처리를 캡슐화 하는 것을 가능하게 합니다.
catch 연산자의 body 부분은 에러를 분석할 수 있고, 잡혀진 에러에 따라 다른 방법으로 대응할 수 있습니다.
- throw 를 사용하여 exceptions 을 다시 던질 수 있습니다
- catch 연산자의 body 에서 emit 함수를 사용하여 에러를 전달 할 수 있습니다.
- 에러는 무시될 수 있고, 로깅 될 수 있고 , 다른 코드로 부터 처리 될 수도 있습니다.
예를 들어, 에러를 catching 하는 문자열을 방출해봅시다.
simple()
.catch { e -> emit("Caught $e") } // emit on exception
.collect { value -> println(value) }
// 결과
Emitting 1
string 1
Emitting 2
Caught java.lang.IllegalStateException: Crashed on 2
이전과 같은 출력을 가지는 예제이지만, 더이상의 try/catch 로 감싸는 일은 필요하지 않습니다.
보기 좋은 에러 처리(Transparent catch)
에러 투명성을 존중하는 catch 중간 연산자는 오직 업스트림 exception 만 처리합니다. (catch 연산자 위에 사용되는 연산자로부터는 에러 처리는 가능하지만, 그 아래는 처리되지 않습니다.)
fun simple(): Flow<Int> = flow {
for (i in 1..3) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
simple()
.catch { e -> println("Caught $e") } // does not catch downstream exceptions
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
// 결과
Emitting 1
1
Emitting 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:133)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$$inlined$collect$1.emit (Collect.kt:134)
at kotlinx.coroutines.flow.internal.SafeCollectorKt$emitFun$1.invoke (SafeCollector.kt:15)
catch 연산자가 있음에도 불구하고, 에러는 처리되지 않았습니다.check 함수가 catch 위에 있어야 정상적으로 처리가능합니다.
Catching declaratively(명시적으로 캐치하기)
collect 함수 내부 내용을 onEach 내부로 이동하고, catch 연산자 앞에 특정 에러를 명시적으로 캐치할 수 있도록 처리할 수도 있습니다.
이경우에는 collect 함수에는 아무런 파라미터 전달 없이 실행되어야 합니다.
simple()
.onEach { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
.catch { e -> println("Caught $e") }
.collect()
// 결과
Emitting 1
1
Emitting 2
Caught java.lang.IllegalStateException: Collected 2
Flow의 완료 (Flow completion)
flow 의 수집이 완료될 때(일반적인 상황이든 예외적인 상황이든), 추가적인 작업이 필요할 수도 있습니다. 이미 알고 계시겠지만(위에서 이미 try-catch 와 catch 연산자를 경험했으니까), 명령형 또는 선언적 2가지의 방법으로 모두 가능합니다.
명령형(Imperative finally block)
try/catch 뿐만 아니라 collect 완료시에 finally 블록을 사용하여 다른 액션을 처리할 수도 있습니다.
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
try {
simple().collect { value -> println(value) }
} finally {
println("Done")
}
}
// 결과
1
2
3
Done
선언형 처리(Declarative handling)
선언형의 방법으로는 flow 에 onCompletion 중간 연산자를 사용하면 됩니다. flow 가 완전하게 수집된 후에 호출됩니다.
이전 예제를 onCompletion 을 사용해서 그대로 써봅시다.
simple()
.onCompletion { println("Done") }
.collect { value -> println(value) }
// 결과
1
2
3
Done
역시나 출력은 같습니다.
onCompletion 방법을 사용하면 추가적인 이점이 있습니다.. flow 의 collection 이 정상적 혹은 예외적으로 완료 되었는지 구분하여 사용할 수 있는 Throwable(Nullable) 파라미터를 제공합니다. 아래의 예제를 보면 1 숫자가 배출된 후, exception 을 던지는 simple 함수가 있습니다.
fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> if (cause != null) println("Flow completed exceptionally") }
.catch { cause -> println("Caught exception") }
.collect { value -> println(value) }
}
// 결과
1
Flow completed exceptionally
Caught exception
onCompletion 연산자는 catch 와 다르기 때문에 exception 을 처리하지 못합니다. 위의 예제 코드를 보면 exception 은 여전히 flow 의 다운스트림으로 향하고 있습니다. onCompletion 에 전달은 되지만 처리는 catch 연산자에서만 할 수 있습니다.
성공적인 완료 (Successful completion)
onCompletion 이 catch 와의 또 다른점은 모든 에러를 확인하고, 실패나 취소 없이 업스트림의 flow 가 성공적으로 완료 되는 경우에만 null exception 을 받을 수 있습니다.
fun simple(): Flow<Int> = (1..3).asFlow()
fun main() = runBlocking<Unit> {
simple()
.onCompletion { cause -> println("Flow completed with $cause") }
.collect { value ->
check(value <= 1) { "Collected $value" }
println(value)
}
}
// 결과
1
Flow completed with java.lang.IllegalStateException: Collected 2
Exception in thread "main" java.lang.IllegalStateException: Collected 2
at FileKt$main$1$invokeSuspend$$inlined$collect$1.emit (Collect.kt:133)
at kotlinx.coroutines.flow.FlowKt__BuildersKt$asFlow$$inlined$unsafeFlow$9.collect (SafeCollector.common.kt:115)
at kotlinx.coroutines.flow.FlowKt__EmittersKt$onCompletion$$inlined$unsafeFlow$1.collect (SafeCollector.common.kt:114)
onCompletion 이 null 이 아니라는 것을 볼 수 있는데, 다운 스트림에서 발생한 에러 때문에 중단 되었기 때문입니다.
명령형 vs 선언형 (Imperative versus declarative)
위의 예제들로 부터 우리는 명령형 선언형으로 flow 를 처리하는 방법에 대해 배워보았다. 그렇다면 어떤 것이 더 좋은 표현일까 ?
해답은 특정 접근 방식을 선호하지 않고, 두 옵션이 모두 적합하며 코드 스타일이나 선호도에 따라 골라서 사용하면 됩니다.
Launching flow
flow 를 사용하여 비동기 이벤트를 작성하는 것은 쉽습니다. 들어오는 이벤트에 대한 액션으로 코드를 추가하고, 추가적인 작업이 필요한 경우 addEventListener 같은 함수를 코드의 한 부분에 등록해야 합니다. onEach 연산자는 이런 역할(addEventListener)을 해주고 있습니다. 그러나 onEach 는 중간 연산자 이기 때문에 결과적으로 실행하기 위해서는 collect 같은 종료 연산자가 필요합니다. 마찬가지로 onEach 만 호출하는 경우는 아무런 효과가 없습니다.
만약 onEach 이후에 collect 종료 연산자를 사용하면 아래처럼 flow 가 수집될 때까지 대기합니다.
// Imitate a flow of events
fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.collect() // <--- flow 를 수집하는 동안 대기한다
println("Done")
}
// 결과
Event: 1
Event: 2
Event: 3
Done
launchIn 을 종료 연산자를 사용하여 분리된 코루틴에서 flow 가 수집(collection) 될 수 있도록 할 수 있습니다. 이 경우에 그 아래 코드들은 즉시 실행됩니다.
fun main() = runBlocking<Unit> {
events()
.onEach { event -> println("Event: $event") }
.launchIn(this) // <--- Launching the flow in a separate coroutine
println("Done")
}
// 결과
Done
Event: 1
Event: 2
Event: 3
launchIn 을 사용해 flow 가 수집을 시작하게 하기 위해서는 CoroutineScope 를 지정해줘야합니다. 위의 예제에서는 runBlocking 코루틴 빌더로 부터 전달되는 스코프를 사용했습니다.(내부에서 EmptyCoroutineContext 가 들어가고 있습니다.) flow 가 실행되는 동안 runBlocking 스코프는 다른 자식 코루틴의 완료를 기다리고, main 함수가 값을 반환하고 종료하지 않도록 유지시켜줍니다.
실제 어플리케이션은 제한된 수명(lifetime)을 가지고 있습니다. 그리고 수명(lifetime) 이 종료되면 가능한한 빠르게 그에 상응하는 스코프를 취소하고 flow 의 수집은 취소 됩니다. (lifetime 종료 -> 스코프 내의 취소가능한 것들이 취소 -> flow 의 수집도 취소)
onEach {...}.launchIn(Scope) 처럼 Pair 로 사용하게 되면 addEventListener 처럼 동작합니다. 그러나 removeEventListener 같은 함수는 필요 없습니다. (수명이 종료되면 자동으로 취소 되기 때문입니다.)
launchIn 은 작업이 끝나거나, 전체 스코프를 취소하지 않고, 수집에 해당되는 flow 만 취소할 수 있는, job 을 반환합니다.
Flow 취소 검사(Flow cancellation checks)
편의를 위해 flow 빌더 는 추가적으로 배출되어진 각각의 값들에 대해 취소가 되었는지 ensureActive 검사를 수행합니다. 이것의 의미는 매우 바쁘게 아이템을 방출하고 있는 flow 도 취소가 될 수 있습니다.
fun foo(): Flow<Int> = flow {
for (i in 1..5) {
println("Emitting $i")
emit(i)
}
}
fun main() = runBlocking<Unit> {
foo().collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 결과
Emitting 1
1
Emitting 2
2
Emitting 3
3
Emitting 4
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1579)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215)
4를 방출하고, CancellationException 이 발생했습니다.
그러나 다른 대부분의 flow 연산자는 성능의 이유로 추가적인 취소 확인 작업을 하지 않습니다. 예를 들어 IntRange.asFlow 를 사용하여 위와 동일한 루프를 만들고 어디에서도 일시중단 하지 않는다면 취소를 확인 하는 작업은 없습니다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 결과
1
2
3
4
5
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1579)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215)
1~5 까지의 모든 숫자들이 수집되어 집니다. 그리고 runBlocking 으로 부터 반환되기 전에 취소가 발견됩니다.
취소 가능한 바쁜 상태의 flow 만들기 (Macking busy flow cancellable)
바쁜 상태의 코루틴에서 취소를 명시적으로 검사해야 합니다. .onEach { currentCoroutineContext().ensureActive() } 를 사용해서 처리할 수지만, 이미 cancellable 연산자가 준비되어 있습니다.
fun main() = runBlocking<Unit> {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
println(value)
}
}
or
fun main() = runBlocking<Unit> {
(1..5).asFlow().onEach{ currentCoroutineContext.ensureActive() }.collect { value ->
if (value == 3) cancel()
println(value)
}
}
// 결과
1
2
3
Exception in thread "main" kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled
at kotlinx.coroutines.JobSupport.cancel (JobSupport.kt:1579)
at kotlinx.coroutines.CoroutineScopeKt.cancel (CoroutineScope.kt:217)
at kotlinx.coroutines.CoroutineScopeKt.cancel$default (CoroutineScope.kt:215)
Flow and Reactive Streams
Reactive Streams 혹은 RxJava 나 Project Reactor 같은 Reactive Framework 에 친근한 사람들은 Flow 의 디자인이 매우 친근할 것 입니다.
실제로도 flow 의 디자인은 Reactive Streams 그리고 다양한 구현체들에게서 영감을 받았습니다. 그러나 Flow 의 주된 목표는 코틀린과 일시중단에 친숙하며 구조화된 동시성 기능을 가능한 간단하게 만드는 것입니다. 목표를 달성하기 위해, Reactive 선구자들의 엄청난 노력없이는 불가능할 것 입니다. 자세한 스토리는 여기서 읽어 볼 수 있습니다
개념적으로는 다르지만 Rx <-> flow 는 변환 될 수 있습니다. 이러한 변환기는 kotlinx-coroutines-reactive, kotlinx-coroutines-rx2 같은 라이브러리에서 제공됩니다.
'Android > 번역' 카테고리의 다른 글
LiveData, Coroutine, Flow 를 이용한 반응형 UI - Part 3 (0) | 2021.06.07 |
---|---|
LiveData, Coroutine, Flow 를 이용한 반응형 UI - Part 2 (0) | 2021.06.06 |
LiveData, Coroutine, Flow 를 이용한 반응형 UI - Part 1 (1) | 2021.05.25 |
코루틴을 뷰에 적용하기 (1) | 2021.03.14 |
코틀린으로 알아보는 SOLID 원칙 (2) | 2021.03.03 |