Kotlin Coroutines 기본적인 개념과 docs

kotlinlang.org/docs/coroutines-guide.html#table-of-contents

  • 참고 ) Kotlin Coroutine은 어떻게 동작하는가 - CPS 관련 설명, 자주 하는 질문까지 포함한 좋은 발표 자료 
  • 참고 ) 어떠한 코루틴이 발동될 때 마다 해당 코루틴은 이전에 자신의 실행이 마지막으로 중단되었던 지점 다음의 장소에서 실행을 재개한다.
    • Kotlin의 Coroutine은 suspend 키워드로 마킹된 함수를 CPS(Continuation Passing Style)로 변환하고, 이를 Coroutine Builder를 통해 적절한 스레드 상에서 시나리오에 따라 동작하도록 구성됩니다.
    • 주의해야 할 점은 suspend function은 스레드와 스케쥴의 관리를 수행하는 것이 아니라 비동기 실행을 위한 중단(suspension) 지점의 정의라는 점입니다. 코루틴은 중단 지점까지 비선점형으로 동작하기 때문에 실행 스케쥴이 OS에 의해 온전히 제어되는 스레드와는 다른 관점에서 보아야 합니다.
    • 스레드는 잘 실행하다가 갑자기 OS가 나오라면 제어권을 양도하고 비켜야 하는데, 코루틴은 중단 지점을 만나지 않는 한 제어권을 양도하지 않고 이어서 계속 실행함

 

https://kotlinlang.org/docs/coroutines-basics.html

  • A coroutine is an instance of suspendable computation. It is conceptually similar to a thread, in the sense that it takes a block of code to run that works concurrently with the rest of the code. However, a coroutine is not bound to any particular thread. It may suspend its execution in one thread and resume in another one.
  • A coroutine scope does not complete until all launched children coroutines complete.
    • 코루틴 스코프를 만들어주는 함수
      • `` runBlocking`` : to sync world. blocking
      • `` coroutineScope`` : suspend 함수. async 용
    • 어떤 스코프에서 진행하다가 하위스코프를 만나면, 해당 하위스코프가 완전히 끝나야 그 다음 라인 실행 가능.
      • 하위 스코프를 기다려야 하므로 runBlocking은 내부 코루틴이 모두 끝나기를 기다렸다가 종료됨.
      • 반면 어떤 작업을 던지고 함수는 종료하고 싶다면, ``kt GlobalScope.launch`` 사용한다.
      • 또는, Dispatchers.IO 등 지정한 Dispatcher에서 실행할 수 있도록 해준다.
  • coroutine builder : 하위 스코프를 만들지는 않지만 코루틴 스코프 내에서 비동기적으로 작업을 수행할 때 사용함. 반환값을 이용해 완료 대기 제어 가능.
    • `` launch`` : 비동기 실행 결과가 필요 없는 경우
      • 리턴값 `` Job`` 대기는 `` join()`` 다중 대기는 `` joinAll()``
      • withContext 로 대체할 수 있는 경우
    • `` async`` : 비동기 실행 결과가 외부에서 필요한 경우
      • 리턴값 `` Deffered`` (Deffered는 Future, Promise와 같은 개념이다.)
      • 대기는 `` await()`` 다중 대기는 `` awaitAll()``
      • async 블럭을 만나자 마자 해당 코루틴을 비동기로 실행해버리는데, 원하는 시점에 실행하려면 start = LAZY 주고 나중에 start() 호출하면 된다.
    • 이런 코루틴 빌더로 작업을 감싸지 않고 그냥 사용하는 경우? 해당 코루틴은 순차적으로 한 라인 씩 대기하면서 실행한다.

```kt

val time = measureTimeMillis {

    delay(1000L)

    delay(1000L)

    println("Done")

}

println("Completed in $time ms")  // 2000 ms가 걸린다.

```

```kt

val time = measureTimeMillis {

    coroutineScope {

        launch {

            delay(1000L)

            println("launch 1")

        }

        launch { 

            delay(1000L) 

            println("launch 2")

        }

        println("Done")

    }

}

println("Completed in $time ms")  // 1000 ms

// 이렇게 launch나 async로 감싸줘야 각각의 작업이 새 코루틴이 되어 비동기적으로 실행된다.

```

 

  • 아래 예제를 보면 위에서부터 순차적으로 실행하되,
  • async 블럭을 지나면서 비동기적으로 실행 & 첫 번째 delay를 만나면 이를 실행
  • 1초가 지난 후 async 결과 비동기적 반환 & 첫 번째 delay 풀리고 그 다음 라인 진행
  • 두 번째 delay에서 또 대기.
  • await 시점에는 이미 async 결과가 준비 된 상태이므로 바로 결과 반환

```kt

runBlocking {

    val time = measureTimeMillis {

        log().info("Start")

        val r1 = async {

            delay(1000L)

            log().info("async 1")

            14

        }

        val r2 = async {

            delay(1000L)

            log().info("async 2")

            32

        }

        log().info("print")

        delay(1000L)

        log().info("print")

        delay(1000L)

        log().info("print")

        log().info("Start ${r1.await()}")

        log().info("Done ${r2.await()}")

    }

    println("Completed in $time ms")

}

```

```log

0s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - Start

0s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - print

1s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - print

1s [main @coroutine#2] INFO kotlinx.coroutines.CoroutineScope - async 1

1s [main @coroutine#3] INFO kotlinx.coroutines.CoroutineScope - async 2

2s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - print

2s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - Start 14

2s [main @coroutine#1] INFO kotlinx.coroutines.CoroutineScope - Done 32

Completed in 2088 ms

```

 

 

https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/01_Introduction

  • 대체로 kotlinlang.org/docs 와 중복되는 내용이므로 같이 보면 좋다.
    • 3. Using callbacks 부분 예제는 너무 억지로 이상하게 만든 느낌이 강하므로 볼 필요 없음
    • 5. Concurrency 등 코루틴 관련 부분
    • 8. Channels 그림 자료가 아주 좋다.
  • 병렬 IO를 위해서는 멀티스레드를 사용하거나, 이벤트루프 & 큐를 사용하는 방법 뿐이었는데 코루틴(CPS)을 이용하면 단일 스레드 하에서 병렬 작업이 가능하다.
    • 코루틴이 스레드를 대체할 수 있나?
      • 코루틴은 스레드 위에서 돌아가는 것이지만, 멀티스레드 대체 관점에서는 부분적으로는 가능. (1스레드 위의 n개 코루틴)
      • IO bound 작업은 가능한데(단일 스레드에서도 소화 가능한 수준이라면). CPU bound 작업은 코루틴이 돌아가는 기반 스레드풀 수를 늘려주어야 한다.
    • 다중 스레드에서 환경에서 코루틴은?
      • 코루틴의 A부분은 1번 스레드에서 실행 --- suspend 후 재개 --- B부분은 2번 스레드에서 실행
      • 이건 Dispatcher에 따라 다르다 (unconfined vs confined)
    • `` Dispatchers.Default`` 스레드풀은 CPU core 수와 동일하게 구성되며, core가 1개인 경우는 2개로 구성된다.

 

library 등 blocking function을 사용해야 하는 경우, 어떻게 coroutine으로 wrapping 해야 하나?

  • library function 내부에서 blocking call을 한다거나, Thread.sleep(1) 하면서 polling 하는 경우, 어떻게 처리해야 하나?
  • 우선은, library의 polling processor를 추상화한 인터페이스가 있는지 보고, 있다면 Coroutine으로 구현한 processor를 사용하도록 하는게 어떤지 검토해보고. (i.e., caver)
  • https://stackoverflow.com/questions/66412090/proper-way-of-dealing-with-blocking-code-using-kotling-coroutines  
  • 요약, CPU bound 작업이면 Dispatchers.Default, IO bound 작업이면 Dispatchers.IO에 던진다.

 

https://kotlinlang.org/docs/flow.html

  • A suspending function asynchronously returns a single value, but how can we return multiple asynchronously computed values? => flow를 사용한다

 

https://kotlinlang.org/docs/shared-mutable-state-and-concurrency.html

 

Rx (Mono, Flux) 와 Coroutine 의 비교

Rx 사용하면서 느꼈던 단점은 아래와 같다.

  • then then then 연결해야 한다는 점. 이렇게 Mono들을 체이닝 해야 하나의 Mono가 되고, subscribe or block 했을 때 연쇄적으로 실행되어 결과를 가져올 수 있으니까.
    • 근데 이렇게 체이닝 안의 람다로 코드 컨텍스트가 제한되다 보니,
    • Mono1.then(Mono2).then(Mono3) 상황에서, Mono3에서 Mono1의 결과를 사용하고 싶어도 전달받을 방법이 애매하다는 문제가 있음.
  • sync world와 통합하기 지저분하다는 점.
    • 반환값이 필요하지 않은 경우 그냥 subscribe() 해버리면 되니까 문제가 되지 않는데... 반환값이 필요해 block() 해야 하면?
    • e.g., 비동기 task들을 하나 block하고 다음꺼 block하고 하지 않고, 한 번에 이벤트 루프에 넘겨 resolve 하려면? Mono.zip 써야 함. 게다가 반환값은 Tuple 이다.
  • 조건에 따라 반복할 때, repeatWhen, onlyIf 사용이 애매함. (가능은 할 것 같은데 자료가 별로 없다)
  • 결과매핑객체를 로깅하고 싶을 때, Mono는 doOnSuccess 등에서 로깅하는 것이 괜찮은데, Flux는 doOnComplete를 사용한다 해도 Flux<T>가 List<T>가 될지 Map<T,R>가 될지 모르기 때문에 로깅도 불가.
  • 메서드 반환값이 Mono, Flux이면 @Cacheable 사용해도 의미가 없고, 대신 Mono.cache() 사용해야 한다. (** 하지만 어차피 suspend 함수도 @Cacheable이 제대로 지원되는 것은 아니라 코루틴 대비 단점이라 보기는 어렵다)

 

반면 코루틴은 위와 같은 문제점이 없음. 신경 쓸 것도 덜하고... 자연스럽다. bad smell이 느껴지는 부분이 적다.

전반적으로 메서드 체이닝과 람다로 강제되는 Rx 보다 유연한 구조라고 생각됨.

 

Kotlin Coroutines With Spring WebFlux

 

개인 정리

  • Mono가 익숙하다면 ``kt awaitBody<T>()`` 보다는 ``kt bodyToMono(T::class).awaitSingle()`` 을 사용할 것
    • 어차피 awaitBody가 저걸 묶어놓은건데, bodyToMono를 사용하면 중간에 Mono 관련 메서드 체이닝이 가능하다. (e.g., retryWhen)
  • 반면 Mono가 익숙하지 않다면 바로 awaitBody해서 Mono 인터페이스를 아예 지우고 coroutine으로만 다루는 것이 덜 헷갈린다.
  • 시스템 전체가 Coroutine으로 되어있는게 아니라면, WebClient 등 Mono를 만드는 layer에서는 그냥 Mono를 리턴하고, 그를 소비하는 쪽에서 [`` subscribe(), awaitSingle(), block()``] 중 골라 쓰도록 하는 것이 확장성 면에서 더 나아 보인다.