获取 GitHub 组织下所有仓库的贡献者列表
/**
 * GitHub account settings: login account, token and organization.
 * A token can be created at https://github.com/settings/tokens/new
 */
private val req = RequestData("zhi*******@163.com","***************", "kotlin")
/**
 * Loads the contributors of every repository of `req.org` using synchronous
 * `execute()` calls, one repository after another.
 *
 * A missing response body is treated as an empty list.
 *
 * @return the contributors of all repositories, concatenated in repository order
 */
fun loadContributorsBlocking(): List<User> {
    val orgRepos = service.getOrgReposCall(req.org).execute().body() ?: emptyList()
    val contributors = mutableListOf<User>()
    for (repo in orgRepos) {
        val repoUsers = service.getRepoContributorsCall(req.org, repo.name).execute().body() ?: emptyList()
        contributors.addAll(repoUsers)
    }
    return contributors
}
/**
 * Loads the contributors of every repository of `req.org` using callbacks.
 *
 * The repository list is requested first; then one contributors request per
 * repository is enqueued. When the last of those callbacks has run, the
 * aggregated result is delivered through [userLoadData].
 *
 * @param userLoadData receiver of the final (aggregated) user list; it is called
 *   with an empty list when the organization has no repositories
 */
fun loadContributorsCallbacks(userLoadData: IUserLoadData) {
    service.getOrgReposCall(req.org).enqueue(NetCallback { repos ->
        val listRepos: List<Repo> = repos ?: emptyList()
        if (listRepos.isEmpty()) {
            // Nothing to request: report the empty result once and stop here.
            userLoadData.onUserLoad(emptyList())
        } else {
            // Callbacks may run on different threads, so the shared list must be
            // thread-safe; a plain ArrayList can lose or corrupt elements.
            val allUsers: MutableList<User> =
                java.util.Collections.synchronizedList(ArrayList<User>())
            // Counts finished callbacks; the one that completes the set delivers the result.
            val numberOfProcessed = AtomicInteger()
            for (repo in listRepos) {
                service.getRepoContributorsCall(req.org, repo.name).enqueue(NetCallback { list ->
                    if (!list.isNullOrEmpty()) allUsers.addAll(list)
                    if (numberOfProcessed.incrementAndGet() == listRepos.size) {
                        userLoadData.onUserLoad(allUsers.aggregate())
                    }
                })
            }
            // A CountDownLatch.await() here would block the callback thread, so it is not used.
        }
    })
}
/**
 * Loads the contributors of every repository of `req.org` with suspending calls,
 * one repository after another, and hands the aggregated list to [userLoadData].
 *
 * A missing response body is treated as an empty list.
 */
suspend fun loadContributorsSuspend(userLoadData: IUserLoadData) {
    val orgRepos = service.getOrgRepos(req.org).body() ?: emptyList()
    val perRepo = orgRepos.map { repo ->
        service.getRepoContributors(req.org, repo.name).body() ?: emptyList()
    }
    userLoadData.onUserLoad(perRepo.flatten().aggregate())
}
/**
 * Loads the contributors of every repository of `req.org` concurrently and hands
 * the aggregated list to [userLoadData].
 *
 * The per-repository requests are started with `async(Dispatchers.IO)` inside a
 * `coroutineScope`, so they are children of the caller: cancelling the caller (or
 * a failure of one request) cancels the remaining requests instead of leaving
 * them running in detached scopes.
 */
suspend fun loadContributorsConcurrent(userLoadData: IUserLoadData) {
    val repos = service.getOrgRepos(req.org).body() ?: emptyList()
    val users = coroutineScope {
        repos.map { repo ->
            async(Dispatchers.IO) {
                log(repo.name, coroutineContext)
                service.getRepoContributors(req.org, repo.name).body() ?: emptyList()
            }
        }.awaitAll()
    }.flatten().aggregate()
    userLoadData.onUserLoad(users)
}
/**
 * Launches a job that logs every 500 ms, waits 1300 ms and then cancels the job
 * with `cancel()` (without joining it).
 */
override fun onJobCancel() = onClick {
    log("main: I'm start!")
    val sleeper = mainScope.launch {
        for (i in 0 until 1000) {
            log("job: I'm sleeping $i ...")
            delay(500L)
        }
    }
    delay(1300L) // give the job some time to run
    log("main: I'm tired of waiting!")
    sleeper.cancel() // request cancellation only
    log("main: Now I can quit.")
}
/**
 * Demonstrates that cancellation is cooperative: the launched job runs a busy loop
 * that never suspends and never checks for cancellation, so `cancelAndJoin()` only
 * returns after the loop has ended on its own (once `i` reaches 10).
 */
override fun onCancelComplete() = onClick {
val startTime = System.currentTimeMillis()
val job = mainScope.launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (i < 10) { // computation loop, just wastes CPU; never checks for cancellation
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
log("job: I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
log("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
log("main: Now I can quit.")
}
/**
 * Demonstrates what happens when the exception raised on cancellation is swallowed.
 *
 * The `try`/`catch (e: Exception)` sits inside the `repeat` body, so the exception
 * thrown by `delay` after the job is cancelled is caught and logged, and the loop
 * goes on with the next iteration instead of stopping.
 */
override fun onCancelException() = onClick {
val job = mainScope.launch {
repeat(5) { i ->
try { // print a message twice a second
log("job: I'm sleeping $i ...")
delay(500)
} catch (e: Exception) {
// Also catches the cancellation exception; it is logged but not rethrown.
log("job Exception: $e")
}
}
}
delay(1300L) // delay a bit
log("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
log("main: Now I can quit.")
}
/**
 * Demonstrates a cancellable computation loop: the busy loop checks `isActive` on
 * every iteration, so it ends shortly after `cancelAndJoin()` is called.
 *
 * The numbered comments (1), (2), (3) mark alternative variants of the experiment
 * that can be swapped in by hand.
 */
override fun onActiveCancel() = onClick {
val startTime = System.currentTimeMillis()
val job = mainScope.launch(Dispatchers.Default) { //(1)
//(2) val job = mainScope.launch {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
log("job: I'm sleeping ${i++} ...",coroutineContext)
nextPrintTime += 500L
//(3) delay(500)
}
}
}
delay(1300L) // delay a bit
log("main: I'm tired of waiting!", currentCoroutineContext())
job.cancelAndJoin() // cancels the job and waits for its completion
log("main: Now I can quit.", currentCoroutineContext())
}
注解
/**
 * Cancels a sleeping job and shows that both its `catch` and its `finally` block
 * run when the job is cancelled.
 */
override fun onFinallyCancel() = onClick {
    val sleeper = mainScope.launch {
        try {
            for (i in 0 until 1000) {
                log("job: I'm sleeping $i ...")
                delay(500L)
            }
        } catch (e: Exception) {
            log("Exception: : $e")
        } finally {
            log("job: I'm running finally")
        }
    }
    delay(1300L) // let the job run for a while
    log("main: I'm tired of waiting!")
    sleeper.cancelAndJoin() // cancel, then wait until the job has finished
    log("main: Now I can quit.")
}
/**
 * Cancels a sleeping job whose `finally` block still needs to suspend: the cleanup
 * is wrapped in `withContext(NonCancellable)` so that its `delay` can run even
 * though the job has already been cancelled.
 */
override fun onNoCancel() = onClick {
    val sleeper = mainScope.launch {
        try {
            for (i in 0 until 1000) {
                log("job: I'm sleeping $i ...")
                delay(500L)
            }
        } finally {
            withContext(NonCancellable) {
                log("job: I'm running finally")
                delay(1000L)
                log("job: And I've just delayed for 1 sec because I'm non-cancellable")
            }
        }
    }
    delay(1300L) // let the job run for a while
    log("main: I'm tired of waiting!")
    sleeper.cancelAndJoin() // cancel, then wait until the cleanup has finished
    log("main: Now I can quit.")
}
/**
 * Runs a long sleeping loop under `withTimeout(1300L)`; the timeout exception is
 * not handled here.
 */
override fun onTimeOut() = onClick {
    withTimeout(1300L) {
        for (i in 0 until 1000) {
            log("I'm sleeping $i ...")
            delay(500L)
        }
    }
}
/**
 * Runs a long sleeping loop under `withTimeout(1300L)` and logs the exception that
 * ends it.
 */
override fun onTimeOutCatch() = onClick {
    try {
        withTimeout(1300L) {
            for (i in 0 until 1000) {
                log("I'm sleeping $i ...")
                delay(500L)
            }
        }
    } catch (e: Exception) {
        log("Exception $e")
    }
}
/**
 * Compares the two outcomes of `withTimeoutOrNull`: the first call times out (its
 * loop needs far longer than 1300 ms), the second one has a 13000 ms budget for
 * five 500 ms iterations and can therefore complete and return its value.
 */
override fun onTimeOutCatchOrNull() = onClick {
val result = withTimeoutOrNull(1300L) {
repeat(1000) { i ->
log("I'm sleeping $i ...")
delay(500L)
}
"Done" // will get cancelled before it produces this result
}
log("Result is $result")
val result1 = withTimeoutOrNull(13000L) {
repeat(5) { i ->
log("I'm sleeping $i ...")
delay(500L)
}
"Done" // reached: 5 x 500 ms fits within the 13000 ms timeout
}
log("Result is $result1")
}
/**
 * Calls the two suspending computations one after the other and logs their sum
 * together with the elapsed time.
 */
override fun onSequentialDefault() = onClick {
    val elapsed = measureTimeMillis {
        val sum = doSomethingUsefulOne() + doSomethingUsefulTwo()
        log("The answer is $sum")
    }
    log("Completed in $elapsed ms")
}
/**
 * Starts the two suspending computations concurrently with `async`, awaits both
 * and logs their sum together with the elapsed time.
 */
override fun onAsyncOperation() {
    mainScope.launch {
        val elapsed = measureTimeMillis {
            val first = async { doSomethingUsefulOne() }
            val second = async { doSomethingUsefulTwo() }
            val sum = first.await() + second.await()
            log("The answer is $sum")
        }
        log("Completed in $elapsed ms")
    }
}
/**
 * Demonstrates lazily started `async` coroutines (`CoroutineStart.LAZY`).
 *
 * Only `one` is started explicitly. `two` is never started with `start()`, so it
 * begins when `two.await()` is evaluated, i.e. after `one.await()` has returned.
 */
override fun lazyOperation() {
mainScope.launch {
val time = measureTimeMillis {
val one = async(start = CoroutineStart.LAZY) { doSomethingUsefulOne() }
val two = async(start = CoroutineStart.LAZY) { doSomethingUsefulTwo() }
one.start() // without this call, `one` would only start at one.await()
log("The answer is ${one.await() + two.await()}")
}
log("Completed in $time ms")
}
}
注释:如果不调用 one.start() 显式启动,惰性协程会等到 one.await() 被调用时才开始执行
/**
 * Demonstrates the discouraged "async-style function" pattern: the two `*Async`
 * helpers start work outside of any calling coroutine and return their deferred
 * results, which are awaited later inside `mainScope.launch`.
 *
 * Note that `measureTimeMillis` only covers starting the coroutines; the awaits
 * happen inside the launched coroutine and are not included in the measured time.
 */
override fun asyncStyleErro() {
log("Completed in asyncStyleErro")
val time = measureTimeMillis { // asynchronous work can be started outside of a coroutine
val one = somethingUsefulOneAsync()
val two = somethingUsefulTwoAsync() // but waiting for the result needs suspension or blocking; here the awaits run inside mainScope.launch, so the caller is not blocked
mainScope.launch {
log("The answer is ${one.await() + two.await()}")
}
}
log("Completed in $time ms")
}
// Starts doSomethingUsefulOne() in mainScope and returns the result of mainScope.async without waiting for it.
private fun somethingUsefulOneAsync() = mainScope.async {
doSomethingUsefulOne()
}
// Starts doSomethingUsefulTwo() in mainScope without waiting for it.
// The return type of somethingUsefulTwoAsync is Deferred<Int>.
private fun somethingUsefulTwoAsync() = mainScope.async {
doSomethingUsefulTwo()
}
/** Logs its name, suspends for one second to simulate work, then returns 13. */
private suspend fun doSomethingUsefulOne(): Int {
log("doSomethingUsefulOne")
delay(1000L) // pretend we are doing something useful here
return 13
}
/** Logs its name, suspends for one second to simulate work, then returns 29. */
private suspend fun doSomethingUsefulTwo(): Int {
log("doSomethingUsefulTwo")
delay(1000L) // pretend we are doing something useful here, too
return 29
}
注释 :
这种带有异步函数的编程风格仅供参考,因为它在其它编程语言中是一种受欢迎的风格。在 Kotlin 的协程中强烈不推荐使用这种风格,原因如下所述。
考虑一下:如果在 val one = somethingUsefulOneAsync() 这一行和 one.await() 表达式之间的代码存在逻辑错误,程序抛出了异常,并且正在执行的操作因此中止,
将会发生什么。通常情况下,一个全局的异常处理者会捕获这个异常,将异常打印成日志并报告给开发者,而程序本身仍然可以继续执行其它操作。
但是这里我们的 somethingUsefulOneAsync 仍然在后台执行,尽管启动它的那次操作已经被终止。使用结构化并发则不会出现这个问题,如下一小节所示。
/**
 * Runs [failedConcurrentSum] inside `mainScope` and logs when it fails with an
 * `ArithmeticException`.
 */
override fun asyncStyleRight() {
mainScope.launch {
try {
failedConcurrentSum()
} catch (e: ArithmeticException) {
log("Computation failed with ArithmeticException",coroutineContext)
}
}
}
/**
 * Starts two `async` children inside a `coroutineScope`: the first one waits
 * (practically) forever, the second one throws an `ArithmeticException`.
 *
 * The first child logs from its `finally` block, which makes its cancellation
 * visible after the second child has failed.
 *
 * @throws ArithmeticException thrown by the second child
 */
suspend fun failedConcurrentSum(): Int = coroutineScope {
val one = async<Int> {
try {
delay(Long.MAX_VALUE) // simulates a very long computation
42
} finally {
log("First child was cancelled",coroutineContext)
}
}
val two = async<Int> {
log("Second child throws an exception",coroutineContext)
throw ArithmeticException()
}
one.await() + two.await()
}
/**
 * Launches one coroutine per dispatcher variant and logs the thread and context
 * each of them runs in.
 *
 * The dedicated single-thread context is closed as soon as its coroutine
 * completes; otherwise every call would leave one more thread behind.
 */
override fun onDispatcherType() {
    mainScope.launch { // no dispatcher argument: uses the context of mainScope
        log("no param", coroutineContext)
    }
    mainScope.launch(Dispatchers.Unconfined) { // not confined: starts in the caller thread
        log("Dispatchers.Unconfined", coroutineContext)
    }
    mainScope.launch(Dispatchers.Default) { // dispatched to the default dispatcher
        log("Dispatchers.Default", coroutineContext)
    }
    val singleThread = newSingleThreadContext("SingleThread") // gets its own new thread
    mainScope.launch(singleThread) {
        log("SingleThread", coroutineContext)
    }.invokeOnCompletion {
        // Release the dedicated thread once the coroutine is done (or was cancelled).
        singleThread.close()
    }
    GlobalScope.launch {
        log("GlobalScope", coroutineContext)
    }
}
// Thread[main,5,main]-->StandaloneCoroutine{Active}@341e412-->297380853 : Dispatchers.Unconfined
// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@47975e0-->156964473 : Dispatchers.Default
// Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@1d4ff3f-->112631768 : GlobalScope
// Thread[SingleThread,5,main]-->StandaloneCoroutine{Active}@730100c-->282246497 : SingleThread
// Thread[main,5,main]-->StandaloneCoroutine{Active}@fd1e36a-->494285765 : no param
/**
 * Compares a coroutine launched with `Dispatchers.Unconfined` with one launched in
 * the plain `mainScope` context by logging the thread before and after a `delay`.
 */
override fun onUnconfinedVsConfined() {
mainScope.launch(Dispatchers.Unconfined) { // not confined -- starts in the calling thread
log("Unconfined : start", coroutineContext)
delay(500)
log("Unconfined : end", coroutineContext)
}
mainScope.launch { // no dispatcher argument: uses the context of mainScope
log("main runBlocking: start", coroutineContext)
delay(1000)
log("main runBlocking: end", coroutineContext)
}
}
Thread[main,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined : start
Thread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: start
Thread[kotlinx.coroutines.DefaultExecutor,5,main]-->StandaloneCoroutine{Active}@62f6fa8-->315783128-->null : Unconfined : end
Thread[main,5,main]-->StandaloneCoroutine{Active}@a27eac1-->232495113-->null : main runBlocking: end
/**
 * Demonstrates jumping between threads within one coroutine: the coroutine starts
 * in `ctx1`, switches to `ctx2` with `withContext` and then continues in `ctx1`.
 *
 * Both single-thread contexts are released through `use`. `runBlocking` keeps the
 * calling thread blocked until the coroutine has finished.
 */
@OptIn(DelicateCoroutinesApi::class)
override fun onJumpThread() {
newSingleThreadContext("Ctx1").use { ctx1 ->
newSingleThreadContext("Ctx2").use { ctx2 ->
runBlocking(ctx1) {
log("Started in ctx1", coroutineContext)
withContext(ctx2) {
log("Working in ctx2", coroutineContext)
}
log("Back to ctx1", coroutineContext)
}
}
}
}
Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Started in ctx1
Thread[Ctx2,5,main]-->DispatchedCoroutine{Active}@67116b4-->213567889-->null : Working in ctx2
Thread[Ctx1,5,main]-->BlockingCoroutine{Active}@a4ddda1-->310540647-->null : Back to ctx1
/**
 * Shows which coroutines are affected when a parent job is cancelled: `job1` is
 * launched in `GlobalScope`, `job2` is launched as a child of the `request`
 * coroutine, and `request` is cancelled after 500 ms.
 */
@OptIn(DelicateCoroutinesApi::class)
override fun onChild() {
mainScope.launch { // launch a coroutine to process some kind of incoming request
val request = launch { // it spawns two other jobs, one of them via GlobalScope
GlobalScope.launch {
log("job1: I run in GlobalScope and execute independently!",coroutineContext)
delay(1000)
log("job1: I am not affected by cancellation of the request",coroutineContext)
} // and the other one inherits the parent coroutine's context
launch {
delay(100)
log("job2: I am a child of the request coroutine",coroutineContext)
delay(1000)
log("job2: I will not execute this line if my parent request is cancelled",coroutineContext)
}
}
delay(500)
request.cancel() // cancel processing of the request
delay(1000) // wait a second to see what happens
log("main: Who has survived request cancellation?",coroutineContext)
}
}
Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I run in GlobalScope and execute independently!
Thread[main,5,main]-->StandaloneCoroutine{Active}@883307f-->204922311-->null : job2: I am a child of the request coroutine
Thread[DefaultDispatcher-worker-1,5,main]-->StandaloneCoroutine{Active}@8339fd9-->205189639-->null : job1: I am not affected by cancellation of the request
Thread[main,5,main]-->StandaloneCoroutine{Active}@9dc884c-->227554708-->null : main: Who has survived request cancellation?
/**
 * Demonstrates naming coroutines with `CoroutineName`: two named `async`
 * computations and one named `launch` on `Dispatchers.Default` log their context,
 * and the quotient of the two computed values is logged at the end.
 */
override fun onNamed() {
mainScope.launch {
log("Started main coroutine",coroutineContext) // run two background value computations
val v1 = async(CoroutineName("v1coroutine")) {
delay(500)
log("Computing v1",coroutineContext)
252
}
val v2 = async(CoroutineName("v2coroutine")) {
delay(1000)
log("Computing v2",coroutineContext)
6
}
launch(Dispatchers.Default + CoroutineName("test")) {
log("I'm working in thread ${Thread.currentThread().name}",coroutineContext)
}
log("The answer for v1 / v2 = ${v1.await() / v2.await()}",coroutineContext)
}
}
/**
 * Writes [msg] to the Android error log under the tag `~~~~`, prefixed with the
 * current thread.
 *
 * @param msg text to log; `null` is logged as the string "null"
 * @param coroutine optional coroutine context; when given, its `Job`, its hash code
 *   and its `CoroutineName` element are added to the prefix
 */
fun log(msg: String?, coroutine: CoroutineContext? = null) {
    val text = msg ?: "null"
    val thread = Thread.currentThread()
    val line = if (coroutine != null) {
        "$thread-->${coroutine[Job]}-->${coroutine.hashCode()}-->${coroutine[CoroutineName.Key]} : $text"
    } else {
        "$thread : $text"
    }
    Log.e("~~~~", line)
}
/**
 * Collects `simple()` in the launched coroutine while a child coroutine keeps
 * logging every two seconds, showing that collecting the flow does not block the
 * other coroutine.
 */
fun onIntroduce(mainScope: CoroutineScope) = mainScope.launch {
    log("I'm the parent Coroutine")
    launch {
        repeat(3) { index ->
            log("I'm not blocked ${index + 1}", coroutineContext)
            delay(2000)
        }
    }
    simple().collect { println(it) }
    log("I'm the parent Coroutine end")
}
/** Flow that emits 1, 2 and 3, waiting two seconds and logging before each value. */
private fun simple(): Flow<Int> = flow {
    (1..3).forEach { number ->
        delay(2000) // pretend we are doing something useful here
        log("I'm not emit $number", coroutineContext)
        emit(number)
    }
}
Thread[main,5,main] : I'm the parent Coroutine
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 1
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 1
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@f8f7-->38825051-->null : I'm not blocked 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@f249282-->292816358-->null : I'm not emit 3
Thread[main,5,main] : I'm the parent Coroutine end
Flows are cold: the code inside a flow builder does not run until the flow is collected, and it runs again for every new collector.
/**
 * Demonstrates that a flow is cold: `simple()` returns immediately without running
 * the flow body, and the body runs once for each of the two `collect` calls.
 *
 * Collected values are converted to text before logging, because the visible
 * `log` function accepts a `String?` message.
 */
fun onFlowCold(mainScope: CoroutineScope) = mainScope.launch {
    log("Calling simple function...", coroutineContext)
    val flow = simple()
    log("Calling collect...", coroutineContext)
    flow.collect { value -> log("$value", coroutineContext) }
    log("Calling collect again...", coroutineContext)
    flow.collect { value -> log("$value", coroutineContext) }
}
/** Flow that logs "Flow started" when collected, then emits 1, 2 and 3 at 100 ms intervals. */
fun simple(): Flow<Int> = flow {
    log("Flow started", coroutineContext)
    (1..3).forEach { number ->
        delay(100)
        emit(number)
    }
}
/**
 * Collects `simple()` under a 250 ms `withTimeout` and logs the exception that
 * ends the collection, followed by "Done".
 */
fun onFlowTimeout(mainScope: CoroutineScope) = mainScope.launch {
    try {
        withTimeout(250) { // give up after 250 ms
            simple().collect { log("$it") }
        }
    } catch (e: Exception) {
        log("Exception : $e")
    }
    log("Done")
}
/** Flow that emits 1, 2 and 3; before each value it logs "simpleFlow" and waits 100 ms. */
private fun simple(): Flow<Int> = flow {
    (1..3).forEach { number ->
        log("simpleFlow", coroutineContext)
        delay(100) // pretend we are doing something useful here
        emit(number)
    }
}
/**
 * Collects `simple()` under a 250 ms `withTimeoutOrNull`, so a timeout ends the
 * collection without an exception, and then logs "Done".
 */
fun onTimeoutOrNull(mainScope: CoroutineScope) = mainScope.launch {
    withTimeoutOrNull(250) { // give up after 250 ms
        simple().collect { log("$it") }
    }
    log("Done")
}
/**
 * Demonstrates intermediate operators (`map`, `transform`, `filter`, `take`) and
 * the terminal operators `collect`, `toList`, `toSet` and `first`.
 *
 * The value returned by `first()` is converted to text before logging, because
 * the visible `log` function accepts a `String?` message.
 */
fun onTransform(mainScope: CoroutineScope) = mainScope.launch {
    (1..3).asFlow() // a flow of requests
        .map { performRequest(it) }
        .transform {
            // emit the response itself plus a derived second value
            emit(it)
            emit("Hello emit $it")
        }
        .filter { it.length > 2 }
        .take(5)
        .collect { response -> log(response) }
    for (item in (1..3).asFlow().toList()) {
        log("toList $item")
    }
    for (item in (1..3).asFlow().toSet()) {
        log("toSet $item")
    }
    log("${(1..3).asFlow().first()}")
}
/**
 * Demonstrates the terminal operators `reduce` and `fold` on the squares of 1..5:
 * `reduce` sums the squares, `fold` sums them starting from 100.
 *
 * Results are converted to text before logging, because the visible `log`
 * function accepts a `String?` message.
 */
fun onReduce(mainScope: CoroutineScope) = mainScope.launch {
    val sum = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)
    log("$sum")
    val sum1 = (1..5).asFlow()
        .map { it * it } // squares of numbers from 1 to 5
        .fold(100) { a, b -> a + b } // sum them on top of the initial value 100
    log("$sum1")
}
/**
 * Simulates a slow request: suspends for one second and returns
 * `"response <request>"`.
 */
suspend fun performRequest(request: Int): String {
delay(1000) // imitate long-running asynchronous work
return "response $request"
}
/**
 * Shows the order in which the stages of a flow process values: every stage logs
 * the value it sees while 1..5 is filtered to even numbers, mapped to strings and
 * collected.
 */
fun onSequential(mainScope: CoroutineScope) = mainScope.launch {
    val evens = (1..5).asFlow().filter { number ->
        log("Filter $number")
        number % 2 == 0
    }
    val labelled = evens.map { number ->
        log("Map $number")
        "string $number"
    }
    labelled.collect { text -> log("Collect $text") }
}
Since simple().collect is called from the main thread, the body of simple's flow is also called in the main thread
/** Collects `simple()` and logs the collecting thread together with each value. */
fun onThread(mainScope: CoroutineScope) = mainScope.launch {
    simple().collect { log("${Thread.currentThread()} :Collected $it") }
}
/** Flow that logs the thread it is started on and then emits 1, 2 and 3. */
private fun simple(): Flow<Int> = flow {
    log("${Thread.currentThread()}:Started simple flow")
    (1..3).forEach { emit(it) }
}
However, the long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default and UI-updating code might need to be executed in the context of Dispatchers.Main
在 flow 构建器内部通过 withContext 切换上下文后再调用 emit 会导致异常,应该通过 flowOn 操作符来切换上游的执行上下文
/** Collects `simpleErro()` and logs every collected value with the collector's context. */
private fun onThreadSwitchErro(mainScope: CoroutineScope) = mainScope.launch {
simpleErro().collect { value -> log("collect $value", coroutineContext) }
}
/**
 * Deliberately wrong example: the flow body switches to `Dispatchers.Default` with
 * `withContext` and calls `emit` from there, i.e. from a different context than
 * the collector's. The resulting exception is caught and logged inside the flow.
 */
private fun simpleErro(): Flow<Int> =
flow { // The WRONG way to change context for CPU-consuming code in flow builder
try {
kotlinx.coroutines.withContext(Dispatchers.Default) {
for (i in 1..3) {
delay(1000)
Thread.sleep(1000) // pretend we are computing it in CPU-consuming way
log("emit $i", coroutineContext)
emit(i) // emit next value
}
}
} catch (e: Exception) {
log("Exception $e")
}
}
/**
 * Flow that emits 1, 2 and 3 after simulated work (one second of `delay` plus one
 * second of `Thread.sleep` per value); `flowOn` moves that work to
 * `Dispatchers.Default`.
 */
private fun simple(): Flow<Int> = flow {
for (i in 1..3) {
delay(1000)
Thread.sleep(1000) // pretend we are computing it in CPU-consuming way
log("emit $i", coroutineContext)
emit(i) // emit next value
}
}.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder
/** Collects `simple()` and logs every collected value with the collector's context. */
private fun onThreadFlowOn(mainScope: CoroutineScope) = mainScope.launch {
simple().collect { value ->
log("collect $value", coroutineContext)
}
}
***buffer***:通过缓冲区让发射与收集并发进行,收集者仍会处理每一个值
***conflate***:收集者处理较慢时跳过中间值,只处理最新的数据
***collectLatest***:每当有新值到来就取消对旧值的处理,最终只有最后一个值被完整处理
/**
 * Measures how long it takes to collect `simple()` with a collector that needs
 * 300 ms per value, in four variants: plain `collect`, `buffer(3)`, `conflate()`,
 * and `conflate()` combined with `collectLatest`.
 */
fun onBuffer(mainScope: CoroutineScope) = mainScope.launch {
// 1) plain collection
val time = measureTimeMillis {
simple().collect { value ->
delay(300) // pretend we are processing it for 300 ms
log("Normal $value")
}
}
log("Collected in $time ms Normal",coroutineContext)
// 2) buffered emissions
val time1 = measureTimeMillis {
simple().buffer(3).collect { value ->
delay(300) // pretend we are processing it for 300 ms
log("buffer $value",coroutineContext)
}
}
log("Collected in $time1 ms buffer",coroutineContext)
// 3) conflated emissions
val time2 = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collect { value ->
delay(300) // pretend we are processing it for 300 ms
log("conflate $value",coroutineContext)
}
}
log("Collected in $time2 ms ",coroutineContext)
// 4) collectLatest (applied after conflate())
val time3 = measureTimeMillis {
simple()
.conflate() // conflate emissions, don't process each one
.collectLatest { value ->
log("collectLatest start $value",coroutineContext)
delay(300) // pretend we are processing it for 300 ms
log("collectLatest end $value",coroutineContext)
}
}
log("Collected in $time3 ms ",coroutineContext)
}
/** Flow that emits 1..6; before each value it logs "emit: n" and waits 100 ms. */
private fun simple(): Flow<Int> = flow {
    (1..6).forEach { number ->
        log("emit: $number")
        delay(100) // pretend we are asynchronously waiting 100 ms
        emit(number)
    }
}
Thread[main,5,main] : flowItem.index 10
Thread[main,5,main] : emit: 1
Thread[main,5,main] : Normal 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : Normal 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : Normal 3
Thread[main,5,main] : emit: 4
Thread[main,5,main] : Normal 4
Thread[main,5,main] : emit: 5
Thread[main,5,main] : Normal 5
Thread[main,5,main] : emit: 6
Thread[main,5,main] : Normal 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 2442 ms Normal
Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 1
Thread[main,5,main] : emit: 5
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : buffer 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1960 ms buffer
Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main] : emit: 3
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 1
Thread[main,5,main] : emit: 5
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : conflate 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 1034 ms
Thread[main,5,main] : emit: 1
Thread[main,5,main] : emit: 2
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 1
Thread[main,5,main] : emit: 3
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 2
Thread[main,5,main] : emit: 4
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 3
Thread[main,5,main] : emit: 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 4
Thread[main,5,main] : emit: 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 5
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest start 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : collectLatest end 6
Thread[main,5,main]-->StandaloneCoroutine{Active}@3f94d99-->135931383-->null : Collected in 960 ms
/**
 * Demonstrates composing two flows with `zip` and `combine`, first without and
 * then with different emission delays (300 ms for numbers, 400 ms for strings).
 *
 * NOTE(review): `startTime` is taken once before the delayed `zip` and reused for
 * `combine`, so the "from start" values logged by the `combine` part also include
 * the time spent in the `zip` part.
 */
fun onCompose(mainScope: CoroutineScope) = mainScope.launch {
val num = (1..3).asFlow() // numbers 1..3
val str = flowOf("one", "two", "three","four") // strings
num.zip(str) { a, b -> "$a -> $b" } // compose a single string
.collect { log("zip : $it") } // collect and print
val num1 = (1..3).asFlow().onEach { delay(300) } // numbers 1..3 every 300 ms
val str1 = flowOf("one", "two", "three","four").onEach { delay(400) } // strings every 400 ms
val startTime = System.currentTimeMillis() // remember the start time
num1.zip(str1) { a, b -> "$a -> $b" } // compose a single string with "zip"
.collect { value -> // collect and print
log("zip delay : $value at ${System.currentTimeMillis() - startTime} ms from start")
}
num1.combine(str1) { a, b -> "$a -> $b" } // compose a single string with "combine"
.collect { value -> // collect and print
log("combine delay : $value at ${System.currentTimeMillis() - startTime} ms from start")
}
}
Thread[main,5,main] : zip : 1 -> one
Thread[main,5,main] : zip : 2 -> two
Thread[main,5,main] : zip : 3 -> three
Thread[main,5,main] : zip delay : 1 -> one at 406 ms from start
Thread[main,5,main] : zip delay : 2 -> two at 806 ms from start
Thread[main,5,main] : zip delay : 3 -> three at 1207 ms from start
Thread[main,5,main] : combine delay : 1 -> one at 1617 ms from start
Thread[main,5,main] : combine delay : 2 -> one at 1821 ms from start
Thread[main,5,main] : combine delay : 2 -> two at 2020 ms from start
Thread[main,5,main] : combine delay : 3 -> two at 2125 ms from start
Thread[main,5,main] : combine delay : 3 -> three at 2423 ms from start
Thread[main,5,main] : combine delay : 3 -> four at 2827 ms from start
/**
 * Demonstrates `flatMapConcat` with two different upstream delays (100 ms, then
 * 600 ms). Every collected value is logged with the time since the start of the
 * run and the time since the previously collected value.
 */
@OptIn(FlowPreview::class)
fun onFlatMapConcat(mainScope: CoroutineScope) = mainScope.launch {
var startTime = System.currentTimeMillis() // remember the start time
var useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
startTime = System.currentTimeMillis() // remember the start time
useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(600) } // emit a number every 600 ms
.flatMapConcat { requestFlow(it) }
.collect { value -> // collect and print
log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
}
Thread[main,5,main] : start : 1: First: 110 : 110
Thread[main,5,main] : start : 1: Second: 612 : 501
Thread[main,5,main] : start : 1: Third: 1114 : 501
Thread[main,5,main] : start : 2: First: 1217 : 102
Thread[main,5,main] : start : 2: Second: 1718 : 501
Thread[main,5,main] : start : 2: Third: 2221 : 502
Thread[main,5,main] : start : 3: First: 2328 : 106
Thread[main,5,main] : start : 3: Second: 2831 : 503
Thread[main,5,main] : start : 3: Third: 3334 : 502
Thread[main,5,main] : end : 1: First: 607 : 607
Thread[main,5,main] : end : 1: Second: 1110 : 502
Thread[main,5,main] : end : 1: Third: 1615 : 504
Thread[main,5,main] : end : 2: First: 2220 : 605
Thread[main,5,main] : end : 2: Second: 2723 : 502
Thread[main,5,main] : end : 2: Third: 3226 : 502
Thread[main,5,main] : end : 3: First: 3832 : 605
Thread[main,5,main] : end : 3: Second: 4335 : 502
Thread[main,5,main] : end : 3: Third: 4838 : 502
/**
 * Demonstrates `flatMapMerge` with two different upstream delays (800 ms, then
 * 100 ms). Every collected value is logged with the time since the start of the
 * run and the time since the previously collected value.
 */
@OptIn(FlowPreview::class)
fun onFlatMapMerge(mainScope: CoroutineScope) = mainScope.launch {
var startTime = System.currentTimeMillis() // remember the start time
var useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(800) } // emit a number every 800 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
startTime = System.currentTimeMillis() // remember the start time
useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
}
Thread[main,5,main] : start : 1: First: 814 : 814
Thread[main,5,main] : start : 1: Second: 1316 : 501
Thread[main,5,main] : start : 2: First: 1615 : 299
Thread[main,5,main] : start : 1: Third: 1816 : 201
Thread[main,5,main] : start : 2: Second: 2117 : 301
Thread[main,5,main] : start : 3: First: 2416 : 299
Thread[main,5,main] : start : 2: Third: 2623 : 206
Thread[main,5,main] : start : 3: Second: 2921 : 297
Thread[main,5,main] : start : 3: Third: 3424 : 502
Thread[main,5,main] : end : 1: First: 116 : 116
Thread[main,5,main] : end : 2: First: 221 : 104
Thread[main,5,main] : end : 3: First: 326 : 104
Thread[main,5,main] : end : 1: Second: 620 : 293
Thread[main,5,main] : end : 2: Second: 725 : 104
Thread[main,5,main] : end : 3: Second: 829 : 104
Thread[main,5,main] : end : 1: Third: 1124 : 294
Thread[main,5,main] : end : 2: Third: 1229 : 105
Thread[main,5,main] : end : 3: Third: 1333 : 104
/**
 * Collects a flattened flow with two different upstream delays (800 ms, then
 * 100 ms) and logs the timing of every collected value.
 *
 * NOTE(review): this snippet is a verbatim copy of the `onFlatMapMerge` snippet
 * that precedes it, but the output recorded after it drops intermediate values,
 * which looks like `flatMapLatest` rather than `flatMapMerge`. Presumably this was
 * meant to be a `flatMapLatest` example under its own name -- TODO confirm.
 */
@OptIn(FlowPreview::class)
fun onFlatMapMerge(mainScope: CoroutineScope) = mainScope.launch {
var startTime = System.currentTimeMillis() // remember the start time
var useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(800) } // emit a number every 800 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
log("start : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
startTime = System.currentTimeMillis() // remember the start time
useTime = System.currentTimeMillis()
(1..3).asFlow().onEach { delay(100) } // emit a number every 100 ms
.flatMapMerge { requestFlow(it) }
.collect { value -> // collect and print
log("end : $value: ${System.currentTimeMillis() - startTime} : ${System.currentTimeMillis() - useTime}")
useTime = System.currentTimeMillis()
}
}
Thread[main,5,main] : start : 1: First: 804 : 804
Thread[main,5,main] : start : 1: Second: 1305 : 501
Thread[main,5,main] : start : 2: First: 1607 : 301
Thread[main,5,main] : start : 2: Second: 2111 : 504
Thread[main,5,main] : start : 3: First: 2421 : 309
Thread[main,5,main] : start : 3: Second: 2923 : 501
Thread[main,5,main] : start : 3: Third: 3427 : 504
Thread[main,5,main] : end : 1: First: 111 : 111
Thread[main,5,main] : end : 2: First: 224 : 113
Thread[main,5,main] : end : 3: First: 336 : 111
Thread[main,5,main] : end : 3: Second: 838 : 501
Thread[main,5,main] : end : 3: Third: 1342 : 504
/**
 * Compares imperative and declarative handling of flow completion and failure:
 * first `try`/`catch`/`finally` around `collect`, then the `onCompletion` and
 * `catch` operators on the failing `simple()` flow, and finally the same operators
 * on `(1..4).asFlow()`.
 */
fun onDeclarative(mainScope: CoroutineScope) = mainScope.launch {
// imperative style
try {
simple().collect { value -> log("try : $value") }
} catch (e:Exception){
log("try : $e")
}finally {
log("try : Done")
}
// declarative style on the failing flow
simple()
.onCompletion { cause ->log("Flow completed exceptionally $cause") }
.catch { cause -> log("Caught exception: $cause") }.collect { value -> log("catch $value") }
// declarative style on (1..4).asFlow()
(1..4).asFlow()
.onCompletion { cause -> log("asFlow Flow completed with $cause") }
.catch { cause -> log("asFlow Caught exception: $cause") }
.collect { value ->
log("asFlow $value")
}
}
/** Flow that emits 1 and then fails with a `RuntimeException`. */
private fun simple(): Flow<Int> = flow {
emit(1)
throw RuntimeException()
}
Thread[main,5,main] : flowItem.index 15
Thread[main,5,main] : try : 1
Thread[main,5,main] : try : java.lang.RuntimeException
Thread[main,5,main] : try : Done
Thread[main,5,main] : catch 1
Thread[main,5,main] : Flow completed exceptionally java.lang.RuntimeException
Thread[main,5,main] : Caught exception: java.lang.RuntimeException
Thread[main,5,main] : asFlow 1
Thread[main,5,main] : asFlow 2
Thread[main,5,main] : asFlow 3
Thread[main,5,main] : asFlow 4
Thread[main,5,main] : asFlow Flow completed with null
/**
 * Collects `events()` with `collect()`: "Collect Done" is logged only after all
 * events have been processed.
 */
fun onCollect(mainScope: CoroutineScope) = mainScope.launch {
events()
.onEach { event -> log("Collect Event: $event") }
.collect() // <--- Collecting the flow waits
log("Collect Done")
}
// Flow of the numbers 1..3 with a 100 ms delay applied to each element.
private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
Thread[main,5,main] : Collect Event: 1
Thread[main,5,main] : Collect Event: 2
Thread[main,5,main] : Collect Event: 3
Thread[main,5,main] : Collect Done
// Flow of the numbers 1..3 with a 100 ms delay applied to each element.
private fun events(): Flow<Int> = (1..3).asFlow().onEach { delay(100) }
/**
 * Collects `events()` with `launchIn(mainScope)`: the collection runs in its own
 * coroutine, so "LaunchIn Done" is logged without waiting for the events.
 */
fun onLaunchIn(mainScope: CoroutineScope) = mainScope.launch {
events()
.onEach { event -> log("LaunchIn Event: $event") }
.launchIn(mainScope) // <--- launches the collection in a separate coroutine and returns immediately
log("LaunchIn Done")
}
Thread[main,5,main] : LaunchIn Done
Thread[main,5,main] : LaunchIn Event: 1
Thread[main,5,main] : LaunchIn Event: 2
Thread[main,5,main] : LaunchIn Event: 3
/**
 * Calls `cancel()` while collecting a busy `(1..5).asFlow()`.
 *
 * A busy flow created with `asFlow()` is not cancelled in the middle of the
 * collection: the output recorded below still shows all five values.
 */
fun onAsFlow(mainScope: CoroutineScope) = mainScope.launch {
(1..5).asFlow().collect { value ->
if (value == 3) cancel()
log("asFlow : $value")
}
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3
// Thread[main,5,main] : asFlow : 4
// Thread[main,5,main] : asFlow : 5
/**
 * Calls `cancel()` while collecting `foo()`, a flow built with the `flow { }`
 * builder.
 *
 * The `flow { }` builder performs an `ensureActive` check on each emitted value,
 * so this flow can be cancelled: in the output recorded below, "Emitting 4" is
 * logged but value 4 is never collected.
 */
fun onFlow(mainScope: CoroutineScope) = mainScope.launch {
foo().collect { value ->
if (value == 3) cancel()
log("cancel : $value")
}
}
// Thread[main,5,main] : Emitting 1
// Thread[main,5,main] : cancel : 1
// Thread[main,5,main] : Emitting 2
// Thread[main,5,main] : cancel : 2
// Thread[main,5,main] : Emitting 3
// Thread[main,5,main] : cancel : 3
// Thread[main,5,main] : Emitting 4
/**
 * Makes the busy `(1..5).asFlow()` cancellable with the `cancellable()` operator,
 * so the collection stops after `cancel()` is called at value 3.
 */
fun onCancel(mainScope: CoroutineScope) = mainScope.launch {
(1..5).asFlow().cancellable().collect { value ->
if (value == 3) cancel()
log("asFlow : $value")
}
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3
/**
 * Makes the busy `(1..5).asFlow()` cancellable by calling
 * `currentCoroutineContext().ensureActive()` for each element in `onEach`, so the
 * collection stops after `cancel()` is called at value 3.
 */
fun onEnsureActive(mainScope: CoroutineScope) = mainScope.launch {
(1..5).asFlow().onEach { currentCoroutineContext().ensureActive() }.collect { value ->
if (value == 3) cancel()
log("asFlow : $value")
}
}
// Thread[main,5,main] : asFlow : 1
// Thread[main,5,main] : asFlow : 2
// Thread[main,5,main] : asFlow : 3
/** Flow that emits 1..5 without any delay, logging "Emitting n" before each value. */
private fun foo(): Flow<Int> = flow {
    (1..5).forEach { number ->
        log("Emitting $number")
        emit(number)
    }
}
/** Producer that sends the squares of 1..5, waiting 300 ms before each value. */
@OptIn(ExperimentalCoroutinesApi::class)
private fun CoroutineScope.produceSquares(): ReceiveChannel<Int> = produce {
    (1..5).forEach { number ->
        delay(300)
        send(number * number)
    }
}
/**
 * Consumes every value sent by `produceSquares()`, logs it and prints "Done!" at
 * the end.
 *
 * Values are converted to text before logging, because the visible `log`
 * function accepts a `String?` message.
 */
fun onProducer(mainScope: CoroutineScope) = mainScope.launch {
    val squares = produceSquares()
    squares.consumeEach { log("$it") }
    println("Done!")
}
/**
 * Starts a producer coroutine that sends an endless stream of integers,
 * beginning at 1, on the returned channel.
 */
private fun CoroutineScope.produceNumbers() = produce<Int> {
    var next = 1
    while (true) {
        send(next)
        next += 1
    }
}
/**
 * Pipeline stage that receives integers from [numbers] and sends the square of
 * each one on the returned channel.
 *
 * @param numbers upstream channel of integers to square.
 * @return channel carrying `x * x` for every `x` received from [numbers].
 */
private fun CoroutineScope.square(numbers: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    // Square, not cube: the stage is named `square` and its consumer treats the output as squares.
    for (x in numbers) send(x * x)
}
/**
 * Builds a two-stage channel pipeline (`produceNumbers()` feeding `square(...)`),
 * logs the first five results, and then cancels the pipeline coroutines.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun onPipeline(mainScope: CoroutineScope) = mainScope.launch {
    // Stage 1: integers starting from 1.
    val source = produceNumbers()
    // Stage 2: the integers piped through square().
    val transformed = square(source)
    // Log the first five results.
    for (round in 1..5) {
        log(transformed.receive())
    }
    log("Done!")
    // Cancel the child coroutines that back both stages.
    coroutineContext.cancelChildren()
}
/**
 * Starts a producer coroutine that sends integers starting from 1, pausing
 * 100 ms after each element.
 */
@OptIn(ExperimentalCoroutinesApi::class)
private fun CoroutineScope.produceNumbers() = produce {
    var next = 1
    while (true) {
        send(next)
        next += 1
        // Pace the producer at roughly ten elements per second.
        delay(100)
    }
}
/**
 * Launches a consumer coroutine that logs every message it receives from
 * [channel], tagged with the processor [id].
 *
 * @param id number used to identify this processor in the log output.
 * @param channel channel to receive messages from.
 * @return the job of the launched processor coroutine.
 */
private fun CoroutineScope.launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch {
    // Plain iteration: the loop ends when the channel is closed or cancelled.
    for (msg in channel) log("Processor #$id received $msg", coroutineContext)
}
/**
 * Fan-out demo: five processor coroutines receive from one producer channel for
 * 950 ms, after which the producer is cancelled.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun fanOut(mainScope: CoroutineScope) = mainScope.launch {
    val producer = produceNumbers()
    for (processorId in 0 until 5) {
        launchProcessor(processorId, producer)
    }
    delay(950)
    // Cancelling the producer channel ends the receive loops of all processors.
    producer.cancel()
}
/**
 * Starts a producer coroutine that sends an endless stream of consecutive
 * integers beginning at [start].
 *
 * @param start first integer to send.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.numberProducer(start: Int) = produce<Int> {
    var next = start
    while (true) {
        send(next)
        next += 1
    }
}
/**
 * Pipeline stage that forwards only those values from [numbers] that are not
 * divisible by [prime].
 *
 * @param numbers upstream channel of candidate integers.
 * @param prime divisor whose multiples are dropped.
 */
@OptIn(ExperimentalCoroutinesApi::class)
fun CoroutineScope.filter(numbers: ReceiveChannel<Int>, prime: Int) = produce<Int> {
    for (candidate in numbers) {
        val divisible = candidate % prime == 0
        if (!divisible) send(candidate)
    }
}
/**
 * Logs ten numbers produced by a growing pipeline: it starts with
 * `numberProducer(2)` and, after each received value, appends a [filter] stage
 * that drops multiples of that value. All pipeline coroutines are cancelled at
 * the end.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun findPrime(mainScope: CoroutineScope) = mainScope.launch {
    var stage = numberProducer(2)
    for (round in 1..10) {
        val prime = stage.receive()
        log(prime)
        // Chain another filter stage in front of the next receive.
        stage = filter(stage, prime)
    }
    coroutineContext.cancelChildren()
}
/**
 * Demonstrates buffered channels: a sender tries to push ten elements into a
 * channel nobody receives from, first with capacity 4 and then with capacity 2.
 * Each sender logs before every send and is cancelled after one second.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun sendWithBuffer(mainScope: CoroutineScope) = mainScope.launch {
    // Round 1: capacity 4.
    val fourSlotChannel = Channel<Int>(4)
    val fourSlotSender = launch {
        repeat(10) {
            log("Four Sending $it")
            // Suspends once the buffer is full, because nothing is ever received.
            fourSlotChannel.send(it)
        }
    }
    delay(1000)
    fourSlotSender.cancel()

    // Round 2: capacity 2.
    val twoSlotChannel = Channel<Int>(2)
    val twoSlotSender = launch {
        repeat(10) {
            log("Two Sending $it")
            // Suspends once the buffer is full, because nothing is ever received.
            twoSlotChannel.send(it)
        }
    }
    delay(1000)
    twoSlotSender.cancel()
}
Thread[main,5,main] : Four Sending 0
Thread[main,5,main] : Four Sending 1
Thread[main,5,main] : Four Sending 2
Thread[main,5,main] : Four Sending 3
Thread[main,5,main] : Four Sending 4
Thread[main,5,main] : Two Sending 0
Thread[main,5,main] : Two Sending 1
Thread[main,5,main] : Two Sending 2
通道的发送和接收操作,对于从多个协程中调用它们的顺序而言是公平的:它们遵守先进先出原则,先调用 receive 的协程先得到元素。
// Ball passed between the players over the shared channel; `hits` is incremented by each player that receives it.
data class Ball(var hits: Int)
/**
 * Channel fairness demo: two player coroutines ("ping" and "pong") share one
 * rendezvous channel, a ball is served into it, and after one second both
 * players are cancelled.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun showChannelFair(mainScope: CoroutineScope) = mainScope.launch {
    val table = Channel<Ball>()
    for (playerName in listOf("ping", "pong")) {
        launch { player(playerName, table) }
    }
    // Serve the ball.
    table.send(Ball(0))
    delay(1000)
    // Game over: cancel both players.
    coroutineContext.cancelChildren()
}
/**
 * Receives balls from [table] in a loop; for each ball it increments the hit
 * count, logs the player [name] together with the ball, waits 300 ms and sends
 * the ball back to the same channel.
 *
 * @param name label used for this player in the log output.
 * @param table channel shared by all players.
 */
suspend fun player(name: String, table: Channel<Ball>) {
    for (ball in table) {
        ball.hits += 1
        log("$name $ball")
        delay(300)
        // Return the ball so that another receiver can take it.
        table.send(ball)
    }
}
// Thread[main,5,main] : ping Ball(hits=1)
// Thread[main,5,main] : pong Ball(hits=2)
// Thread[main,5,main] : ping Ball(hits=3)
// Thread[main,5,main] : pong Ball(hits=4)
计时器通道是一种特别的会合通道:自上一次从该通道消费元素起,每经过指定的延迟,它就会产生一个 Unit
// Ticker channel demo: creates a ticker with a 100 ms period and no initial delay,
// then probes it with short timeouts to show when the next element becomes available.
// Each probe logs either kotlin.Unit (an element arrived in time) or null (timed out).
fun tickerSend(mainScope:CoroutineScope) = mainScope.launch {
val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create the ticker channel
var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
log("Initial element is available immediately: $nextElement") // no initial delay
nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay
log("Next element is not ready in 50 ms: $nextElement")
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
log("Next element is ready in 100 ms: $nextElement")
// Emulate a large consumption delay
log("Consumer pauses for 150ms")
delay(150)
// The next element is available immediately
nextElement = withTimeoutOrNull(1) { tickerChannel.receive() }
log("Next element is available immediately after large consumer delay: $nextElement")
// Note that the pause between `receive` calls is taken into account, so the next element arrives sooner
nextElement = withTimeoutOrNull(60) { tickerChannel.receive() }
log("Next element is ready in 50ms after consumer pause in 150ms: $nextElement")
tickerChannel.cancel() // indicate that no more elements are needed
}
// Thread[main,5,main] : Initial element is available immediately: kotlin.Unit
// Thread[main,5,main] : Next element is not ready in 50 ms: null
// Thread[main,5,main] : Next element is ready in 100 ms: kotlin.Unit
// Thread[main,5,main] : Consumer pauses for 150ms
// Thread[main,5,main] : Next element is available immediately after large consumer delay: kotlin.Unit
// Thread[main,5,main] : Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit
// Exception propagation demo for root coroutines without a CoroutineExceptionHandler:
// - a failure inside GlobalScope.async surfaces only when await() is called, so it can be caught there;
// - a failure inside GlobalScope.launch is not caught by the try/catch around the launch call;
//   the recorded output after this function shows it ending up as a FATAL EXCEPTION.
@OptIn(DelicateCoroutinesApi::class)
fun propagationException(mainScope:CoroutineScope) = mainScope.launch {
val deferred = GlobalScope.async { // root coroutine with async
log("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
log("Unreached")
} catch (e: ArithmeticException) {
log("Caught ArithmeticException")
}
// NOTE: this try/catch only wraps the call that starts the coroutine, not the coroutine body.
val job = try {
GlobalScope.launch { // root coroutine with launch
log("Throwing exception from launch")
throw IndexOutOfBoundsException() // Will be printed to the console by Thread.defaultUncaughtExceptionHandler
}
}catch(e:Exception) {
log("Throwing exception from launch")
null
}
job?.join()
log("Joined failed job")
}
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from async
Thread[main,5,main] : Caught ArithmeticException
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launch
2022-10-28 15:12:21.144 24517-24585/com.zhihaoliang.coroutine E/AndroidRuntime: FATAL EXCEPTION: DefaultDispatcher-worker-1
Process: com.zhihaoliang.coroutine, PID: 24517
java.lang.IndexOutOfBoundsException
at com.zhihaoliang.coroutine.buss.excep.func.ExceptionPropagation$propagationException$1$job$1.invokeSuspend(ExceptionPropagation.kt:29)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@65b7906, Dispatchers.Default]
// Exception handler shared by the demos below: logs the exception it is handed.
private val handler = CoroutineExceptionHandler { _, exception ->
log("CoroutineExceptionHandler got $exception")
}
// Same scenario as propagationException, but with `handler` installed on both root coroutines.
// Per the recorded output after this function:
// - the ArithmeticException from async is still delivered through await() and caught there;
//   the handler logs nothing for it;
// - the IndexOutOfBoundsException from launch is delivered to `handler`, and "Joined failed job" is logged.
fun handlerException(mainScope:CoroutineScope) = mainScope.launch {
val deferred = GlobalScope.async(handler) { // root coroutine with async
log("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
log("Unreached")
} catch (e: ArithmeticException) {
log("Caught ArithmeticException")
}
// NOTE: this try/catch only wraps the call that starts the coroutine, not the coroutine body.
val job = try {
GlobalScope.launch(handler) { // root coroutine with launch
log("Throwing exception from launch")
throw IndexOutOfBoundsException() // Handled by the installed CoroutineExceptionHandler (`handler`)
}
}catch(e:Exception) {
log("Throwing exception from launch")
null
}
job?.join()
log("Joined failed job")
}
Thread[main,5,main] : com.zhihaoliang.coroutine.buss.excep.ExceptionActivity
Thread[main,5,main] : exceptionItem.index 1
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from async
Thread[main,5,main] : Caught ArithmeticException
Thread[DefaultDispatcher-worker-1,5,main] : Throwing exception from launch
Thread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.IndexOutOfBoundsException
Thread[main,5,main] : Joined failed job
CancellationException is ignored by all handlers
/**
 * Shows that cancelling a child coroutine does not cancel its parent: the
 * parent cancels and joins its child and then still reaches its final log
 * statement.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun cancelExceptionNormal(mainScope: CoroutineScope) = mainScope.launch {
    val parent = launch {
        val sleeper = launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                log("Child is cancelled")
            }
        }
        // Let the child start before cancelling it.
        yield()
        log("Cancelling child")
        sleeper.cancel()
        sleeper.join()
        yield()
        log("Parent is not cancelled")
    }
    parent.join()
}
// Thread[main,5,main] : Cancelling child
// Thread[main,5,main] : Child is cancelled
// Thread[main,5,main] : Parent is not cancelled
/**
 * Shows when a parent's exception handler runs if one child fails: the second
 * child throws after 10 ms, the first child is cancelled and spends 100 ms in a
 * non-cancellable cleanup block. In the sample output recorded after this
 * function, the handler logs the exception only after that cleanup has finished.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
@OptIn(DelicateCoroutinesApi::class)
fun cancelExceptionHander(mainScope: CoroutineScope) = mainScope.launch {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        log("CoroutineExceptionHandler got $exception")
    }
    val root = GlobalScope.launch(exceptionHandler) {
        // First child: sleeps until cancelled, then runs cleanup that cannot be cancelled.
        launch {
            try {
                delay(Long.MAX_VALUE)
            } finally {
                withContext(NonCancellable) {
                    log("Children are cancelled, but exception is not handled until all children terminate")
                    delay(100)
                    log("The first child finished its non cancellable block")
                }
            }
        }
        // Second child: fails shortly after start.
        launch {
            delay(10)
            log("Second child throws an exception")
            throw ArithmeticException()
        }
    }
    root.join()
}
// Thread[DefaultDispatcher-worker-2,5,main] : Second child throws an exception
// Thread[DefaultDispatcher-worker-1,5,main] : Children are cancelled, but exception is not handled until all children terminate
// Thread[DefaultDispatcher-worker-1,5,main] : The first child finished its non cancellable block
// Thread[DefaultDispatcher-worker-1,5,main] : CoroutineExceptionHandler got java.lang.ArithmeticException
当协程的多个子协程因异常而失败时, 一般规则是“取第一个异常”,因此将处理第一个异常。 在第一个异常之后发生的所有其他异常都作为被抑制的异常绑定至第一个异常
/**
 * Exception aggregation demo: one child fails with an IOException after 100 ms;
 * its sibling, cancelled as a result, throws an ArithmeticException from its
 * `finally` block. The handler logs the exception it receives together with its
 * suppressed exceptions.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
@OptIn(DelicateCoroutinesApi::class)
fun exceptionAggregationNormal(mainScope: CoroutineScope) = mainScope.launch {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        log(
            "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val root = GlobalScope.launch(exceptionHandler) {
        launch {
            try {
                // Gets cancelled when the sibling coroutine fails with IOException.
                delay(Long.MAX_VALUE)
            } finally {
                // The second exception.
                throw ArithmeticException()
            }
        }
        launch {
            delay(100)
            // The first exception.
            throw IOException()
        }
        delay(Long.MAX_VALUE)
    }
    root.join()
}
// Thread[DefaultDispatcher-worker-3,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed [java.lang.ArithmeticException]
取消异常是透明的,默认情况下会被解包:handler 收到的是原始异常,而不是 CancellationException
/**
 * Shows that cancellation exceptions are transparent: an IOException is thrown
 * three `launch` levels deep, the outer coroutine catches and rethrows the
 * resulting CancellationException while joining, and the handler in the sample
 * output recorded after this function reports the IOException.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
@OptIn(DelicateCoroutinesApi::class)
fun exceptionAggregationCancel(mainScope: CoroutineScope) = mainScope.launch {
    val exceptionHandler = CoroutineExceptionHandler { _, exception ->
        log(
            "CoroutineExceptionHandler got $exception with suppressed ${exception.suppressed.contentToString()}")
    }
    val root = GlobalScope.launch(exceptionHandler) {
        // Every coroutine in this nested stack gets cancelled.
        val nested = launch {
            launch {
                launch {
                    // The original exception.
                    throw IOException()
                }
            }
        }
        try {
            nested.join()
        } catch (cancellation: CancellationException) {
            log("Rethrowing CancellationException with original cause")
            // The cancellation exception is rethrown, yet the original IOException is what gets handled.
            throw cancellation
        }
    }
    root.join()
}
// Thread[DefaultDispatcher-worker-3,5,main] : Rethrowing CancellationException with original cause
// Thread[DefaultDispatcher-worker-2,5,main] : CoroutineExceptionHandler got java.io.IOException with suppressed []
/**
 * SupervisorJob demo: two children run in a scope backed by a [SupervisorJob].
 * The first child fails; the second keeps running until the supervisor itself
 * is cancelled.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun supervisionJob(mainScope: CoroutineScope) = mainScope.launch {
    val supervisor = SupervisorJob()
    val supervisedScope = CoroutineScope(coroutineContext + supervisor)

    // First child: its exception is swallowed by an empty handler for this example (don't do this in practice!).
    val firstChild = supervisedScope.launch(CoroutineExceptionHandler { _, _ -> }) {
        log("The first child is failing")
        throw AssertionError("The first child is cancelled")
    }

    // Second child: waits for the first one to finish, then sleeps until cancelled.
    val secondChild = supervisedScope.launch {
        firstChild.join()
        // The failure of the first child is not propagated to this one.
        log("The first child is cancelled: ${firstChild.isCancelled}, but the second one is still active")
        try {
            delay(Long.MAX_VALUE)
        } finally {
            // Cancelling the supervisor, however, does reach this child.
            log("The second child is cancelled because the supervisor was cancelled")
        }
    }

    // Wait until the first child has failed and completed.
    firstChild.join()
    log("Cancelling the supervisor")
    supervisor.cancel()
    secondChild.join()
}
// Thread[main,5,main] : The first child is failing
// Thread[main,5,main] : The first child is cancelled: true, but the second one is still active
// Thread[main,5,main] : Cancelling the supervisor
// Thread[main,5,main] : The second child is cancelled because the supervisor was cancelled
/**
 * supervisorScope demo: a child is started inside `supervisorScope`, then the
 * scope body itself throws an AssertionError. In the sample output recorded
 * after this function the child is cancelled and the error is caught by the
 * surrounding `try`.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun supervisionScope(mainScope: CoroutineScope) = mainScope.launch {
    try {
        supervisorScope {
            // The child's Job is never referenced, so it is not bound to a local.
            launch {
                try {
                    log("The child is sleeping")
                    delay(Long.MAX_VALUE)
                } finally {
                    log("The child is cancelled")
                }
            }
            // Give our child a chance to execute and print using yield
            yield()
            log("Throwing an exception from the scope")
            throw AssertionError()
        }
    } catch (e: AssertionError) {
        log("Caught an assertion error")
    }
}
// Thread[main,5,main] : The child is sleeping
// Thread[main,5,main] : Throwing an exception from the scope
// Thread[main,5,main] : The child is cancelled
// Thread[main,5,main] : Caught an assertion error
/**
 * Runs [action] 1000 times in each of 100 concurrently launched coroutines,
 * waits for all of them, and logs the total number of actions together with the
 * elapsed time in milliseconds.
 *
 * @param action suspending action to repeat.
 */
suspend fun massiveRun(action: suspend () -> Unit) {
    val n = 100 // number of coroutines to launch
    val k = 1000 // times an action is repeated by each coroutine
    val time = measureTimeMillis {
        // coroutineScope returns only after every launched coroutine has completed.
        coroutineScope {
            for (worker in 0 until n) {
                launch {
                    for (round in 0 until k) action()
                }
            }
        }
    }
    log("Completed ${n * k} actions in $time ms")
}
/**
 * Shared-mutable-state demo without any synchronization: a plain local counter
 * is incremented from the coroutines started by [massiveRun] on
 * `Dispatchers.Default`. The sample output recorded after this function shows a
 * final value below 100000.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun showErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    var counter = 0
    // Unsynchronized read-modify-write on purpose.
    massiveRun { counter += 1 }
    log("Counter = $counter")
}
// Thread[DefaultDispatcher-worker-8,5,main] : Completed 100000 actions in 21 ms
// Thread[DefaultDispatcher-worker-8,5,main] : Counter = 45004
// Shared counter for the volatile demo; showVolatilesErro resets it to 0 and increments it concurrently.
@Volatile // in Kotlin `volatile` is an annotation
var counterVolatile = 0
/**
 * Shared-mutable-state demo with a `@Volatile` counter: resets
 * [counterVolatile], increments it from the coroutines started by [massiveRun]
 * and logs the result. The sample output recorded after this function still
 * shows a final value below 100000.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
fun showVolatilesErro(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    counterVolatile = 0
    // Still a separate read and write, even though the variable is volatile.
    massiveRun { counterVolatile += 1 }
    log("counterVolatile = $counterVolatile")
}
// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 77 ms
// Thread[DefaultDispatcher-worker-7,5,main] : counterVolatile = 88777
// Thread[DefaultDispatcher-worker-7,5,main] : Completed 100000 actions in 34 ms
// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 100000
// Single-threaded context named "CounterContext", used by runSingleThread to confine all counter updates to one thread.
// NOTE(review): nothing in the visible code closes this context - confirm whether it is released elsewhere.
val counterContext = newSingleThreadContext("CounterContext")
/**
 * Thread-confinement demo: the whole [massiveRun] call executes inside
 * [counterContext], so every increment of the local counter happens in that
 * single-threaded context. The result is logged after switching back.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
private fun runSingleThread(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    var counter = 0
    withContext(counterContext) {
        // Confine all increments to the single-threaded context.
        massiveRun { counter += 1 }
    }
    log("Counter = $counter")
}
// Thread[CounterContext,5,main] : Completed 100000 actions in 65 ms
// Thread[DefaultDispatcher-worker-7,5,main] : Counter = 100000
// Lock used by useMutual to guard each counter increment.
val mutex = Mutex()
/**
 * Mutual-exclusion demo: every increment of the local counter performed by the
 * coroutines of [massiveRun] is wrapped in `mutex.withLock`.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
private fun useMutual(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    var counter = 0
    massiveRun {
        // Each increment happens while holding the lock.
        mutex.withLock { counter += 1 }
    }
    log("Counter = $counter")
}
// Thread[DefaultDispatcher-worker-2,5,main] : Completed 100000 actions in 1547 ms
// Thread[DefaultDispatcher-worker-2,5,main] : Counter = 100000
// Message types for the counter actor
sealed class CounterMsg
object IncCounter : CounterMsg() // one-way message that increments the counter
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // request that carries a reply channel
/**
 * Starts a new counter actor. The count lives inside the actor coroutine and
 * is changed or read only in response to incoming [CounterMsg] messages.
 */
private fun CoroutineScope.counterActor() = actor<CounterMsg> {
    // Actor state.
    var count = 0
    // Handle incoming messages one at a time.
    for (message in channel) {
        when (message) {
            is IncCounter -> count++
            is GetCounter -> message.response.complete(count)
        }
    }
}
/**
 * Actor demo: sends one [IncCounter] message per action of [massiveRun], then
 * asks the actor for the current count with a [GetCounter] request, logs the
 * reply and closes the actor.
 *
 * @param mainScope scope in which the demo coroutine is launched.
 * @return the job of the launched coroutine.
 */
private fun useActor(mainScope: CoroutineScope) = mainScope.launch(Dispatchers.Default) {
    // Create the actor.
    val counterChannel = counterActor()
    withContext(Dispatchers.Default) {
        massiveRun { counterChannel.send(IncCounter) }
    }
    // Send a message to fetch the counter value from the actor.
    val response = CompletableDeferred<Int>()
    counterChannel.send(GetCounter(response))
    log("Counter = ${response.await()}")
    // Shut the actor down.
    counterChannel.close()
}
// Thread[DefaultDispatcher-worker-4,5,main] : Completed 100000 actions in 907 ms
// Thread[DefaultDispatcher-worker-4,5,main] : Counter = 100000
代码地址:https://gitee.com/trainAndroid/coroutine#1exception-propagation