当前位置:主页 > android教程 > Kotlin协程Flow原理

Kotlin协程之Flow基础原理示例解析

发布:2023-03-04 16:00:01 59


为找教程的网友们整理了相关的编程文章,网友马凯旋根据主题投稿了本篇教程内容,涉及到Kotlin协程Flow、Kotlin、Flow、Kotlin协程Flow原理相关内容,已被815网友关注,下面的电子资料对本篇知识点有更加详尽的解释。

Kotlin协程Flow原理

引言

本文分析示例代码如下:

launch(Dispatchers.Main) {
    flow {
        emit(1)
        emit(2)
    }.collect {
        delay(1000)

        withContext(Dispatchers.IO) {
            Log.d("liduo", "$it")
        }

        Log.d("liduo", "$it")
    }
}

一.Flow的创建

在协程中,可以通过flow方法创建一个Flow对象,一个Flow对象代表一个冷流。其中参数block是FlowCollector的扩展方法,并且可挂起。代码入下:

public fun  flow(@BuilderInference block: suspend FlowCollector.() -> Unit): Flow = SafeFlow(block)

FlowCollector是一个接口,用于收集上游的流发出的值,代码如下:

public interface FlowCollector {
    // 可挂起,非线程安全
    public suspend fun emit(value: T)
}

调用flow方法,会返回一个Flow接口指向的对象,代码如下:

public interface Flow {
   
    @InternalCoroutinesApi
    public suspend fun collect(collector: FlowCollector)
}

这里flow方法的返回对象是一个SafeFlow类型的对象。至此Flow就创建完毕了。

二.Flow的消费

在协程中,当需要消费流时,会调用collect方法,触发流的消费,代码如下:

public suspend inline fun  Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector {
        override suspend fun emit(value: T) = action(value)
    })

这里的collect方法不是Flow接口定义的方法,而是Flow的扩展方法,内部创建了一个匿名的FlowCollector对象,并且把action封装到了FlowCollector对象的emit方法中,最后将FlowCollector对象作为参数传入到了另一个collect方法,这个collect方法才是Flow接口定义的方法。

1.SafeFlow类

根据上面的分析,Flow对象最后返回的是一个SafeFlow类型的对象。因此,这里调用的另一个collect方法,就是SafeFlow类中的collect方法,代码如下:

private class SafeFlow(private val block: suspend FlowCollector.() -> Unit) : AbstractFlow() {
    override suspend fun collectSafely(collector: FlowCollector) {
        collector.block()
    }
}

SafeFlow类继承自AbstractFlow类,类中重写了collectSafely方法。因此调用的collect方法实际上是AbstractFlow类的方法。

2.AbstractFlow类

AbstractFlow类是一个抽象类,实现了Flow接口和CancellableFlow接口。实际上CancellableFlow接口继承自Flow接口,因此AbstractFlow类只重写了collect方法,代码如下:

@FlowPreview
public abstract class AbstractFlow : Flow, CancellableFlow {

     // 核心方法 
    @InternalCoroutinesApi
    public final override suspend fun collect(collector: FlowCollector) {
        // 创建SafeCollector对象,对collector进行包裹
        val safeCollector = SafeCollector(collector, coroutineContext)
        try {
            // 调用collectSafely方法
            collectSafely(safeCollector)
        } finally {
            // 释放拦截的续体
            safeCollector.releaseIntercepted()
        }
    }
    
    public abstract suspend fun collectSafely(collector: FlowCollector)
}

collect方法内部调用了collectSafely方法,collectSafely方法在SafeFlow中被重写。collectSafely方法中会调用flow中的block,并提供一个SafeCollector类的环境。

3. SafeCollector类

当flow方法中的代码在执行时,会调用emit方法发射数据,这时由于block执行在SafeCollector类的环境中,因此调用的emit方法是SafeCollector类的方法。

SafeCollector类实现了FlowCollector接口并且继承自ContinuationImpl类,代码如下:

internal actual class SafeCollector actual constructor(
    @JvmField internal actual val collector: FlowCollector,
    @JvmField internal actual val collectContext: CoroutineContext
) : FlowCollector, ContinuationImpl(NoOpContinuation, EmptyCoroutineContext), CoroutineStackFrame {
    
    ...
    // 保存上下文中元素数量,用于检查上下文是否变化
    @JvmField
    internal actual val collectContextSize = collectContext.fold(0) { count, _ -> count + 1 }
    // 保存上一次的上下文
    private var lastEmissionContext: CoroutineContext? = null
    // 执行结束后的续体
    private var completion: Continuation? = null

    // 协程上下文
    override val context: CoroutineContext
        get() = completion?.context ?: EmptyCoroutineContext

    // 挂起的核心方法
    override fun invokeSuspend(result: Result): Any? {
        result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
        completion?.resumeWith(result as Result)
        return COROUTINE_SUSPENDED
    }

    // 释放拦截的续体
    public actual override fun releaseIntercepted() {
        super.releaseIntercepted()
    }

    // 发射数据
    override suspend fun emit(value: T) {
        // 获取当前suspend方法续体
        return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
            try {
                // 调用重载的方法
                emit(uCont, value)
            } catch (e: Throwable) {
                 // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
                lastEmissionContext = DownstreamExceptionElement(e)
                // 抛出异常
                throw e
            }
        }
    }

    // 重载的emit方法
    private fun emit(uCont: Continuation, value: T): Any? {
        // 从续体中获取上下文
        val currentContext = uCont.context
        // 保证当前协程的Job是active的
        currentContext.ensureActive()
        // 获取上次的上下文
        val previousContext = lastEmissionContext
        // 如果前后上下文发生变化
        if (previousContext !== currentContext) {
            // 检查上下文是否发生异常
            checkContext(currentContext, previousContext, value)
        }
        // 保存续体
        completion = uCont
        // 调用emitFun方法,传入collector,value,continuation
        return emitFun(collector as FlowCollector, value, this as Continuation)
    }

    // 检查上下文变化,防止并发
    private fun checkContext(
        currentContext: CoroutineContext,
        previousContext: CoroutineContext?,
        value: T
    ) {
        // 如果上次执行过程中发生了异常
        if (previousContext is DownstreamExceptionElement) {
            // 抛出异常
            exceptionTransparencyViolated(previousContext, value)
        }
        // 检查上下文是否发生变化,如果变化,则抛出异常
        checkContext(currentContext)
        lastEmissionContext = currentContext
    }
    
    // 用于抛出异常
    private fun exceptionTransparencyViolated(exception: DownstreamExceptionElement, value: Any?) {
        error("""
            Flow exception transparency is violated:
                Previous 'emit' call has thrown exception ${exception.e}, but then emission attempt of value '$value' has been detected.
                Emissions from 'catch' blocks are prohibited in order to avoid unspecified behaviour, 'Flow.catch' operator can be used instead.
                For a more detailed explanation, please refer to Flow documentation.
            """.trimIndent())
    }
}

emit方法最终会调用emitFun方法方法,代码如下:

private val emitFun =
    FlowCollector::emit as Function3, Any?, Continuation, Any?>

emitFun是一个lambda表达式,它将只有一个参数的emit方法转换成三个参数的方法。emitFun方法在编译时会被编译器处理,反编译后的代码逻辑大致如下:

@Nullable
public final Object invoke(@NotNull FlowCollector p1, @Nullable Object p2, @NotNull Continuation continuation) {
   InlineMarker.mark(0);
   // 核心执行
   Object var10000 = p1.emit(p2, continuation);
   InlineMarker.mark(2);
   InlineMarker.mark(1);
   return var10000;
}

可以看到,emitFun方法内部会调用FlowCollector类对象的emit方法,同时传入value和continuation作为参数。

而这个FlowCollector类对象就是一开始的collect方法封装的匿名类对象,代码如下:

public suspend inline fun  Flow.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector {
        override suspend fun emit(value: T) = action(value)
    })

调用它的emit方法,会直接调用action的invoke方法,并传入发射的数据,流在这里被最终消费。

通过上面的分析,可以知道消费的过程是在emit方法中被调用的,如果在消费的过程,没有发生挂起,那么emit方法执行完毕后,会继续执行flow方法里剩下的代码,而如果在消费的过程中发生了挂起,情况会稍有不同。

4.消费过程中的挂起

如果消费过程中发生挂起,那么emit方法会返回一个COROUTINE_SUSPENDED对象,suspendCoroutineUninterceptedOrReturn方法在收到COROUTINE_SUSPENDED对象后,会挂起当前协程。代码如下:

override suspend fun emit(value: T) {
    // 获取当前suspend方法续体
    return suspendCoroutineUninterceptedOrReturn sc@{ uCont ->
        try {
            // 调用重载的方法
            emit(uCont, value)
        } catch (e: Throwable) {
            // 出现异常时,将异常封装成上下文,保存到lastEmissionContext
            lastEmissionContext = DownstreamExceptionElement(e)
            // 抛出异常
            throw e
        }
    }
}

当消费过程执行完毕时,会通过传入的续体唤起外部协程恢复挂起状态。根据emitFun可以知道,这里传入的续体为this,也就是当前的SafeCollector类对象,代码如下:

emitFun(collector as FlowCollector, value, this as Continuation)

恢复挂起需要调用续体的resumeWith方法,上面提到SafeCollector类继承自ContinuationImpl类,SafeCollector类中没有重写resumeWith方法,而ContinuationImpl类中也没有重写resumeWith方法,因此实际调用的是ContinuationImpl类的父类BaseContinuationImpl类的resumeWith方法。如下图所示:

Kotlin协程:创建、启动、挂起、恢复中提到过,调用BaseContinuationImpl类的resumeWith方法,内部会调用invokeSuspend方法,而SafeCollector类重写了invokeSuspend方法,代码如下:

override fun invokeSuspend(result: Result): Any? {
    // 尝试获取异常
    result.onFailure { lastEmissionContext = DownstreamExceptionElement(it) }
    // 如果没有异常,则恢复flow方法续体的执行
    completion?.resumeWith(result as Result)
    // 返回挂起标识,这里挂起的是消费过程
    return COROUTINE_SUSPENDED
}

在invokeSuspend方法中,会调用resumeWith方法恢复生产过程——flow方法的执行,同时挂起消费过程的执行。全部过程如下图所示:

以上就是Kotlin协程之Flow基础原理示例解析的详细内容,更多关于Kotlin协程Flow原理的资料请关注码农之家其它相关文章!


参考资料

相关文章

  • 深度学习TensorFlow Session会话控制和Variable变量

    发布:2020-03-18

    今天小编就为大家分享一篇TensorFlow Session会话控制Variable变量详解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧


  • Kotlin挂起函数的详细介绍

    发布:2023-03-05

    挂起函数用状态机以挂起点将协程的运算逻辑拆分成不同的片段,每次执行协程运行不同的逻辑片段,由此可以知道协程是运行在线程中的,线程的并发处理方式也可以用在协程上


  • Kotlin Select协程多路复用的实现详解

    发布:2023-03-08

    select是Kotlin 1.6中的特性,即选择最快的结果。select与async、Channel结合使用,可以大大提高程序的响应速度,还可以提高程序的灵活性、扩展性


  • tensorflow更改变量的值实例详解

    发布:2019-06-23

    今天小编就为大家分享一篇tensorflow更改变量的值实例,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧


  • 基于循环神经网络(RNN)的古诗生成详细写法代码

    发布:2020-07-17

    这篇文章主要为大家详细介绍了基于循环神经网络(RNN)的古诗生成器,具有一定的参考价值,感兴趣的小伙伴们可以参考一下


  • tensorflow基于Anaconda环境搭建的方法步骤

    发布:2023-03-23

    本文主要介绍了tensorflow基于Anaconda环境搭建的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学习学习吧


  • Kotlin作用域函数应用详细介绍

    发布:2023-03-02

    作用域函数:是Kotlin标准库中的内联函数,作用在对象上时,执行给定的block代码块。可以在block代码块中通过it,this代表当前对象,进行代码逻辑处理


  • Kotlin条件控制语句汇总讲解

    发布:2023-03-12

    条件控制是每门编程语言中必不可少的,一般就是使用我们所熟知的 ifelse ,来作为我们代码逻辑选择条件控制。 在 Java 中一般使用 ifelse 和 switch-case 来作为条件控制,而在 Kotlin 中则是使用 if-else 和 when 来作为条件控制


网友讨论