RxSwift核心原理探究
2024-04-09 16:15:41  阅读数 2220

在之前有浅浅的分享了一下RxSwift简单使用,
但是同样的也有一些困惑伴随着我,比如它是如何实现,为什么所有的对象类都可以使用rx方法呢,再比如Timer实现方式为什么跟原生的又差别如此之大呢,带着这些个疑问,就想着看一下这强大的库是如何实现的,下面大概分享一下个人的拙见;

RxSwift本质上就是信号的产生、订阅、发送跟销毁,核心逻辑就是产生、订阅、发送三步曲:1、创建信号 2、订阅信号 3、发送信号,下面就以一个最简单信号创建订阅流程来分析一下,它内部是怎么实现的;

先创建Observable可观察者对象,然后使用subscribe订阅,最后第三步发送信号就是隐藏步骤,实际开发中,我们不需要去直接调用onNext、onError操作;

class ViewController: UIViewController {
    let disposeBag = DisposeBag()
    
    override func viewDidLoad() {
        super.viewDidLoad() 
        // 1、创建信号
        let ob = Observable<Any>.create { observer in
            // 3、发送信号
            observer.onNext("下一步")
//            observer.onError(NSError.init(domain: "Chris's error", code: 10086, userInfo: nil))
            observer.onCompleted()
            
            return Disposables.create()
        }
        
        // 2、订阅信号
        let _ = ob.subscribe { text in
            print("订阅到了:\(text)")
        } onError: { error in
            print("error:\(error)")
        } onCompleted: {
            print("完成")
        } onDisposed: {
            print("销毁")
        }
        .disposed(by: disposeBag) 
    }
}

一、创建信号

1、Observable<Any>.create

通过Observable<Any>.create创建一个可观察对象,传入一个尾随闭包作为参数,点进去create看一下其内部实现,发现create是ObservableType的一个扩展方法;


image.png

而ObservableType其实是一个协议,继承自ObservableConvertibleType,Observable遵循了ObservableType协议,即Observable调用ObservableType协议里的create方法;


image.png

2、AnonymousObservable.subscribeHandler

create方法内部就一句代码 AnonymousObservable(subscribe),AnonymousObservable这个字面意思,是个匿名的可观察者,把subscribe传给AnonymousObservable,这个subscribe是我们创建的尾随闭包,那么AnonymousObservable里面又做什么处理呢?只能继续往下翻了;

public static func create(_ subscribe: @escaping (AnyObserver<Element>) -> Disposable) -> Observable<Element> {
        AnonymousObservable(subscribe)
    }

点进去AnonymousObservable看,发现AnonymousObservable给回调闭包取了个别名,又定义了一个全局变量subscribeHandler,保存我们外面传进来的闭包;


image.png

这里我们看到AnonymousObservable是继承的Producer,而Producer点击发现,它其实是继承自Observable;


image.png

到这一步,其实我们创建订阅信号的步骤就已经完成了;可能有点绕,总结一下,本质的思想就是通过父类Observable创建的订阅信号闭包,交给子类AnonymousObservable去保存实现;

二、订阅信号

1、创建一个AnonymousObserver订阅者

上面我们已经说了,我们创建的ob对象,其实是AnonymousObserver对象,所以此处subscribe就是创建一个AnonymousObserver订阅者;


image.png

2、AnonymousObserver初始化的时候保存eventHandler

前面的disposable这些销毁对象的创建先不看,重点看return返回值,self.asObservable().subscribe(observer) 这个做为参数传给可销毁对象Disposables;
observer这个其实就是订阅者,它是AnonymousObserver对象,继承自ObserverBase,后面传入的参数是个事件回调闭包,AnonymousObserver就是将eventHandler事件保存下来;注意到里面还有个onCore方法,里面是调用eventHandler执行操作;


image.png

AnonymousObserver的父类ObserverBase,它里面有个on方法,里面就一个switch方法,Event是个枚举类,往下看就能看到我们熟悉next\error\completed,再往后就是调用onCore方法,这个调用的时机后面具体分析;


image.png
image.png

3、self.asObservable().subscribe(observer) 方法调用

接下来看,self上面也说了,是AnonymousObservable对象,asObservable也不用过多关心,其实就是类似OC里面的多态,强制性返回Observable Class,重点看subscribe方法调用,传入的observer参数,上面步骤2已经说过了;

image.png

4、producer.subscribe

subscribe方法点击jump发现好多地方都有该方法,如上文所说,ob本质是AnonymousObservable对象,他们的继承链关系AnonymousObservable->Producer->Observable;Observable又遵循ObservableType协议,上诉几个类都有实现subscribe方法,具体也不知道要执行哪一个方法,所以这个时候找起来就比较麻烦了,我这个比较懒,不想挨个去找了,此时最简单的方法其实就是查看调用堆栈了;


image.png

这里可以发现asObservable().subscribe是执行的父类Producer里面的subscribe方法,其实仔细查看源码也会发现,AnonymousObservable没有实现subscribe方法,而Observable里面只是调用rxAbstractMethod()构造方法而已,做一些错误处理,也没有具体实现,所以关键代码还是在Producer里;
分析Producer里面的方法,主要就是通过不同条件,执行不同代码,其实本质执行的都是执行下面三行代码;

    let disposer = SinkDisposer()
    let sinkAndSubscription = self.run(observer, cancel: disposer)
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

这个schedule就先不深究了,调用之后还是会执行action方法,就是刚才subscribe里面的闭包函数;


image.png

5、AnonymousObservable.run

重点还是看一下上面run方法;调用的AnonymousObservable对象本身的run方法,run里面又调用AnonymousObservableSink的run方法,这个AnonymousObservableSink又是啥子东西???只能接着往下看了


image.png

6、AnonymousObservableSink.run

AnonymousObservableSink是继承自Sink,Sink是什么先不深究,里面定义了一些方法实现,里面保存了observe、cancel对象;


image.png

7、parent.subscribeHandler(AnyObserver(self))

看当前的这个run方法,里面就一句代码,parent就是步骤5传入的self,即AnonymousObservable对象,即表示AnonymousObservable调用subscribeHandler方法;


image.png

这个时候就串起来了,前文创建信号的时候讲过创建Observable的订阅信号的时候,交给子类AnonymousObservable去保存,这个时候就是调用之前保存的闭包回调了,就会来到下面这一步;

image.png

三、发送信号

1、 AnonymousObserver.onNext执行

根据上面的一系列操作,我们已经可以执行create里面的回调了,这一步我们开头的时候也说了,实际开发中不需要我们手动去调用onNext、onError等方法,但是既然我们是探究他的原理,那就继续往下看;

这个observer是个什么东西?为什么能调用onNext等方法呢?

上面步骤7有提到,subscribeHandler.(AnyObserver(self)),我们可以得出observer == AnyObserver(self),那我们点开AnyObserver,发现它其实就是个结构体,遵循了ObserverType协议,用observer保存了AnonymousObservableSink的on方法,AnonymousObservableSink我们在订阅信号的步骤6有提到,截图中有被收纳起来的on方法;

所以create中的observer =AnyObserver(self),里面AnyObserver所持有的对象self.observer = AnonymousObservableSink.on;


image.png

2、ObserverType.onNext

到了这一步,我们还是不知道onNext怎么来的,既然它自身没有实现,那么只能去看他的协议方法了,果不其然,在协议扩展方法里面实现了onNext,往下看,其实他是调用当前的on方法,传入.next枚举外带value值;

image.png

3、AnonymousObservableSink.on

继续走会发现,调用了AnyObserver本身的on方法,实现就一行代码;

self.observer(event)

上面已经分析出了observer == AnonymousObservableSink.on,其实这里就是调用AnonymousObservableSink的on方法,继续往下看看sink.on里面做了什么操作;


image.png

下面是AnonymousObservableSink的on方法,这个方法很眼熟,跟前文中提到的ObserverBase的on方法很类似,只是ObserverBase的on最后是调用的onCore方法,这边调用的是forwardOn;


image.png

4、Sink.forwardOn

forwardOn当前类没有实现,只能去它的父类找,继续网上找,发现它是调用的self.observer.on方法,这个self.observer之前订阅信号的步骤6有提到过,sink保存observer跟cancel对象用来后续处理,唉,这边就用到了;


image.png

5、ObserverBase.on

这个self.observer打印发现它其实是AnonymousObserver类,继承至ObserverBase,所以到这一步,还是调用我们前文提到的ObserverBase的on方法,在往下执行onCore;


image.png

6、AnonymousObserver.onCore->self.eventHandler()

上一步骤的self其实是AnonymousObserver,那么onCore往下执行就到了我们订阅信号的步骤2提到的,执行AnonymousObserver保存下来eventHandler事件;


image.png

这个eventHandler事件,在订阅信号创建的时候,被我隐藏了,没有展开讲,这边展开来看一下,里面到底有什么秘密;

7、onNext、onError、onCompleted、dispose执行

image.png

点进去发现这个闭包就是响应event事件,对不同事件执行不同方法,onNext就是我们subscribe传入的闭包回调,value就是我们onNext传入的值;

    public func onNext(_ element: Element) {
        self.on(.next(element))
    }
image.png

到这里我们整个创建、订阅、发送信号整个过程就已经分析完了,前前后后执行了二十多个方法;里面的涉及了很多的继承、扩展、协议等等,可能有点绕;

四、小结:

总的流程有点绕,下面简单梳理一下:
首先是带able结尾的信号生产者继承链关系:


信号生产者继承链

然后是带observer结尾的信号订阅者继承链关系:


信号订阅者继承链

整体核心流程如下:


image.png

从图中也可以看出,Observable、Observer分工明确,sink是起到承上启下的作用,同时保存了observer、cancel,订阅信号的run方法,发送信号的on方法,都是从这边调用的,这个类就相当于一个业务中间层,所有业务逻辑都在这里处理,通过这个中间件可以串联Observable信号生产者跟Observer信号订阅者;

RxSwift使用了大量的继承、协议、扩展,实现了接口分离,模块分工清晰,里面设计虽然很复杂,但是暴露API都是很简单的,让开发者只想关心当前业务的开发,无需关心业务直接的调度,这种设计模式可以很好的实现业务分离。
我最近做的音视频开发的模块,视频的渲染跟解码就是通过这种方式使之分离,谁要渲染直接订阅当前暴露的接口就可以,调度者就根据订阅的信号去返回AVframe就可以,也无需关心是谁订阅的,同样的,音频帧处理也是这样。其实我们平时开发中也可以借鉴这种设计模式,分析框架最主要的就是学习其优秀的设计模式,为我所用;

以上,就是个人理解的RxSwift大体实现流程,如有不对,欢迎指正!!!

CSDN地址:https://blog.csdn.net/weixin_37498529/article/details/126560786
知乎地址:https://zhuanlan.zhihu.com/p/560495890