当我们开发软件时,各个对象之间的数据共享和合作是必须的。 但是这里比较难做的是 怎样保证消息之间的传输高效并且减少各个模块之间的耦合。 当组件的职责不清楚时,一个组件还要承担另一个组件的职责,这样的系统我们就认为是高耦合。 当我们的系统变得高耦合时,任何一个小的改动都会对系统造成影响。 为了解决设计上的问题,我们设计了基于事件的设计模型。 在事件驱动编程模型中,对象可以发布/订阅 事件. 事件监听者就是监听事件的发生,我们在第六章中已经看到过RemovalListener, 在这一章中,我们将讨论Guava的EventBus类,了解它的发布/订阅事件是怎么使用的。
这一章,我们将覆盖下面的知识点:-- EventBus 和 AsyncEventBus类-- 怎样是用EventBus订阅事件-- 使用EventBus发布事件-- 编写事件处理器,并且根据我们的需求选择合适的处理器-- 与DI工具协作
EventBus类是guava中关注消息的发布和订阅的类,简单的说订阅者通过EventBus注册并订阅事件,发布者将事件发送到EventBus中,EventBus将事件顺序的通知给时间订阅者,所以 这里面有一个重要的注意点,事件处理器必须迅速的处理,否则可能会导致时间堆积。
创建一个EventBus实例,只需要简单的调用构造方法:
EventBus eventBus = new EventBus();也提供了一个带参数的构造类,目的只是为了加上一个标识:
EventBus eventBus = new EventBus(TradeAccountEvent.class.getName());为了接受到一个事件,我们需要做下面3个步骤:
这个类需要定义一个只接受一个参数的public方法, 参数的类型要和订阅的事件类型一只。需要在方法上加上@Subscribe注解最后我们调用EventBus的register方法注册对象我们可以调用EventBus.post方法发送事件,EventBus会轮流调用所有的接受类型是发送事件类型的订阅者,但是这里面有一个比较强大的东西,就是。。。。。。。。。。。
如前面提到了事件处理方法只能接受一个参数,EventBus会轮流顺序调用订阅的方法,因此事件处理方法必须很快的给予响应,如果说时间处理的方法中有需要进行长时间运算的过程,我们建议另起一个线程处理。
EventBus不会起多个线程去调用时间处理方法,除非我们在事件的处理方法上加上注解@AllowCOncurrentEvent,加上这个注解后我们就会认为这个事件处理方法是线程安全的.Annotating a handler method with the @AllowConcurrentEvent annotation by itself will not register a method with EventBus
现在我们来看看怎么使用EventBus,我们来看一些例子.
我们假设我们已经像如下的方式定义了一个事件:
public class TradeAccountEvent { private double amount; private Date tradeExecutionTime; private TradeType tradeType; private TradeAccount tradeAccount; public TradeAccountEvent(TradeAccount account, double amount, Date tradeExecutionTime, TradeType tradeType) { checkArgument(amount > 0.0, "Trade can't be less than zero"); this.amount = amount; this.tradeExecutionTime = checkNotNull(tradeExecutionTime,"ExecutionTime can't be null"); this.tradeAccount = checkNotNull(account,"Account can't be null"); this.tradeType = checkNotNull(tradeType,"TradeType can't be null"); }由上面可以看出,无论是买或者卖的事件发生,我们都会创建一个TradeAccountEvent对象,现在让我们考虑一下我们当这个事件被执行时我们希望监听者能够接收到,我们定义SimpleTradeAuditor类:
public class SimpleTradeAuditor { private List<TradeAccountEvent> tradeEvents = Lists.newArrayList(); public SimpleTradeAuditor(EventBus eventBus){ eventBus.register(this); } @Subscribe public void auditTrade(TradeAccountEvent tradeAccountEvent){ tradeEvents.add(tradeAccountEvent); System.out.println("Received trade "+tradeAccountEvent); } }我们可以快速的看一下代码,在构造方法中,我们接受一个EventBus实例,接着我们注册SimpleTradeAuditor类到EventBus中,接受事件TradeAccountEvents. 通过指定@Subscribe注解说明哪个方法是事件处理器. 上面例子中的处理方式:将event加入到list中,并且在控制台中打印出来.
现在我们看一下怎样发布一个事件,看下面的类:
public class SimpleTradeExecutor { private EventBus eventBus; public SimpleTradeExecutor(EventBus eventBus) { this.eventBus = eventBus; } public void executeTrade(TradeAccount tradeAccount, double amount, TradeType tradeType){ TradeAccountEvent tradeAccountEvent = processTrade(tradeAccount, amount, tradeType); eventBus.post(tradeAccountEvent); } private TradeAccountEvent processTrade(TradeAccount tradeAccount, double amount, TradeType tradeType){ Date executionTime = new Date(); String message = String.format("Processed trade for %s of amount %n type %s @ %s",tradeAccount,amount,tradeType,executionTime); TradeAccountEvent tradeAccountEvent = new TradeAccountEvent(tr adeAccount,amount,executionTime,tradeType); System.out.println(message); return tradeAccountEvent; } }像上面的SimpleTradeAuditor类一样,在SimpleTradeExecutor的构造方法中我们也接受一个EventBus作为构造参数. 和 SimpleTradeAuditor类,为了方便后面使用,我们使用了一个成员变量引用了eventbus类,尽管大多数情况下,在两个类中使用同一个eventBus实例是不好的,我们将在后面的例子中去看怎样使用多个EventBus实例。 但是在这个例子中,我们使用同一个实例. SimpleTradeExecutor类有一个公开的方法,executeTrade接受我们处理一个trade的所有信息。 在这个例子中我们调用processTrade方法,传入了必要的信心并且在控制台中打印了交易已经被执行,并且返回一个TradeAccountEvent实例。 当processTrade 方法执行完,我们调用EventBus的post方法将TradeAccountEvent作为参数, 这样所有订阅TradeAccountEvent事件的订阅者都会收到这个消息。 这样我们就可以看到,publish 类和 scribe类通过消息解耦了
我们刚才了解了怎样使用EventBus订阅发布事件。 我们知道 EventBus事件的发布与订阅是基于事件类型的, 这样我们就可以通过事件类型将事件发送给不同的订阅者。 比如: 如果我们我们想分别订阅 买和卖事件。 首先我们创建两种类型的事件:
public class SellEvent extends TradeAccountEvent { public SellEvent(TradeAccount tradeAccount, double amount, Date tradExecutionTime) { super(tradeAccount, amount, tradExecutionTime, TradeType. SELL); } } public class BuyEvent extends TradeAccountEvent { public BuyEvent(TradeAccount tradeAccount, double amount, Date tradExecutionTime) { super(tradeAccount, amount, tradExecutionTime, TradeType.BUY); } }现在我们创建了两种不同类型的事件实例,SellEvent和BuyEvent,两个事件都继承了TradeAccountEvent。 我们能够实现分别的订阅,我们先创建一个能够订阅SellEvent的实例:
public class TradeSellAuditor { private List<SellEvent> sellEvents = Lists.newArrayList(); public TradeSellAuditor(EventBus eventBus) { eventBus.register(this); } @Subscribe public void auditSell(SellEvent sellEvent){ sellEvents.add(sellEvent); System.out.println("Received SellEvent "+sellEvent); } public List<SellEvent> getSellEvents() { return sellEvents; } }从功能点上来看,这个实例和我们之前的SimpleTradeAuditor差不多,只不过上面的这个实例只接受SellEvent事件,下面我们再创建一个只接受BuyEvent事件的实例:
public class TradeBuyAuditor { private List<BuyEvent> buyEvents = Lists.newArrayList(); public TradeBuyAuditor(EventBus eventBus) { eventBus.register(this); } @Subscribe public void auditBuy(BuyEvent buyEvent){ buyEvents.add(buyEvent); System.out.println("Received TradeBuyEvent "+buyEvent); } public List<BuyEvent> getBuyEvents() { return buyEvents; } }现在我们只需要重构我们的SimpleTradeExecutor类去基于buy或者sell创建正确的TradeAccountEvent。
public class BuySellTradeExecutor { … deatails left out for clarity same as SimpleTradeExecutor //The executeTrade() method is unchanged from SimpleTradeExecutor private TradeAccountEvent processTrade(TradeAccount tradeAccount, double amount, TradeType tradeType) { Date executionTime = new Date(); String message = String.format("Processed trade for %s of amount %n type %s @ %s", tradeAccount, amount, tradeType, executionTime); TradeAccountEvent tradeAccountEvent; if (tradeType.equals(TradeType.BUY)) { tradeAccountEvent = new BuyEvent(tradeAccount, amount, executionTime); } else { tradeAccountEvent = new SellEvent(tradeAccount, amount, executionTime); } System.out.println(message); return tradeAccountEvent; } }这里我们创建了和SimpleTradeExecutor功能相似的类:BuySellTradeExecutor,只不过BuySellTradeExecutor根据交易类型创建了不同的事件,BuyEvent和SellEvent。 我们发布了不同的事件,注册了不通的订阅者,但是EventBus对这样的改变没有感知。 为了接受这两个事件,我们不需要创建两个类,我们只需要像如下这种方式就可以:
public class AllTradesAuditor { private List<BuyEvent> buyEvents = Lists.newArrayList(); private List<SellEvent> sellEvents = Lists.newArrayList(); public AllTradesAuditor(EventBus eventBus) { eventBus.register(this); } @Subscribe public void auditSell(SellEvent sellEvent){ sellEvents.add(sellEvent); System.out.println("Received TradeSellEvent "+sellEvent); } @Subscribe public void auditBuy(BuyEvent buyEvent){ buyEvents.add(buyEvent); System.out.println("Received TradeBuyEvent "+buyEvent); } }上面我们创建一个实例,有两个事件处理方法,这样AllTradeAuditor会接受所有的Trade事件。 哪个方法被调用取决于EventBus发送什么类型的事件。 为了验证一下,我们可以写一个方法接受Object类型的参数,这样就可以收到所有的事件。下面我们来考虑一下我们有多个EventBus实例。 如果我们把BuySellTradeExecutor类拆分成两个类,这样我们就可以注入两个不同的EventBus实例,但是在订阅类中就要注意注入的是哪个类了。 关于这个例子我们在这里不讨论了,代码可以见
bbejeck.guava.chapter7.config 包。
我们订阅事件的时候,肯定也会想到在某个时间点我们需要取消订阅某个事件。 取消事件订阅只需要调用eventBus.unregister方法。 如果我们知道我们在某个时刻想要停止对某个事件的处理,我们可以按照如下的方式处理:
public void unregister(){ this.eventBus.unregister(this); }一旦上面的方法被调用,就不在会收到任何事件,其他的没有取消订阅的会继续收到事件.
我们之前一直强调事件处理器的处理逻辑要简单。 因为EventBus是顺序的处理每一个事件的。 我们还有另外一种处理方式: AsyncEventBus. AsyncEventBus提供了和EventBus相同的功能。只是在处理事件的时候采用了Java.util.concurrent.Executor 调用事件处理器。
创建AsyncEvent和创建一个EventBus差不多:
AsyncEventBus asyncEventBus = new AsyncEventBus(executorService);我们创建一个传入ExecutorService实例的AsyncEvent实例。 我们还有一个接受两个参数的构造函数,接受另外一个string表明ExecutorService的身份。 AysncEventBus在事件处理器需要花费时间比较长的场景下比较适合。
当一个事件没有监听者,我们就会将这样的事件包装成DeadEvent,这样有一个方法监听DeadEvent我们就可以知道哪些事件没有监听者。
public class DeadEventSubscriber { private static final Logger logger = Logger.getLogger(DeadEventSubscriber.class); public DeadEventSubscriber(EventBus eventBus) { eventBus.register(this); } @Subscribe public void handleUnsubscribedEvent(DeadEvent deadEvent){ logger.warn("No subscribers for "+deadEvent.getEvent()); } }上面的例子中我们简单的注册了一个监听DeadEvent的监听者,简单的记录了没有被监听的事件。
为了确保我们注册的监听者和发布者是同一个EventBus实例,我们使用Spring来实现EventBus的注入。 在下面的例子中,我们将展示怎样使用Spring框架去配置SimpleTradeAuditor 和 SimpleTradeExecutor类。首先我们需要对SimpleTradeAuditor和SimpleTradeExecutor类做如下的改变:
@Component public class SimpleTradeExecutor { private EventBus eventBus; @Autowired public SimpleTradeExecutor(EventBus eventBus) { this.eventBus = checkNotNull(eventBus, "EventBus can't be null"); } @Component public class SimpleTradeAuditor { private List<TradeAccountEvent> tradeEvents = Lists.newArrayList(); @Autowired public SimpleTradeAuditor(EventBus eventBus){ checkNotNull(eventBus,"EventBus can't be null"); eventBus.register(this); }我们首先在两个类上加上了@Component注解。 这样可以让Spring把这两个类当作可以注入的bean。这里我们使用的构造方法注入。所以我们加上了@Autowired注解。 加上了@Autowired注解spring就可以帮助我们注入EventBus类。
@Configuration @ComponentScan(basePackages = {"bbejeck.guava.chapter7.publisher", "bbejeck.guava.chapter7.subscriber"}) public class EventBusConfig { @Bean public EventBus eventBus() { return new EventBus(); } }这里我们的类上加上了@Configuration注解,这样spring就把这个类当作Context,在这个类中我们返回了EventBus实例。这样spring就对上面的两个类注入了同一个EventBus,这正是我们想要的,至于spring是怎么做到的,不在本书考虑的范围。
在这一章,我们讲解了怎样使用Guava进行事件驱动编程,来降低模块之间的耦合,我们讲解了怎么创建EventBus实例,并且怎样注册监听者和发布者。 并且我们也剖析了怎样更具事件类型去监听事件。 最后我们还学习了使用AsyncEventBus类,可以让我们异步的发送事件。我们也学习了怎样使用DeadEvent类去确保我们监听了所有的事件。 最后,我们还学习了使用依赖注入来使得我们可以更容易创建基于事件的系统。
