Coroutine 笔记
2024-04-10 01:40:32  阅读数 225

一.网络请求

1、网络请求配置

获取 github 贡献者的列表

  /**
     *  github 账号的设置
     *  登录账号、token 、组织
     *  获取token的地址 https://github.com/settings/tokens/new
     */
    private val req = RequestData("zhi*******@163.com","***************", "kotlin")
2、线程 execute 阻塞
  fun loadContributorsBlocking(): List<User> {
        val repos = service.getOrgReposCall(req.org).execute().body() ?: emptyList()

        return repos.flatMap {
            service.getRepoContributorsCall(req.org, it.name).execute().body() ?: emptyList()
        }
    }
3、异步 enqueue 回调
  fun loadContributorsCallbacks(userLoadData: IUserLoadData) {
        service.getOrgReposCall(req.org).enqueue(NetCallback {
            val listRepos: List<Repo> = it ?: emptyList()
            if (listRepos.isEmpty()) userLoadData.onUserLoad(emptyList())

            val allUsers: MutableList<User> = ArrayList()

            val numberOfProcessed = AtomicInteger() //多线程的同步处理
            //val countDownLatch = CountDownLatch(listRepos.size)

            for (repo in listRepos) {
                service.getRepoContributorsCall(req.org, repo.name).enqueue(NetCallback { list ->

                    if (!list.isNullOrEmpty()) allUsers.addAll(list)

                    if (numberOfProcessed.incrementAndGet() == listRepos.size) {
                        userLoadData.onUserLoad(allUsers.aggregate())
                    }
                })
            }
            //countDownLatch.await()  //导致线程阻塞
            // userLoadData.onUserLoad(allUsers.aggregate())
        })
    }
4、协成加载
  suspend fun loadContributorsSuspend(userLoadData: IUserLoadData) {
        val repos = service.getOrgRepos(req.org).body() ?: emptyList()

        val users = repos.flatMap { repo ->
            service.getRepoContributors(req.org, repo.name).body() ?: emptyList()
        }.aggregate()

        userLoadData.onUserLoad(users)
  }
5、协成并行
 suspend fun loadContributorsConcurrent(userLoadData: IUserLoadData) {
        val repos = service.getOrgRepos(req.org).body() ?: emptyList()

        val deferred = repos.map { repo ->
            CoroutineScope(Dispatchers.IO).async {
                log(repo.name, coroutineContext)
                service.getRepoContributors(req.org, repo.name).body() ?: emptyList()
            }
        }

        val list = deferred.awaitAll().flatten().aggregate()

        userLoadData.onUserLoad(list)

    }

二. 协成的取消

1、线程的取消
override fun onJobCancel() = onClick {
    log("main: I'm start!")
    val job = mainScope.launch {
        repeat(1000) { i ->
            log("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!")
    job.cancel() // cancels the job
    log("main: Now I can quit.")
}
2、线程取消不起作用,需要执行完成
override fun onCancelComplete() = onClick {
    val startTime = System.currentTimeMillis()
    val job = mainScope.launch(Dispatchers.Default) {
        var nextPrintTime = startTime
        var i = 0
        while (i < 10) { // computation loop, just wastes CPU
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                log("job: I'm sleeping ${i++} ...")
                nextPrintTime += 500L
            }
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    log("main: Now I can quit.")
}
3、捕获取消协成的异常
override fun onCancelException() = onClick {
    val job = mainScope.launch {
        repeat(5) { i ->
            try { // print a message twice a second
                log("job: I'm sleeping $i ...")
                delay(500)
            } catch (e: Exception) {
                log("job Exception: $e")
            }
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    log("main: Now I can quit.")
}
4、通过 isActive 取消协成
override fun onActiveCancel() = onClick {
    val startTime = System.currentTimeMillis()
    val  job = mainScope.launch(Dispatchers.Default) { //(1)
    //(2) val  job = mainScope.launch {
        var nextPrintTime = startTime
        var i = 0
        while (isActive) { // cancellable computation loop
            // print a message twice a second
            if (System.currentTimeMillis() >= nextPrintTime) {
                log("job: I'm sleeping ${i++} ...",coroutineContext)
                nextPrintTime += 500L
                //(3) delay(500)
            }
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!", currentCoroutineContext())
    job.cancelAndJoin() // cancels the job and waits for its completion
    log("main: Now I can quit.", currentCoroutineContext())
}

注解

  • 如果注释 (1) 打开 (2) 导致线程一直被占用, while 循环下面代码不被执行
  • 如果注释 (1) 打开 (2) 同时 打开 (3) 释放线程,才能执行 while 循环下面代码
5、通过 finally 取消后的操作
override fun onFinallyCancel() = onClick {
    val job = mainScope.launch {
        try {
            repeat(1000) { i ->
                log("job: I'm sleeping $i ...")
                delay(500L)
            }
        } catch (e: Exception) {
            log("Exception:  : $e")
        } finally {
            log("job: I'm running finally")
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    log("main: Now I can quit.")
}
6、通过 withContext(NonCancellable) 设置不可取消,用于释放资源、停止服务等
override fun onNoCancel() = onClick {
    val job = mainScope.launch {
        try {
            repeat(1000) { i ->
                log("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                log("job: I'm running finally")
                delay(1000L)
                log("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // delay a bit
    log("main: I'm tired of waiting!")
    job.cancelAndJoin() // cancels the job and waits for its completion
    log("main: Now I can quit.")
}
7、通过 withTimeout 设置超时
override fun onTimeOut() = onClick {
    withTimeout(1300L) {
        repeat(1000) { i ->
            log("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
8、捕获超时异常
override fun onTimeOutCatch() = onClick {
    try {
        withTimeout(1300L) {
            repeat(1000) { i ->
                log("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: Exception) {
        log("Exception $e")
    }
}
9、withTimeoutOrNull 超时或返回 null
override fun onTimeOutCatchOrNull() = onClick {
    val result = withTimeoutOrNull(1300L) {
        repeat(1000) { i ->
            log("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }

    log("Result is $result")

    val result1 = withTimeoutOrNull(13000L) {
        repeat(5) { i ->
            log("I'm sleeping $i ...")
            delay(500L)
        }
        "Done" // will get cancelled before it produces this result
    }
    log("Result is $result1")
}

三. Suspend function 介绍

1、串行执行
override fun onSequentialDefault() = onClick {
    val time = measureTimeMillis {
        val one = doSomethingUsefulOne()
        val two = doSomethingUsefulTwo()
        log("The answer is ${one + two}")
    }
    log("Completed in $time ms")
}
2、并行执行
override fun onAsyncOperation()  {
    mainScope.launch {
        val time = measureTimeMillis {
            val one = async { doSomethingUsefulOne() }
            val two = async { doSomethingUsefulTwo() }
            log("The answer is ${one.await() + two.await()}")
        }
        log("Completed in $time ms")
    }
}
3、懒加载
override fun lazyOperation()  {
    mainScope.launch {
        val time = measureTimeMillis {
            val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
            val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
            one.start() // 如果没有 则在 one.await() 开始
            log("The answer is ${one.await() + two.await()}")
        }
        log("Completed in $time ms")
    }
}

注释 :如果不通过 one.start() 开启,则会在 one.await() 时执行

4、错误异步演示
override fun asyncStyleErro() {
    log("Completed in asyncStyleErro")
    val time = measureTimeMillis { // 我们可以在协程外面启动异步执行
        val one = somethingUsefulOneAsync()
        val two = somethingUsefulTwoAsync() // 但是等待结果必须调用其它的挂起或者阻塞 // 当我们等待结果的时候,这里我们使用 `runBlocking { …… }` 来阻塞主线程
        mainScope.launch {
            log("The answer is ${one.await() + two.await()}")
        }
    }
    log("Completed in $time ms")
}

private fun somethingUsefulOneAsync() = mainScope.async {
    doSomethingUsefulOne()
}

// somethingUsefulTwoAsync 函数的返回值类型是 Deferred<Int>
private fun somethingUsefulTwoAsync() = mainScope.async {
    doSomethingUsefulTwo()
}

private suspend fun doSomethingUsefulOne(): Int {
    log("doSomethingUsefulOne")
    delay(1000L) // pretend we are doing something useful here
    return 13
}

private suspend fun doSomethingUsefulTwo(): Int {
    log("doSomethingUsefulTwo")
    delay(1000L) // pretend we are doing something useful here, too
    return 29
}

注释
这种带有异步函数的编程风格仅供参考,因为这在其它编程语言中是一种受欢迎的风格。在 Kotlin 的协程中使用这种风格是强烈不推荐的, 原因如下所述

考虑一下如果 val one = somethingUsefulOneAsync() 这一行和 one.await() 表达式这里在代码中有逻辑错误, 并且程序抛出了异常以及程序在操作的过程中中止,
将会发生什么。 通常情况下,一个全局的异常处理者会捕获这个异常,将异常打印成日记并报告给开发者,但是反之该程序将会继续执行其它操作。
但是这里我们的 somethingUsefulOneAsync 仍然在后台执行, 尽管如此,启动它的那次操作也会被终止。这个程序将不会进行结构化并发,如下一小节所示。

5、正确异步演示
override fun asyncStyleRight() {
    mainScope.launch {
        try {
            failedConcurrentSum()
        } catch (e: ArithmeticException) {
            log("Computation failed with ArithmeticException",coroutineContext)
        }
    }
}

suspend fun failedConcurrentSum(): Int = coroutineScope {
    val one = async<Int> {
        try {
            delay(Long.MAX_VALUE) // 模拟一个长时间的运算
            42
        } finally {
            log("First child was cancelled",coroutineContext)
        }
    }
    val two = async<Int> {
        log("Second child throws an exception",coroutineContext)
        throw ArithmeticException()
    }
    one.await() + two.await()
}

四. 调度器

1、调度器的介绍
override fun onDispatcherType() {
    mainScope.launch { // 运行在父协程的上下文中,即 runBlocking 主协程
        log("no param",coroutineContext)
    }

    mainScope.launch(Dispatchers.Unconfined) { // 不受限的——将工作在主线程中
        log("Dispatchers.Unconfined",coroutineContext)
    }

    mainScope.launch(Dispatchers.Default) { // 将会获取默认调度器
        log("Dispatchers.Default",coroutineContext)
    }

    mainScope.launch(newSingleThreadContext("SingleThread")) { // 将使它获得一个新的线程
        log("SingleThread",coroutineContext)
    }

    GlobalScope.launch {
        log("GlobalScope",coroutineContext)
    }
}

// Thread[main,5,main]-->StandaloneCoroutine{Active}@341e412-->297380853 : Dispatchers.Unconfined
// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@47975e0-->156964473 : Dispatchers.Default
// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@1d4ff3f-->112631768 : GlobalScope
// Thread[SingleThread,5,main]-->StandaloneCoroutine{Active}@730100c-->282246497 : SingleThread
// Thread[main,5,main]-->StandaloneCoroutine{Active}@fd1e36a-->494285765 : no param
2、非受限调度器 vs 受限调度器
override fun onUnconfinedVsConfined() {
    mainScope.launch(Dispatchers.Unconfined) { // not confined -- will work with main thread
        log("Unconfined      : start", coroutineContext)
        delay(500)
        log("Unconfined      : end", coroutineContext)
    }

    mainScope.launch { // context of the parent, main runBlocking coroutine
        log("main runBlocking: start", coroutineContext)
        delay(1000)
        log("main runBlocking: end", coroutineContext)
    }
}


Thread[main,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined      : start
Thread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: start
Thread[kotlinx.coroutines.DefaultExecutor,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined      : end
Thread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: end
3、不同线程间跳转
@OptIn(DelicateCoroutinesApi::class)
override fun onJumpThread() {
    newSingleThreadContext("Ctx1").use { ctx1 ->
        newSingleThreadContext("Ctx2").use { ctx2 ->
            runBlocking(ctx1) {
                log("Started in ctx1", coroutineContext)
                withContext(ctx2) {
                    log("Working in ctx2", coroutineContext)
                }
                log("Back to ctx1", coroutineContext)
            }
        }
    }
}

Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Started in ctx1
Thread[Ctx2,5,main]-->DispatchedCoroutine{Active}@67116b4-->213567889-->null : Working in ctx2
Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Back to ctx1
4、子协成
@OptIn(DelicateCoroutinesApi::class)
override fun onChild() {
    mainScope.launch { // 启动一个协程来处理某种传入请求(request)
        val request = launch { // 孵化了两个子作业, 其中一个通过 GlobalScope 启动
            GlobalScope.launch {
                log("job1: I run in GlobalScope and execute independently!",coroutineContext)
                delay(1000)
                log("job1: I am not affected by cancellation of the request",coroutineContext)
            } // 另一个则承袭了父协程的上下文
            launch {
                delay(100)
                log("job2: I am a child of the request coroutine",coroutineContext)
                delay(1000)
                log("job2: I will not execute this line if my parent request is cancelled",coroutineContext)
            }
        }
        delay(500)
        request.cancel() // 取消请求(request)的执行
        delay(1000) // 延迟一秒钟来看看发生了什么
        log("main: Who has survived request cancellation?",coroutineContext)
    }
}

Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I run in GlobalScope and execute independently!
Thread[main,5,main]-->StandaloneCoroutine{Active}@883307f-->204922311-->null : job2: I am a child of the request coroutine
Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I am not affected by cancellation of the request
Thread[main,5,main]-->StandaloneCoroutine{Active}@9dc884c-->227554708-->null : main: Who has survived request cancellation?
5、协成的命名和log
override fun onNamed() {
    mainScope.launch {
        log("Started main coroutine",coroutineContext) // 运行两个后台值计算

        val v1 = async(CoroutineName("v1coroutine")) {
            delay(500)
            log("Computing v1",coroutineContext)
            252
        }

        val v2 = async(CoroutineName("v2coroutine")) {
            delay(1000)
            log("Computing v2",coroutineContext)
            6
        }

        launch(Dispatchers.Default + CoroutineName("test")) {
            log("I'm working in thread ${Thread.currentThread().name}",coroutineContext)
        }

        log("The answer for v1 / v2 = ${v1.await() / v2.await()}",coroutineContext)
    }
}

fun log(msg: String?, coroutine: CoroutineContext? = null) {
    val message = msg ?: "null"
    if (coroutine == null) {
        Log.e("~~~~", "${Thread.currentThread()} : $message")
    } else {
        val tag = "${Thread.currentThread()}-->${coroutine[Job]}-->${coroutine.hashCode()}-->${coroutine[CoroutineName.Key]} :"
        Log.e("~~~~", "$tag $message")
    }
}

五. Asynchronous Flow(异步流)

1、Flow 的介绍
fun onIntroduce(mainScope: CoroutineScope) = mainScope.launch {
    log("I'm the parent Coroutine")
    launch {
        for (k in 1..3) {
            log("I'm not blocked $k", coroutineContext)
            delay(2000)
        }
    }
    simple().collect { value -> println(value) }

    log("I'm the parent Coroutine end")
}

private fun simple(): Flow<Int> = flow { // 流构建器
    for (i in 1..3) {
        delay(2000) // 假装我们在这里做了一些有用的事情
        log("I'm not emit $i", coroutineContext)
        emit(i) // 发送下一个值
    }
}

Thread[main,5,main] : I'm the parent Coroutine
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 1
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 1
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 3
Thread[main,5,main] : I'm the parent Coroutine end
2、Flows are cold

flow does not run until the flow is collected

fun onFlowCold(mainScope: CoroutineScope) = mainScope.launch {
    log("Calling simple function...",coroutineContext)
    val flow = simple()
    log("Calling collect...",coroutineContext)
    flow.collect { value -> log(value,coroutineContext) }
    log("Calling collect again...",coroutineContext)
    flow.collect { value -> log(value,coroutineContext)}
}

fun simple(): Flow<Int> = flow {
    log("Flow started",coroutineContext)
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}
3、Flows withTimeout and onTimeoutOrNull
 fun onFlowTimeout(mainScope: CoroutineScope) = mainScope.launch {
        try {
            withTimeout(250) { // 在 250 毫秒后超时
                simple().collect { value -> log("$value") }
            }
        } catch (e: Exception) {
            log("Exception : $e")
        }
        log("Done")
    }

    private fun simple(): Flow<Int> = flow { // 流构建器
        for (i in 1..3) {
            log("simpleFlow", coroutineContext)
            delay(100) // 假装我们在这里做了一些有用的事情
            emit(i) // 发送下一个值
        }
    }

    fun onTimeoutOrNull(mainScope: CoroutineScope) = mainScope.launch {
        withTimeoutOrNull(250) { // 在 250 毫秒后超时
            simple().collect { value -> log("$value") }
        }
        log("Done")

    }
4、Flow operators
  • map : 转化器 可以接受 suspend 方法 以及 操作
  • transform : 转化器 接受 FlowCollector
  • filter :过滤器
  • take : 截取长度
  • toList : 转换成 List
  • toSet : 转化为 Set
  • first : 获取第一个数据
  • reduce and fold 的区别 :fold 有初始值
fun onTransform(mainScope: CoroutineScope) = mainScope.launch {
    (1..3).asFlow() // a flow of requests
        .map {
            performRequest(it)
        }
        .transform {
            emit(it)
            emit("Hello emit $it")
        }
        .filter { it.length > 2 }
        .take(5)
        .collect { response -> log(response) }

    for (item in (1..3).asFlow().toList()){
        log("toList $item")
    }

    for (item in (1..3).asFlow().toSet()){
        log("toSet $item")
    }

    log((1..3).asFlow().first())
}

fun onReduce(mainScope: CoroutineScope) = mainScope.launch{
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    log(sum)
    val sum1 = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .fold(100) { a, b -> a + b } // sum them (terminal operator)
    log(sum1)
}

suspend fun performRequest(request: Int): String {
    delay(1000) // imitate long-running asynchronous work
    return "response $request"
}
5、Flows are sequential
fun onSequential(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow()
        .filter {
            log("Filter $it")
            it % 2 == 0
        }
        .map {
            log("Map $it")
            "string $it"
        }.collect {
            log("Collect $it")
        }
}
6、Flow context Thread

Since simple().collect is called from the main thread, the body of simple's flow is also called in the main thread

fun onThread(mainScope: CoroutineScope) = mainScope.launch {
    simple().collect { value -> log("${Thread.currentThread()} :Collected $value") }
}

private fun simple(): Flow<Int> = flow {
    log("${Thread.currentThread()}:Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}
7、Flow 的线程切换

However, the long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default and UI-updating code might need to be executed in the context of Dispatchers.Main

Flow 通过 withContext 会导致异常,应该通过 flowOn 进行切换

private fun onThreadSwitchErro(mainScope: CoroutineScope) = mainScope.launch {
    simpleErro().collect { value -> log("collect $value", coroutineContext) }
}

private fun simpleErro(): Flow<Int> =
    flow { // The WRONG way to change context for CPU-consuming code in flow builder
        try {
            kotlinx.coroutines.withContext(Dispatchers.Default) {
                for (i in 1..3) {
                    delay(1000)
                    Thread.sleep(1000) // pretend we are computing it in CPU-consuming way
                    log("emit $i", coroutineContext)
                    emit(i) // emit next value
                }
            }
        } catch (e: Exception) {
            log("Exception $e")
        }
    }

private fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(1000)
        Thread.sleep(1000) // pretend we are computing it in CPU-consuming way
        log("emit $i", coroutineContext)
        emit(i) // emit next value
    }
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder

private fun onThreadFlowOn(mainScope: CoroutineScope) = mainScope.launch {
    simple().collect { value ->
        log("collect $value", coroutineContext)
    }
}
8、Buffering

Buffer :通过 buffer 获取数据
*** conflate*** : 获取最新的数据
*** collectLatest***:获取最后的数据

fun onBuffer(mainScope: CoroutineScope) = mainScope.launch {
    val time = measureTimeMillis {
        simple().collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            log("Normal $value")
        }
    }
    log("Collected in $time ms Normal",coroutineContext)

    val time1 = measureTimeMillis {
        simple().buffer(3).collect { value ->
            delay(300) // pretend we are processing it for 300 ms
            log("buffer $value",coroutineContext)
        }
    }
    log("Collected in $time1 ms buffer",coroutineContext)

    val time2 = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collect { value ->
                delay(300) // pretend we are processing it for 300 ms
                log("conflate $value",coroutineContext)
            }
    }
    log("Collected in $time2 ms ",coroutineContext)

    val time3 = measureTimeMillis {
        simple()
            .conflate() // conflate emissions, don't process each one
            .collectLatest { value ->
                log("collectLatest start $value",coroutineContext)
                delay(300) // pretend we are processing it for 300 ms
                log("collectLatest end $value",coroutineContext)
            }
    }
    log("Collected in $time3 ms ",coroutineContext)
}

private fun simple(): Flow<Int> = flow {
    for (i in 1..6) {
        log("emit: $i")
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(i) // emit next value
    }
}

Thread[main,5,main] : flowItem.index 10
Thread[main,5,main] : emit: 1
Thread[main,5,main] : Normal 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : Normal 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : Normal 3
Thread[main,5,main] : emit: 4
Thread[main,5,main] : Normal 4
Thread[main,5,main] : emit: 5
Thread[main,5,main] : Normal 5
Thread[main,5,main] : emit: 6
Thread[main,5,main] : Normal 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 2442 ms Normal

Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 1
Thread[main,5,main] : emit: 5
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1960 ms buffer

Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 1
Thread[main,5,main] : emit: 5
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1034 ms

Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 1
Thread[main,5,main] : emit: 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 2
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 3
Thread[main,5,main] : emit: 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 4
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest end 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 960 ms
9、Composing multiple flows : zip vs Combine
fun onCompose(mainScope: CoroutineScope) = mainScope.launch {
    val num = (1..3).asFlow() // numbers 1..3
    val str = flowOf("one", "two", "three","four") // strings
    num.zip(str) { a, b -> "$a -> $b" } // compose a single string
        .collect { log("zip : $it") } // collect and print

    val num1 = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
    val str1 = flowOf("one", "two", "three","four").onEach { delay(400) } // strings every 400 ms
    val startTime = System.currentTimeMillis() // remember the start time

    num1.zip(str1) { a, b -> "$a -> $b" } // compose a single string with "zip"
        .collect { value -> // collect and print
            log("zip delay : $value at ${System.currentTimeMillis() - startTime} ms from start")
        }

    num1.combine(str1) { a, b -> "$a -> $b" } // compose a single string with "combine"
        .collect { value -> // collect and print
            log("combine delay : $value at ${System.currentTimeMillis() - startTime} ms from start")
        }
}

Thread[main,5,main] : zip : 1 -> one
Thread[main,5,main] : zip : 2 -> two
Thread[main,5,main] : zip : 3 -> three
Thread[main,5,main] : zip delay : 1 -> one at 406 ms from start
Thread[main,5,main] : zip delay : 2 -> two at 806 ms from start
Thread[main,5,main] : zip delay : 3 -> three at 1207 ms from start
Thread[main,5,main] : combine delay : 1 -> one at 1617 ms from start
Thread[main,5,main] : combine delay : 2 -> one at 1821 ms from start
Thread[main,5,main] : combine delay : 2 -> two at 2020 ms from start
Thread[main,5,main] : combine delay : 3 -> two at 2125 ms from start
Thread[main,5,main] : combine delay : 3 -> three at 2423 ms from start
Thread[main,5,main] : combine delay : 3 -> four at 2827 ms from start
10、Flattening flows
  • flatMapConcat 顺序展平流
  • flatMapMerge 尽快的展平流
  • flatMapLatest 获取最新的数据
@OptIn(FlowPreview::class)
fun onFlatMapConcat(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
     startTime = System.currentTimeMillis() // remember the start time
     useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(600) } // emit a number every 100 ms
        .flatMapConcat { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}

Thread[main,5,main] : start : 1: First: 110 : 110
Thread[main,5,main] : start : 1: Second: 612 : 501
Thread[main,5,main] : start : 1: Third: 1114 : 501
Thread[main,5,main] : start : 2: First: 1217 : 102
Thread[main,5,main] : start : 2: Second: 1718 : 501
Thread[main,5,main] : start : 2: Third: 2221 : 502
Thread[main,5,main] : start : 3: First: 2328 : 106
Thread[main,5,main] : start : 3: Second: 2831 : 503
Thread[main,5,main] : start : 3: Third: 3334 : 502

Thread[main,5,main] : end : 1: First: 607 : 607
Thread[main,5,main] : end : 1: Second: 1110 : 502
Thread[main,5,main] : end : 1: Third: 1615 : 504
Thread[main,5,main] : end : 2: First: 2220 : 605
Thread[main,5,main] : end : 2: Second: 2723 : 502
Thread[main,5,main] : end : 2: Third: 3226 : 502
Thread[main,5,main] : end : 3: First: 3832 : 605
Thread[main,5,main] : end : 3: Second: 4335 : 502
Thread[main,5,main] : end : 3: Third: 4838 : 502
@OptIn(FlowPreview::class)
fun onFlatMapMerge(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(800) } // emit a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }

     startTime = System.currentTimeMillis() // remember the start time
     useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}


Thread[main,5,main] : start : 1: First: 814 : 814
Thread[main,5,main] : start : 1: Second: 1316 : 501
Thread[main,5,main] : start : 2: First: 1615 : 299
Thread[main,5,main] : start : 1: Third: 1816 : 201
Thread[main,5,main] : start : 2: Second: 2117 : 301
Thread[main,5,main] : start : 3: First: 2416 : 299
Thread[main,5,main] : start : 2: Third: 2623 : 206
Thread[main,5,main] : start : 3: Second: 2921 : 297
Thread[main,5,main] : start : 3: Third: 3424 : 502

Thread[main,5,main] : end : 1: First: 116 : 116
Thread[main,5,main] : end : 2: First: 221 : 104
Thread[main,5,main] : end : 3: First: 326 : 104
Thread[main,5,main] : end : 1: Second: 620 : 293
Thread[main,5,main] : end : 2: Second: 725 : 104
Thread[main,5,main] : end : 3: Second: 829 : 104
Thread[main,5,main] : end : 1: Third: 1124 : 294
Thread[main,5,main] : end : 2: Third: 1229 : 105
Thread[main,5,main] : end : 3: Third: 1333 : 104
@OptIn(FlowPreview::class)
fun onFlatMapMerge(mainScope: CoroutineScope) = mainScope.launch {
    var startTime = System.currentTimeMillis() // remember the start time
    var useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(800) } // emit a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
     startTime = System.currentTimeMillis() // remember the start time
     useTime = System.currentTimeMillis()
    (1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
        .flatMapMerge { requestFlow(it) }
        .collect { value -> // collect and print
            log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
            useTime = System.currentTimeMillis()
        }
}


Thread[main,5,main] : start : 1: First: 804 : 804
Thread[main,5,main] : start : 1: Second: 1305 : 501
Thread[main,5,main] : start : 2: First: 1607 : 301
Thread[main,5,main] : start : 2: Second: 2111 : 504
Thread[main,5,main] : start : 3: First: 2421 : 309
Thread[main,5,main] : start : 3: Second: 2923 : 501
Thread[main,5,main] : start : 3: Third: 3427 : 504
Thread[main,5,main] : end : 1: First: 111 : 111
Thread[main,5,main] : end : 2: First: 224 : 113
Thread[main,5,main] : end : 3: First: 336 : 111
Thread[main,5,main] : end : 3: Second: 838 : 501
Thread[main,5,main] : end : 3: Third: 1342 : 504
11、Declarative handling
  • try catch exception 捕获异常
  • catch 捕获异常
  • onCompletion 表示完成,能展示异常但是不会处理
fun onDeclarative(mainScope: CoroutineScope) = mainScope.launch {
    try {
        simple().collect { value -> log("try : $value") }
    } catch (e:Exception){
        log("try : $e")
    }finally {
        log("try : Done")
    }

    simple()
        .onCompletion { cause ->log("Flow completed exceptionally $cause") }
        .catch { cause -> log("Caught exception: $cause") }.collect { value -> log("catch $value") }

    (1..4).asFlow()
        .onCompletion { cause -> log("asFlow Flow completed with $cause") }
        .catch { cause -> log("asFlow Caught exception: $cause") }
        .collect { value ->
            log("asFlow $value")
        }
}

private fun simple(): Flow<Int> = flow {
    emit(1)
    throw RuntimeException()
}


Thread[main,5,main] : flowItem.index 15
Thread[main,5,main] : try : 1
Thread[main,5,main] : try : java.lang.RuntimeException
Thread[main,5,main] : try : Done
Thread[main,5,main] : catch 1
Thread[main,5,main] : Flow completed exceptionally java.lang.RuntimeException
Thread[main,5,main] : Caught exception: java.lang.RuntimeException
Thread[main,5,main] : asFlow 1
Thread[main,5,main] : asFlow 2
Thread[main,5,main] : asFlow 3
Thread[main,5,main] : asFlow 4
Thread[main,5,main] : asFlow Flow completed with null
12、FLaunching flow
  • collect :同步
  • launchIn :异步
fun onCollect(mainScope: CoroutineScope) = mainScope.launch {
    events()
        .onEach { event -> log("Collect Event: $event") }
        .collect() // <--- Collecting the flow waits
    log("Collect Done")
}

private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

Thread[main,5,main] : Collect Event: 1
Thread[main,5,main] : Collect Event: 2
Thread[main,5,main] : Collect Event: 3
Thread[main,5,main] : Collect Done
private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }

fun onLaunchIn(mainScope: CoroutineScope) = mainScope.launch {
    events()
        .onEach { event -> log("LaunchIn Event: $event") }
        .launchIn(mainScope) // <--- Collecting the flow waits
    log("LaunchIn Done")
}

Thread[main,5,main] : LaunchIn Done
Thread[main,5,main] : LaunchIn Event: 1
Thread[main,5,main] : LaunchIn Event: 2
Thread[main,5,main] : LaunchIn Event: 3
13、Flow cancellation checks
  • asFlow:因为 繁忙的数据流无法取消
  • flow {}:因为 asFlow 默认使用 ensureActive检查所以可以取消
  • cancellable:通过 cancellable 是流可以被取消
  • currentCoroutineContext().ensureActive():通过 currentCoroutineContext().ensureActive() 是流可以被取消
/**
 * 因为 繁忙的数据流无法取消
 */
fun onAsFlow(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3
// Thread[main,5,main] : asFlow : 4
// Thread[main,5,main] : asFlow : 5


/**
 * 因为 asFlow 默认使用 ensureActive检查所以可以取消
 */
fun onFlow(mainScope: CoroutineScope) = mainScope.launch {
    foo().collect { value ->
        if (value == 3) cancel()
        log("cancel : $value")
    }
}
// Thread[main,5,main] : Emitting 1
// Thread[main,5,main] : cancel : 1
// Thread[main,5,main] : Emitting 2
// Thread[main,5,main] : cancel : 2
// Thread[main,5,main] : Emitting 3
// Thread[main,5,main] : cancel : 3
// Thread[main,5,main] : Emitting 4


/**
 * 通过 cancellable 是流可以被取消
 */
fun onCancel(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().cancellable().collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3


/**
 * 通过 currentCoroutineContext().ensureActive() 是流可以被取消
 */
fun onEnsureActive(mainScope: CoroutineScope) = mainScope.launch {
    (1..5).asFlow().onEach { currentCoroutineContext().ensureActive() }.collect { value ->
        if (value == 3) cancel()
        log("asFlow : $value")
    }
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3


private fun foo(): Flow<Int> = flow {
    for (i in 1..5) {
        log("Emitting $i")
        emit(i)
    }
}

六. Channels

1、Building channel producers
@OptIn(ExperimentalCoroutinesApi::class)
private fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    for (x in 1..5) {
        delay(300)
        send(x * x)
    }
}

fun onProducer(mainScope: CoroutineScope) = mainScope.launch {
    val squares = produceSquares()
    squares.consumeEach { log(it) }
    println("Done!")
}
2、Pipelines
private fun CoroutineScope.produceNumbers() = produce<Int> {
    var x = 1
    while (true) send(x++) // 在流中开始从 1 生产无穷多个整数
}

private fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (x in numbers) send(x * x * x)
}

fun onPipeline(mainScope: CoroutineScope) = mainScope.launch {
    val numbers = produceNumbers() // 从 1 开始生成整数
    val squares = square(numbers) // 整数求平方
    repeat(5) {
        log(squares.receive()) // 输出前五个
    }
    log("Done!") // 至此已完成
    coroutineContext.cancelChildren() // 取消子协程
}
3、Fan-out
@OptIn(ExperimentalCoroutinesApi::class)
private fun CoroutineScope.produceNumbers() = produce {
    var x = 1 // start from 1
    while (true) {
        send(x++) // produce next
        delay(100) // wait 0.1s
    }
}

private fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    for (msg in channel) {
        log("Processor #$id received $msg",coroutineContext)
    }
}

fun fanOut(mainScope:CoroutineScope) = mainScope.launch {
    val producer = produceNumbers()
    repeat(5) { launchProcessor(it, producer) }
    delay(950)
    producer.cancel() // cancel producer coroutine and thus kill them all
}
4、Fan-in
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numberProducer(start: Int) = produce<Int> {
    var index = start
    while (true) send(index++)
}

@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (x in numbers) if (x % prime != 0) send(x)
}

fun findPrime(mainScope: CoroutineScope) = mainScope.launch {
    var cur = numberProducer(2)
    repeat(10) {
        val prime = cur.receive()
        log(prime)
        cur = filter(cur, prime)
    }
    coroutineContext.cancelChildren()
}
4、Buffered channels
fun sendWithBuffer(mainScope:CoroutineScope) = mainScope.launch {
    val channel = Channel<Int>(4) // create buffered channel
    val sender = launch { // launch sender coroutine
        repeat(10) {
            log("Four Sending $it") // print before sending each element
            channel.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender.cancel() // cancel sender coroutine

    val channel1 = Channel<Int>(2) // create buffered channel
    val sender1 = launch { // launch sender coroutine
        repeat(10) {
            log("Two Sending $it") // print before sending each element
            channel1.send(it) // will suspend when buffer is full
        }
    }
    // don't receive anything... just wait....
    delay(1000)
    sender1.cancel() // cancel sender coroutine
}

Thread[main,5,main] : Four Sending 0
Thread[main,5,main] : Four Sending 1
Thread[main,5,main] : Four Sending 2
Thread[main,5,main] : Four Sending 3
Thread[main,5,main] : Four Sending 4
Thread[main,5,main] : Two Sending 0
Thread[main,5,main] : Two Sending 1
Thread[main,5,main] : Two Sending 2
5、Channels are fair

发送和接收操作是 公平的 并且尊重调用它们的多个协程。它们遵守先进先出原则

data class Ball(var hits: Int)

fun showChannelFair(mainScope:CoroutineScope) = mainScope.launch {
    val table = Channel<Ball>() // a shared table
    launch { player("ping", table) }
    launch { player("pong", table) }
    table.send(Ball(0)) // serve the ball
    delay(1000) // delay 1 second
    coroutineContext.cancelChildren() // game over, cancel them
}

suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) { // receive the ball in a loop
        ball.hits++
        log("$name $ball")
        delay(300) // wait a bit
        table.send(ball) // send the ball back
    }
}

// Thread[main,5,main] : ping Ball(hits=1)
// Thread[main,5,main] : pong Ball(hits=2)
// Thread[main,5,main] : ping Ball(hits=3)
// Thread[main,5,main] : pong Ball(hits=4)
6、Ticker channels

计时器通道是一种特别的会合通道,每次经过特定的延迟都会从该通道进行消费并产生 Unit

fun tickerSend(mainScope:CoroutineScope) = mainScope.launch {
    val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) //创建计时器通道

    var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    log("Initial element is available immediately: $nextElement") // no initial delay

    nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
    log("Next element is not ready in 50 ms: $nextElement")

    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    log("Next element is ready in 100 ms: $nextElement")

    // 模拟大量消费延迟
    log("Consumer pauses for 150ms")
    delay(150)

    // 下一个元素立即可用
    nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
    log("Next element is available immediately after large consumer delay: $nextElement")

    // 请注意,`receive` 调用之间的暂停被考虑在内,下一个元素的到达速度更快
    nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
    log("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")

    tickerChannel.cancel() // 表明不再需要更多的元素
}

// Thread[main,5,main] : Initial element is available immediately: kotlin.Unit
// Thread[main,5,main] : Next element is not ready in 50 ms: null
// Thread[main,5,main] : Next element is ready in 100 ms: kotlin.Unit
// Thread[main,5,main] : Consumer pauses for 150ms
// Thread[main,5,main] : Next element is available immediately after large consumer delay: kotlin.Unit
// Thread[main,5,main] : Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit

七. Coroutine exceptions handling

1、Exception propagation
  • propagating exceptions automatically (launch and actor) 自动传播异常
  • exposing them to users (async and produce) 向用户暴露异常
@OptIn(DelicateCoroutinesApi::class)
fun propagationException(mainScope:CoroutineScope) = mainScope.launch {
    val deferred = GlobalScope.async { // root coroutine with async
        log("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }
    try {
        deferred.await()
        log("Unreached")
    } catch (e: ArithmeticException) {
        log("Caught ArithmeticException")
    }
    val job = try {
       GlobalScope.launch { // root coroutine with launch
            log("Throwing exception from launch")
            throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
        }
    }catch(e:Exception) {
        log("Throwing exception from launch")
        null
    }
    job?.join()
    log("Joined failed job")
}



Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from async
Thread[main,5,main] : Caught ArithmeticException
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launch
2022-10-28 15:12:21.144 24517-24585/com.zhihaoliang.coroutine E/AndroidRuntime: FATAL EXCEPTION: DefaultDispatcher-worker-1
    Process: com.zhihaoliang.coroutine, PID: 24517
    java.lang.IndexOutOfBoundsException
        at com.zhihaoliang.coroutine.buss.excep.func.ExceptionPropagation$propagationException$1$job$1.invokeSuspend(ExceptionPropagation.kt:29)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
        Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@65b7906, Dispatchers.Default]
2、CoroutineExceptionHandler
 private val handler = CoroutineExceptionHandler { _, exception ->
     log("CoroutineExceptionHandler got $exception")
 }

fun handlerException(mainScope:CoroutineScope) = mainScope.launch {
    val deferred = GlobalScope.async(handler) { // root coroutine with async
        log("Throwing exception from async")
        throw ArithmeticException() // Nothing is printed, relying on user to call await
    }

    try {
        deferred.await()
        log("Unreached")
    } catch (e: ArithmeticException) {
        log("Caught ArithmeticException")
    }

    val job = try {
        GlobalScope.launch(handler) { // root coroutine with launch
            log("Throwing exception from launch")
            throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
        }
    }catch(e:Exception) {
        log("Throwing exception from launch")
        null
    }
    
    job?.join()
    log("Joined failed job")
}


Thread[main,5,main] : com.zhihaoliang.coroutine.buss.excep.ExceptionActivity
Thread[main,5,main] : exceptionItem.index 1
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from async
Thread[main,5,main] : Caught ArithmeticException
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launch
Thread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.IndexOutOfBoundsException
Thread[main,5,main] : Joined failed job
3、Cancellation and exceptions

CancellationException are ignored by all handlers

fun cancelExceptionNormal(mainScope:CoroutineScope) = mainScope.launch {
    val job = launch {
        val child = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                log("Child is cancelled")
            }
        }
        yield()
        log("Cancelling child")
        child.cancel()
        child.join()
        yield()
        log("Parent is not cancelled")
    }
    job.join()
}


// Thread[main,5,main] : Cancelling child
// Thread[main,5,main] : Child is cancelled
// Thread[main,5,main] : Parent is not cancelled


@OptIn(DelicateCoroutinesApi::class)
fun cancelExceptionHander(mainScope:CoroutineScope) = mainScope.launch {
    val handler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val job = GlobalScope.launch(handler) {
        launch { // the first child
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    log("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    log("The first child finished its non cancellable block")
                }
            }
        }
        launch { // the second child
            delay(10)
            log("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    job.join()
}

// Thread[DefaultDispatcher-worker-2,5,main] : Second child throws an exception
// Thread[DefaultDispatcher-worker-1,5,main] : Children are cancelled, but exception is not handled until all children terminate
// Thread[DefaultDispatcher-worker-1,5,main] : The first child finished its non cancellable block
// Thread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.ArithmeticException
4、Exceptions aggregation

当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常

@OptIn(DelicateCoroutinesApi::class)
fun exceptionAggregationNormal(mainScope: CoroutineScope) = mainScope.launch {
    val handler = CoroutineExceptionHandler { _, exception ->
        log(
            "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        launch {
            try {
                delay(Long.MAX_VALUE) // 当另一个同级的协程因 IOException  失败时,它将被取消
            } finally {
                throw ArithmeticException() // 第二个异常
            }
        }
        launch {
            delay(100)
            throw IOException() // 首个异常
        }
        delay(Long.MAX_VALUE)
    }
    job.join()
}

// Thread[DefaultDispatcher-worker-3,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]

取消异常是透明的,默认情况下是未包装的:handler未捕获到

@OptIn(DelicateCoroutinesApi::class)
fun exceptionAggregationCancel(mainScope: CoroutineScope) = mainScope.launch {
    val handler = CoroutineExceptionHandler { _, exception ->
        log(
            "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val job = GlobalScope.launch(handler) {
        val inner = launch { // 该栈内的协程都将被取消
            launch {
                launch {
                    throw IOException() // 原始异常
                }
            }
        }
        try {
            inner.join()
        } catch (e: CancellationException) {
            log("Rethrowing CancellationException with original cause")
            throw e // 取消异常被重新抛出,但原始 IOException 得到了处理
        }
    }
    job.join()
}

// Thread[DefaultDispatcher-worker-3,5,main] : Rethrowing CancellationException with original cause
// Thread[DefaultDispatcher-worker-2,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed []
5、Supervision job
fun supervisionJob(mainScope:CoroutineScope) = mainScope.launch {
    val supervisor = SupervisorJob()
    with(CoroutineScope(coroutineContext + supervisor)) {
        // launch the first child -- its exception is ignored for this example (don't do this in practice!)
        val firstChild = launch(CoroutineExceptionHandler { _, _ ->  }) {
            log("The first child is failing")
            throw AssertionError("The first child is cancelled")
        }
        // launch the second child
        val secondChild = launch {
            firstChild.join()
            // Cancellation of the first child is not propagated to the second child
            log("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
            try {
                delay(Long.MAX_VALUE)
            } finally {
                // But cancellation of the supervisor is propagated
                log("The second child is cancelled because the supervisor was cancelled")
            }
        }
        // wait until the first child fails & completes
        firstChild.join()
        log("Cancelling the supervisor")
        supervisor.cancel()
        secondChild.join()
    }
}



// Thread[main,5,main] : The first child is failing
// Thread[main,5,main] : The first child is cancelled: true, but the second one is still active
// Thread[main,5,main] : Cancelling the supervisor
// Thread[main,5,main] : The second child is cancelled because the supervisor was cancelled
6、Supervision scop
fun supervisionScope(mainScope:CoroutineScope) = mainScope.launch {
    try {
        supervisorScope {
            val child = launch {
                try {
                    log("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    log("The child is cancelled")
                }
            }
            // Give our child a chance to execute and print using yield
            yield()
            log("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch(e: AssertionError) {
        log("Caught an assertion error")
    }
}

// Thread[main,5,main] : The child is sleeping
// Thread[main,5,main] : Throwing an exception from the scope
// Thread[main,5,main] : The child is cancelled
// Thread[main,5,main] : Caught an assertion error

八.Shared mutable state and concurrency

suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100  // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        coroutineScope { // scope for coroutines
            repeat(n) {
                launch {
                    repeat(k) { action() }
                }
            }
        }
    }
    log("Completed ${n * k} actions in $time ms")
}
1、The problem
fun showErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    var counter = 0
    massiveRun {
        counter++
    }
    log("Counter = $counter")
}


// Thread[DefaultDispatcher-worker-8,5,main] : Completed 100000 actions in 21 ms
// Thread[DefaultDispatcher-worker-8,5,main] : Counter = 45004
2、Volatiles are of no help
@Volatile // in Kotlin `volatile` is an annotation
var counterVolatile = 0
fun showVolatilesErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
     counterVolatile = 0
    massiveRun {
        counterVolatile++
    }
    log("counterVolatile = $counterVolatile")
}


// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 77 ms
// Thread[DefaultDispatcher-worker-7,5,main] : counterVolatile = 88777
3、Thread-safe data structures
// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 34 ms
// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 100000
4、Thread confinement coarse-grained
val counterContext = newSingleThreadContext("CounterContext")
private fun runSingleThread(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){
    var counter = 0
    withContext(counterContext) {
        massiveRun {
            counter++
        }
    }
    log("Counter = $counter")
}

// Thread[CounterContext,5,main] : Completed 100000 actions in 65 ms
// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 100000
5、Mutual exclusion
val mutex = Mutex()
private fun useMutual(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){
    var counter = 0
    massiveRun {
        // protect each increment with lock
        mutex.withLock {
            counter++
        }
    }
    log("Counter = $counter")
}

// Thread[DefaultDispatcher-worker-2,5,main] : Completed 100000 actions in 1547 ms
// Thread[DefaultDispatcher-worker-2,5,main] : Counter = 100000
6、Actors
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求

// 这个函数启动一个新的计数器 actor
private fun CoroutineScope.counterActor() = actor<CounterMsg> {
    var counter = 0 // actor 状态
    for (msg in channel) { // 即将到来消息的迭代器
        when (msg) {
            is IncCounter -> counter++
            is GetCounter -> msg.response.complete(counter)
        }
    }
}

private fun useActor(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default){
    val counter = counterActor() // 创建该 actor
    withContext(Dispatchers.Default) {
        massiveRun {
            counter.send(IncCounter)
        }
    }
    // 发送一条消息以用来从一个 actor 中获取计数值
    val response = CompletableDeferred<Int>()
    counter.send(GetCounter(response))
    log("Counter = ${response.await()}")
    counter.close() // 关闭该actor
}


// Thread[DefaultDispatcher-worker-4,5,main] : Completed 100000 actions in 907 ms
// Thread[DefaultDispatcher-worker-4,5,main] : Counter = 100000

代码地址:https://gitee.com/trainAndroid/coroutine#1exception-propagation