基础概念 协程 怎么定义?
WIKI: Coroutines are computer program components that generalize subroutines for non-preemptive multitasking, by allowing execution to be suspended and resumed.
协程是计算机程序组件,它通过允许暂停和恢复执行来概括非抢占式多任务处理的子程序。
协程概念最核心的点 在于:函数或者一段程序能够被挂起,稍后再在挂起的位置恢复。
挂起和恢复是开发者的程序逻辑自己控制的,协程是通过主动挂起出让运行权来实现协作,因此本质上是讨论程序控制流程的机制。
与线程最大的差别
从任务的角度,线程一旦执行就不会暂停,直到任务结束,这个过程是连续的;
从调度方式的角度,线程之间是抢占式的调度,因此不存在协作问题.
异步程序设计 同步,异步(用线程来理解)
同步异步是相对于指令的执行顺序来说的,顺序执行就是同步,反之就是异步。
// Todo: 添加同步异步代码DEMO
回调地狱:回调不断嵌套,让程序难以理解和掌控
// Todo: 添加回调地狱的DEMO
异步程序的关键问题 结果传递
异步调用的结果是立即返回的,被调用方有俩种情况
异常处理
同步
1 2 3 4 5 try { ... }catch (...){ ... }
取消响应
取消响应中的响应是很关键的一点,需要异步任务主动配合取消,如果不配合,那么外部也就没有办法,只能听之任之。
复杂分支
常见的设计思路 Future
JDK 1.5引入,有一个get方法,能够同步阻塞地返回Future对应的异步任务的结果。但一旦调用其中一个get,当前调用也就被阻塞了。在所有的get返回之前,当前的调用流程会一直被限制在这段逻辑中。
通过阻塞当前调用来等待异步结果,让异步的逻辑变得不像”异步“了,是因为我们还得同步等待结果。
CompletableFuture
JDK 1.8新增,通过它可以拿到异步结果。与直接使用Future不同,get函数的调用仍然在CompletableFuture提供的异步调用环境中,不会阻塞主调用流程。
解决了异步结果不阻塞主调用流程的问题,但让结果的获取脱离了主调用流程。
Promise 与async/ await
Promise是一个异步任务,存在挂起,完成,拒绝三个状态。
当处于完成时状态,结果通过调用then方法的参数进行回调;
当出现异常拒绝时,通过catch方法传入的参数来捕获拒绝的原因。
async/await很好的兼顾了异步任务执行和同步语法结构的需求。
Reactive Programming
主要关注的是数据流的变换和流转,更注重数据的输入和输出之间的关系。
koltin协程 只用一个关键字 “suspend”表示挂起点,包含了异步调用和回调俩层含义。
所有异步回调对于当前调用流程来说都是一个挂起点,在这个挂起点我们可以做的事情非常多,既可以像async/await那样异步回调,又可以添加调度器处理线程切换,还可以作为协程取消响应的位置,等。
异步逻辑同步化。
Continuation Passing Style(CPS) Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。回调函数 callback 被称为续体(Continuation),它决定了程序接下来的行为,整个程序的逻辑通过一个个 Continuation 拼接在一起。
Kotlin 协程本质就是利用 CPS 来实现对过程的控制,并解决了 CPS 会产生的问题(如回调地狱,栈空间占用)
Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换,用看起来同步的方式写出异步的代码,消除回调地狱(callback hell)。
为了避免栈空间过大的问题, Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型。每两个挂起点之间可以看为一个状态,每次进入状态机时都有一个当前的状态,然后执行该状态对应的代码;如果程序执行完毕则返回结果值,否则返回一个特殊值,表示从这个状态退出并等待下次进入。相当于创建了一个可复用的回调,每次都使用这同一个回调,根据不同状态来执行不同的代码。
1 2 3 4 5 6 public interface Continuation <in T> { public val context: CoroutineContext public fun resumeWith (result: Result<T>) }
创建协程 方式1:
1 2 3 4 5 6 7 8 9 10 11 12 13 fun main () { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L ) println("Kotlin" ) } println("Hello" ) Thread.sleep(2000L ) }
方式2:
1 2 3 4 5 6 7 8 9 10 11 12 13 fun main () = runBlocking { val deferred = CoroutineScope(Job()).async(Dispatchers.Default) { println("Hello" ) delay(1000L ) println("Kotlin" ) } deferred.await() }
方式3:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 fun main () { runBlocking(Dispatchers.Default) { println("launch started!" ) delay(1000L ) println("Kotlin" ) } println("Hello" ) Thread.sleep(2000L ) println("Process end!" ) }
异同:
launch:无法获取执行结果,返回类型Job,不会阻塞;
async:可获取执行结果,返回类型Deferred,调用await()会阻塞,不调用则不会阻塞但也无法获取执行结果;
runBlocking:可获取执行结果,阻塞当前线程的执行,多用于Demo、测试,官方推荐只用于连接线程与协程。
理解协程的创建 CoroutineScope
Job
runBlocking
async
launch
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 fun main () { CoroutineScope(Job()).launch(Dispatchers.Default) { delay(1000L ) println("Kotlin" ) } println("Hello" ) Thread.sleep(2000L ) } =====>launchpublic fun CoroutineScope.launch ( context: CoroutineContext = EmptyCoroutineContext, start: CoroutineStart = CoroutineStart.DEFAULT, block: suspend CoroutineScope .() -> Unit ) : Job { val newContext = newCoroutineContext(context) val coroutine = if (start.isLazy) LazyStandaloneCoroutine(newContext, block) else StandaloneCoroutine(newContext, active = true ) coroutine.start(start, coroutine, block) return coroutine } =====> coroutine.start(start, coroutine, block)public abstract class AbstractCoroutine <in T >( parentContext: CoroutineContext, initParentJob: Boolean , active: Boolean ) : JobSupport(active), Job, Continuation<T>, CoroutineScope { ... public fun <R> start (start: CoroutineStart , receiver: R , block: suspend R .() -> T ) { start(block, receiver, this ) } } =====> CoroutineStartpublic enum class CoroutineStart { DEFAULT LAZY ATOMIC UNDISPATCHED: }public operator fun <T> invoke (block: suspend () -> T , completion: Continuation <T >) : Unit = when (this ) { DEFAULT -> block.startCoroutineCancellable(completion) ATOMIC -> block.startCoroutine(completion) UNDISPATCHED -> block.startCoroutineUndispatched(completion) LAZY -> Unit } =====>startCoroutine()public fun <T> (suspend () -> T).startCoroutine(completion: Continuation<T) { createCoroutineUnintercepted(completion).intercepted().resume(Unit ) } =======>createCoroutineUninterceptedpublic expect fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit >public actual fun <T> (suspend () -> T).createCoroutineUnintercepted( completion: Continuation<T> ): Continuation<Unit > { val probeCompletion = probeCoroutineCreated(completion) return if (this is BaseContinuationImpl) create(probeCompletion) else createCoroutineFromSuspendFunction(probeCompletion) { (this as Function1<Continuation<T>, Any?>).invoke(it) } }
参考链接 https://www.bennyhuo.com/project/kotlin-coroutines.html
Kotlin 官方文档 中文版
https://juejin.cn/post/7142743424670629895
https://juejin.cn/post/7137905800504148004