RxJava大家已经很熟悉了, 百度上关于RxJava的技术文章大把, 为了锻炼自己阅读源码的习惯, 现在阅读第三方库的时候做个记录, 假装自己看过开源库的源码.
今天我们就分析RxJava的线程切换源码, 没有看过RxJava的基本调用过程的请移步RxJava的基本流程, 分析的demo代码如下:
Observable.create(new ObservableOnSubscribe<Drawable>() { @Override public void subscribe(ObservableEmitter<Drawable> emitter) throws Exception { emitter.onNext(getResources().getDrawable(R.drawable.ic_launcher_background)); } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(new Observer<Drawable>() { @Override public void onSubscribe(Disposable d) { Log.d("@@@", "onSubscribe"); } @Override public void onNext(Drawable drawable) { Log.d("@@@", "onNext"); } @Override public void onError(Throwable e) { Toast.makeText(MainActivity.this,e.getMessage(),Toast.LENGTH_LONG).show(); } @Override public void onComplete() { Log.d("@@@", "onComplete"); } });Observable.create()方法的返回值 ObservableCreate 可以参考 RxJava的基本流程
我们先分析下 ObservableCreate.subscribeOn(Schedulers.io()) 的具体实现
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler; public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) { super(source); this.scheduler = scheduler; } @Override public void subscribeActual(final Observer<? super T> s) { final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); s.onSubscribe(parent); parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent))); } static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable { private static final long serialVersionUID = 8094547886072529208L; final Observer<? super T> actual; final AtomicReference<Disposable> s; SubscribeOnObserver(Observer<? super T> actual) { this.actual = actual; this.s = new AtomicReference<Disposable>(); } @Override public void onSubscribe(Disposable s) { DisposableHelper.setOnce(this.s, s); } @Override public void onNext(T t) { actual.onNext(t); } }ObservableSubscribeOn 的构造方法的第一个入参便是前面 Observable.create() 返回的对象, 第二个参数便是 Scheduler 对象, 在subscribeActual()方法中, 将Observer<? super T> s 传递到SubscribeOnObserver类中, 这里事件源在Schedulers.io()线程执行的过程具体看一下:
scheduler.scheduleDirect(new SubscribeTask(parent)) 这里scheduler的值是 Schedulers.io() 即: IoScheduler scheduler.scheduleDirect() 具体实现如下: public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) { final Worker w = createWorker(); final Runnable decoratedRun = RxJavaPlugins.onSchedule(run); DisposeTask task = new DisposeTask(decoratedRun, w); w.schedule(task, delay, unit); return task; }可以看到new SubscribeTask(parent) 进行了多次包装,最终执行在 w.schedule(task, delay, unit);中
这里的Work的类型是 IoScheduler, createWorker() 的返回值是
new EventLoopWorker(pool.get());w.schedule(task, delay, unit);就是执行 EventLoopWorker.schedule(task, delay, unit); 最终在NewThreadWorker中执行
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) { Runnable decoratedRun = RxJavaPlugins.onSchedule(run); ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent); Future<?> f; try { if (delayTime <= 0) { f = executor.submit((Callable<Object>)sr); } else { f = executor.schedule((Callable<Object>)sr, delayTime, unit); } sr.setFuture(f); } catch (RejectedExecutionException ex) { if (parent != null) { parent.remove(sr); } RxJavaPlugins.onError(ex); } return sr; }可以看到最终是将new SubscribeTask(parent) 这个任务放在了线程池中执行.
final class SubscribeTask implements Runnable { private final SubscribeOnObserver<T> parent; SubscribeTask(SubscribeOnObserver<T> parent) { this.parent = parent; } @Override public void run() { source.subscribe(parent); } }这里source对象是 ObservableCreate 类的实例, parent是ObservableSubscribeOn的内部类SubscribeOnObserver的实例.
下面来看下切换到主线程的逻辑
ObservableSubscribeOn.observeOn(AndroidSchedulers.mainThread()) 的源码 AndroidSchedulers.mainThread() 返回的是内部持有Handler的HandlerScheduler对象,其中Handler的构造方法的入参为Looper.getMainLooper(), 就可以将消息加入到主线程队列了.
public final class AndroidSchedulers {
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper())); }
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler( new Callable<Scheduler>() { @Override public Scheduler call() throws Exception { return MainHolder.DEFAULT; } });
/** A {@link Scheduler} which executes actions on the Android main thread. */ public static Scheduler mainThread() { return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD); }
}
该observeOn()方法的内部返回了new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize)对象,
ObservableObserveOn 类的实现为:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> { final Scheduler scheduler;
@Override protected void subscribeActual(Observer<? super T> observer) { Scheduler.Worker w = scheduler.createWorker(); // HandlerScheduler.HandlerWorker
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)); }
static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
@Override public void onSubscribe(Disposable s) { if (DisposableHelper.validate(this.s, s)) { this.s = s; if (s instanceof QueueDisposable) { @SuppressWarnings("unchecked") QueueDisposable<T> qd = (QueueDisposable<T>) s; int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY); if (m == QueueDisposable.SYNC) { queue = qd; actual.onSubscribe(this); schedule(); return; } if (m == QueueDisposable.ASYNC) { queue = qd; actual.onSubscribe(this); return; } }
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this); } }
@Override public void onNext(T t) { if (sourceMode != QueueDisposable.ASYNC) { queue.offer(t); } schedule(); }
@Override public void onError(Throwable t) { schedule(); }
void schedule() { if (getAndIncrement() == 0) { worker.schedule(this); } }
void drainNormal() { int missed = 1;
final SimpleQueue<T> q = queue; final Observer<? super T> a = actual;
for (;;) { if (checkTerminated(done, q.isEmpty(), a)) { return; }
for (;;) { T v; try { v = q.poll(); } catch (Throwable ex) { return; } a.onNext(v); }
missed = addAndGet(-missed); if (missed == 0) { break; } } }
void drainFused() { for (;;) { actual.onNext(null); if (d) { ex = error; if (ex != null) { actual.onError(ex); } else { actual.onComplete(); return; } } }
@Override public void run() { drainNormal(); }
@Override public T poll() throws Exception { return queue.poll(); } } }
这里source的值是执行了.subscribeOn()的返回值 ObservableSubscribeOn
最后来看下ObservableObserveOn.subscribe()的实现
@Override public final void subscribe(Observer<? super T> observer) { try { observer = RxJavaPlugins.onSubscribe(this, observer); subscribeActual(observer); } catch (NullPointerException e) { // NOPMD throw e; } catch (Throwable e) { throw npe; } }最后附上流程图:
