RxJava2.x的源码分析----线程切换

    xiaoxiao2023-10-12  173

    RxJava大家已经很熟悉了, 百度上关于RxJava的技术文章大把, 为了锻炼自己阅读源码的习惯, 现在阅读第三方库的时候做个记录, 假装自己看过开源库的源码.

    今天我们就分析RxJava的线程切换源码, 没有看过RxJava的基本调用过程的请移步RxJava的基本流程, 分析的demo代码如下:

    Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> emitter) throws Exception { emitter.onNext(getResources().getDrawable(R.drawable.ic_launcher_background)); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) { Log.d("@@@", "onSubscribe"); } @Override public void onNext(Drawable drawable) { Log.d("@@@", "onNext"); } @Override public void onError(Throwable e) { Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_LONG).show(); } @Override public void onComplete() { Log.d("@@@", "onComplete"); } });

    Observable.create()方法的返回值 ObservableCreate 可以参考 RxJava的基本流程

    我们先分析下 ObservableCreate.subscribeOn(Schedulers.io()) 的具体实现

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } }

    ObservableSubscribeOn 的构造方法的第一个入参便是前面 Observable.create() 返回的对象, 第二个参数便是 Scheduler 对象, 在subscribeActual()方法中, 将Observer<? super T> s 传递到SubscribeOnObserver类中, 这里事件源在Schedulers.io()线程执行的过程具体看一下:

    scheduler.scheduleDirect(new SubscribeTask(parent)) 这里scheduler的值是 Schedulers.io() 即: IoScheduler scheduler.scheduleDirect() 具体实现如下: public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }

    可以看到new SubscribeTask(parent) 进行了多次包装,最终执行在 w.schedule(task, delay, unit);中

    这里的Work的类型是 IoScheduler, createWorker() 的返回值是

    new EventLoopWorker(pool.get());

    w.schedule(task, delay, unit);就是执行 EventLoopWorker.schedule(task, delay, unit); 最终在NewThreadWorker中执行

    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }

    可以看到最终是将new SubscribeTask(parent) 这个任务放在了线程池中执行.

    final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }

    这里source对象是 ObservableCreate 类的实例, parent是ObservableSubscribeOn的内部类SubscribeOnObserver的实例. 

     

    下面来看下切换到主线程的逻辑

    ObservableSubscribeOn.observeOn(AndroidSchedulers.mainThread()) 的源码 AndroidSchedulers.mainThread() 返回的是内部持有Handler的HandlerScheduler对象,其中Handler的构造方法的入参为Looper.getMainLooper(), 就可以将消息加入到主线程队列了.

    public final class AndroidSchedulers {

        private static final class MainHolder {

            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));     }

        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(             new Callable<Scheduler>() {                 @Override public Scheduler call() throws Exception {                     return MainHolder.DEFAULT;                 }             });

        /** A {@link Scheduler} which executes actions on the Android main thread. */     public static Scheduler mainThread() {         return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);     }

    }

    该observeOn()方法的内部返回了new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)对象,

    ObservableObserveOn 类的实现为:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {     final Scheduler scheduler;

        @Override     protected void subscribeActual(Observer<? super T> observer) {             Scheduler.Worker w = scheduler.createWorker(); // HandlerScheduler.HandlerWorker

                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));     }

        static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>     implements Observer<T>, Runnable {

            @Override         public void onSubscribe(Disposable s) {             if (DisposableHelper.validate(this.s, s)) {                 this.s = s;                 if (s instanceof QueueDisposable) {                     @SuppressWarnings("unchecked")                     QueueDisposable<T> qd = (QueueDisposable<T>) s;                     int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);                     if (m == QueueDisposable.SYNC) {                         queue = qd;                         actual.onSubscribe(this);                         schedule();                         return;                     }                     if (m == QueueDisposable.ASYNC) {                         queue = qd;                         actual.onSubscribe(this);                         return;                     }                 }

                    queue = new SpscLinkedArrayQueue<T>(bufferSize);

                    actual.onSubscribe(this);             }         }

            @Override         public void onNext(T t) {             if (sourceMode != QueueDisposable.ASYNC) {                 queue.offer(t);             }             schedule();         }

            @Override         public void onError(Throwable t) {             schedule();         }

            void schedule() {             if (getAndIncrement() == 0) {                 worker.schedule(this);             }         }

            void drainNormal() {             int missed = 1;

                final SimpleQueue<T> q = queue;             final Observer<? super T> a = actual;

                for (;;) {                 if (checkTerminated(done, q.isEmpty(), a)) {                     return;                 }

                    for (;;) {                     T v;                     try {                         v = q.poll();                     } catch (Throwable ex) {                         return;                     }                     a.onNext(v);                 }

                    missed = addAndGet(-missed);                 if (missed == 0) {                     break;                 }             }         }

            void drainFused() {             for (;;) {                 actual.onNext(null);                 if (d) {                     ex = error;                     if (ex != null) {                         actual.onError(ex);                     } else {                         actual.onComplete();                     return;                 }             }         }

            @Override         public void run() {             drainNormal();         }

            @Override         public T poll() throws Exception {             return queue.poll();         }     } }

    这里source的值是执行了.subscribeOn()的返回值 ObservableSubscribeOn 

    最后来看下ObservableObserveOn.subscribe()的实现

    @Override public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { throw npe; } }

    最后附上流程图:

      

     

     

     

    最新回复(0)