RxJava大家已经很熟悉了, 百度上关于RxJava的技术文章大把, 我为了锻炼自己阅读源码的习惯, 现在开始阅读第三方库的时候做个记录, 假装自己看过开源库的源码.
我们就从RxJava的基本使用来开始分析吧, 常用的使用方式如下所示:
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> emitter) throws Exception { emitter.onNext(getResources().getDrawable(R.drawable.ic_launcher_background)); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) { Log.d("@@@", "onSubscribe"); } @Override public void onNext(Drawable drawable) { Log.d("@@@", "onNext"); } @Override public void onError(Throwable e) { Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_LONG).show(); } @Override public void onComplete() { Log.d("@@@", "onComplete"); } });我们先按照代码出现的顺序分析调用过程,
1. 我们看下订阅方法 .subscribe(new Observer<Drawable>()); 点击该方法后可以看到进入了基类Observable
public final void subscribe(Observer<? super T> observer) { subscribeActual(observer); }内部的方法subscribeActual()是个抽象方法, 该方法的入参便是匿名的观察者 Observer 对象.
protected abstract void subscribeActual(Observer<? super T> observer);实际就是调用当前对象 ObservableCreate 的实现,
2. ObservableCreate 便是Observable.create(xxx) 返回值, ObservableCreate的代码如下
public final class ObservableCreate<T> extends Observable<T> { final ObservableOnSubscribe<T> source; public ObservableCreate(ObservableOnSubscribe<T> source) { this.source = source; } @Override protected void subscribeActual(Observer<? super T> observer) { CreateEmitter<T> parent = new CreateEmitter<T>(observer); observer.onSubscribe(parent); source.subscribe(parent); } static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } @Override public void onNext(T t) { observer.onNext(t); } @Override public void onComplete() { observer.onComplete(); } }该方法中观察者对象 Observer 作为了 CreateEmitter 类的构造方法的入参. 接下来调用了
source.subscribe(parent);
这里的source对象便是 Observable.create() 方法的入参 匿名对象 ObservableOnSubscribe, 即
new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> emitter) throws Exception { emitter.onNext(getResources().getDrawable(R.drawable.ic_launcher_background)); } }parent入参赋值给 ObservableEmitter<Drawable> emitter. 即执行 ObservableCreate 的内部类 CreateEmitter onNext() 方法.
大概的基本调用方法分析完毕.
