观察者模式实用性非常高,经常被用于UI–业务的解耦;当一个对象的状态成为一个敏感的逻辑要素时,往往就要出现观察者模式了。
观察者模式在开源框架中被广泛应用,知名度最高的应该就是RxJava系列,借用RxJava2常用的操作符的一段代码:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { String s = "I'm RxJava2"; observableEmitter.onNext(s); observableEmitter.onComplete(); } }).subscribe(new DisposableObserver<String>() { @Override protected void onStart() { System.out.println("----start----"); } @Override public void onNext(String s) { System.out.println("onNext: "+s); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("----end----"); } });这里先不多说,待了解整个观察者模式再回头分析这段代码。 另外UI中众多的监听器,也算是使用了观察者模式。
观察者模式具体的实体角色有两个,一个观察者,一个被观察者,很好理解,观察者需要能知道被观察者的状态,而这个消息通道就是接口;观察者模式也有另外的名号,比如发布-订阅(Publish/Subscribe)模式,这个名号能比较清晰地体现其依赖关系,即“订阅”关系,不订阅就算发布了也不会收到通知。
先是观察者的统一接口,主要是业务方法,根据需求定制:
public interface IObserver { void update(String content); }再则是被观察者的统一接口,包括建立订阅关系、解除订阅关系、发布消息通知状态变化的方法:
public interface IObservable { void attach(IObserver observer); void detach(IObserver observer); void submit(String content); }然后我们创建两个观察者A与B:
public class ObserverA implements IObserver { @Override public void update(String content) { System.out.println("A receive submit content: " + content); } } public class ObserverB implements IObserver { @Override public void update(String content) { System.out.println("B receive submit content: " + content); } }接着是具体的被观察者,需要注意就是被观察者需要一个容器放置已订阅的观察者,以方便发布消息时循环通知所有订阅的观察者:
public class ConcreteObservable implements IObservable { Vector<IObserver> vector = new Vector(); @Override public void attach(IObserver observer) { this.vector.addElement(observer); } @Override public void detach(IObserver observer) { this.vector.removeElement(observer); } @Override public void submit(String content) { Iterator var2 = this.vector.iterator(); while(var2.hasNext()) { IObserver observer = (IObserver)var2.next(); observer.update(content); } } }最后是测试类,也就是实际使用的地方,注意要使用被观察者的订阅方法与观察者建立关系:
public class Main { public static void main(String[] args) { ConcreteObservable observable = new ConcreteObservable(); observable.attach(new ObserverA()); observable.attach(new ObserverB()); observable.submit("~I'm working~"); } }不出意外,输出应为:
A receive submit content: ~I'm working~ B receive submit content: ~I'm working~当然,实际应用中可能只会出现接口,使用匿名的方式创建观察者并直接建立订阅关系:
public class Main { public static void main(String[] args) { ConcreteObservable observable = new ConcreteObservable(); observable.attach(new IObserver() { @Override public void update(String content) { System.out.println("I'm working"); } }); } }还可以更进一步,将被观察者也使用匿名方式创建出来,但使用抽象类会方便一些。
整个模式的示例代码用表表示出来是:
类说明IObservable被观察者特征抽象IObserver观察者特征抽象ConcreteObservable具体被观察者ObserverA/B具体观察者Main最终需要调用的地方用图表示是这样的:
观察者模式可能是性价比最高的设计模式之一,利用很少的代码就可以实现强大的解耦,也正因为其重要性,广大开源框架频繁使用,JDK本身也带有,很早就有:
/** * * @author Chris Warth * @see java.util.Observable#notifyObservers() * @see java.util.Observable#notifyObservers(java.lang.Object) * @see java.util.Observer * @see java.util.Observer#update(java.util.Observable, java.lang.Object) * @since JDK1.0 */ public class Observable { private boolean changed = false; private Vector<Observer> obs; public Observable() { obs = new Vector<>(); } public synchronized void addObserver(Observer o) { if (o == null) throw new NullPointerException(); if (!obs.contains(o)) { obs.addElement(o); } } public synchronized void deleteObserver(Observer o) { obs.removeElement(o); } //省略部分代码然后再回头看RxJava2的创建一个观察者模式:
Observable.create(new ObservableOnSubscribe<String>() { @Override public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception { String s = "I'm RxJava2"; observableEmitter.onNext(s); observableEmitter.onComplete(); } }).subscribe(new DisposableObserver<String>() { @Override protected void onStart() { System.out.println("----start----"); } @Override public void onNext(String s) { System.out.println("onNext: "+s); } @Override public void onError(Throwable throwable) { throwable.printStackTrace(); } @Override public void onComplete() { System.out.println("----end----"); } });其中,ObservableEmitter充当着实际发布者也就是被观察者IObservable的角色,当然Emmiter是被封装到Observable中了:
public interface ObservableEmitter<T> extends Emitter<T> { void setDisposable(@Nullable Disposable var1); void setCancellable(@Nullable Cancellable var1); boolean isDisposed(); @NonNull ObservableEmitter<T> serialize(); boolean tryOnError(@NonNull Throwable var1); }再往上看一层,看到其最顶层接口:
public interface Emitter<T> { void onNext(@NonNull T var1); void onError(@NonNull Throwable var1); void onComplete(); }这里的onNext就相当于之前代码示例IObservable中的submit方法,是发布通知的方法,外面的subscribe方法自然就相当于attach方法也就是订阅方法,DisposableObserver是观察者ObserverA或ObserverB:
public abstract class DisposableObserver<T> implements Observer<T>, Disposable { final AtomicReference<Disposable> upstream = new AtomicReference(); public DisposableObserver() { } public final void onSubscribe(@NonNull Disposable d) { if (EndConsumerHelper.setOnce(this.upstream, d, this.getClass())) { this.onStart(); } } protected void onStart() { } public final boolean isDisposed() { return this.upstream.get() == DisposableHelper.DISPOSED; } public final void dispose() { DisposableHelper.dispose(this.upstream); } }再往上看一层,看到其最顶层接口:
public interface Observer<T> { void onSubscribe(@NonNull Disposable var1); void onNext(@NonNull T var1); void onError(@NonNull Throwable var1); void onComplete(); }至于attach也就是对应的subscribe在Observable类中,Observable类又实现了ObservableSource接口:
public interface ObservableSource<T> { void subscribe(@NonNull Observer<? super T> var1); }简单来说,Emitter接口与Observer接口就是被观察者与观察者各自抽象的接口了,区别在Emitter又被封装了一层,嵌进了Observable类中,Emitter的onNext方法就是发布通知消息,而Observer的onNext方法就是接收消息。 具体的逻辑在类似于CreateEmitter的类中,它实现了ObservableEmitter接口,并在onNext中处理了发布的逻辑:
//ObservableCreate.java static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable { private static final long serialVersionUID = -3434801548987643227L; final Observer<? super T> observer; CreateEmitter(Observer<? super T> observer) { this.observer = observer; } public void onNext(T t) { if (t == null) { this.onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); } else { if (!this.isDisposed()) { this.observer.onNext(t); } } } public void onError(Throwable t) { if (!this.tryOnError(t)) { RxJavaPlugins.onError(t); } } public void onComplete() { if (!this.isDisposed()) { try { this.observer.onComplete(); } finally { this.dispose(); } } } //省略部分 }以上。
