https://github.com/carnellj/spmia-chapter8
Spring Cloud Stream架构
随着Spring Cloud中消息的发布和消费,有4个组件涉及发布消息和消费消息:
发射器:当一个服务准备发布消息时,它将使用一个发射器发布消息。发射器是一个Spring注解接口,它接收一个普通的Java对象(POJO),该对象代表要发布的消息。发射器接收消息,然后序列化它(默认的序列化是JSON)并将消息发布到通道
通道:通道是对队列的一个抽象,它将在消息生产者发布消息或消息消费者消费消息后保留该消息。通道名称始终与目标队列名称相关联。队列名称永远不会直接公开给代码,通道名称会在代码中使用。这意味着开发人员可以通过更改应用程序的配置而不是应用程序的代码来切换通道读取或写入的队列
绑定器:它是与特定消息平台对话的Spring代码。允许开发人员处理消息,而不必依赖于特定于平台的库和API来发布和消费消息
接收器:服务通过一个接收器从队列中接收消息。接收器监听传入消息的通道,并将消息反序列为话POJO
消息处理流程
消息生产者
1、依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> @SpringBootApplication @EnableEurekaClient @EnableCurcuitBreakder // 告诉spring cloud stream将应用程序绑定到消息代理 // Source.class告诉Spring Cloud Stream,该服务将通过在Source类上定义的一组通道与消息代理进行通信 // 记住,通道位于消息队列之上 @EnableBinding(Source.class) public class Application { @Bean public Filter userContextFilter() { return new UserContextFilter(); } public static void main(String[] args) { SpringApplication.run(Application.class, args); } }2、配置
spring: application: name: organizationservice cloud: stream: bindings: output: #消息队列(或主题)的名称 destination: orgChangeTopic #向Spring Cloud Stream提供将要发送和接收什么类型的消息 content-type: application/json #告诉Spring将使用kafka作为服务中的消息总线 kafka: binder: #zkNodes和brokers属性告诉Spring Cloud Stream,Kafka和Zookeeper的网络位置 zkNodes: localhost brokers: localhost3、发布消息
@Component public class SimpleSourceBean { private Source source; private static final Logger = LoggerFactory.getLogger(SimpleSourceBean.class); // 注入Source接口 @Autowired public SimpleSourceBean(Source source) { this.source = source; } // 发布消息 public void publishOrgChange(String action, String orgId) { logger.debug("Sending kafka messge {} for Organization Id:{}", action, orgId); // 消息POJO OrganizationChangeModel change = new OrganizationChangeModel( OrganizationModel.class.getTypeName(), action, orgId, UserContext.getCorrelationId()); // 使用Source定义的通道发送消息 // output()方法返回一个MessageChannel类型的类。MessageChannel代表了如何将消息发送给消息代理 source.output().send(MessageBuilder.withPayload(change).build()); } } @Service public class OrganizationService { @Autowired private OrganizationRepository orgRepository; @Autowired SimpleSourceBean simpleSourceBean; public void saveOrg(Organization org) { org.setId(UUID.randomUUID().toString()); orgRepository.save(org); simpleSourceBean.publishOrgChange("SAVE", org.getId()); // 发送消息 } }消息消费者
1、依赖
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency> // 告诉服务使用Sink接口中定义的通道来监听传入的消息 @EnableBinding(Sink.class) public class Application { // 每次收到来自input通道的消息时,Spring Cloud Stream将执行此方法 // Spring Cloud Stream在Sink接口上公开了一个默认的通道,名为input,它用于监听通道上的传入消息 // @StreamListener注解告诉Spring Cloud Stream,每次从input通道接收消息,就会执行loggerSink()方法。 // Spring Cloud Stream将自动把从通道中传出的消息反序列化为一个名为OrganizationChangeModel的POJO @StreamListener(Sink.INPUT) public void loggerSink(OrganizationChangeModel orgChange) { logger.debug("Received an event for organization id {}", orgChange.getOrganizationId()); } }2、配置
spring: application: name: licensingservice cloud: stream: bindings: #spring.cloud.stream.bindings.input属性将input通道映射到orgChangeTopic队列 input: destination: orgChangeTopic content-type: application/json #该group属性只用于保证服务只处理一次 #group属性定义将要消息消息的消费者组的名称 group: licensingGroup kafka: binder: zkNodes: localhost brokers: localhost消费者组的概念:开发人员可能拥有多个服务,每个服务都有多个实例监听同一个消息队列,但是只需要服务实例组中的一个服务实例来消费和处理消息。group属性标识服务所属的消费者组。只要服务实例具有相同的组名,Spring Cloud Stream和底层消息代理将保证,只有消息的一个副本会被属于该组的服务实例所使用。
分布式缓存Redis
1、依赖
<dependency> <groupId>org.springframework.data</groupId> <artifactId>spring-data-redis</artifactId> <version>1.7.4.RELEASE</version> </dependency> <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> <version>2.9.0</version> </dependency> <!-- 用作redis数据库 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-pool2</artifactId> <version>2.0</version> </dependency>2、构造一个到Redis服务器的数据库连接
@SpringBootApplication @EnableEurekaClient @EnableCurcuitBreaker @EnableBinding(Sink.class) public class Application { @Autowired private ServiceConfig serviceConfig // jedisConnectionFactory设置到Redis服务器的实际数据库连接 @Bean public JedisConnectionFactory jedisConnectionFactory() { JedisConnectionFactory jedisConnFactory = new JedisConnectionFactory(); jedisConnFactory.setHostName(serviceConfig.getRedisServer()); jedisConnFactory.setPort(serviceConfig.getRedisPort()); return jedisConnFactory; } // RedisTemplate用于对Redis服务器执行操作 @Bean public RedisTemplate<String, Object> redisTemplate() { RedisTemplate<String, Object> template = new RedisTemplate<String, Object>(); template.setConnectionFactory(jedisConnectionFactory()); return template; } }3、定义Spring Data Redis存储库
Redis是一个键值数据存储,它的作用类似于一个大的、分布式的、内存中的HashMap。在最简单的情况下,它存储数据并按键查找数据。
public interfact OrganizationRedisRepository { void saveOrganization(Organization org); void updateOrganization(Organization org); void deleteOrganization(String organizationId); Organization findOrganization(String organizationId); } @Repository public class OrganizationRedisRepositorImpl implements OrganizationRedisRepository { private static final String HASH_NAME = "organization"; // 在Redis服务器中存储的组织数据的散列名称 private RedisTemplate<String, Organization> redisTemplate; private HashOperations hashOperations; // HashOperations类包含操作Redis服务器上执行数据操作的辅助方法 public OrganizationRedisRepositoryImpl() { super(); } @Autowired private OrganizationRedisRepositoryImpl(RedisTemplate redisTemplate) { this.redisTemplate = redisTemplate; } @PostConstruct private void init() { hashOperations = redisTemplate.opsForHash(); } @Override public void saveOrganization(Organization org) { hashOperations.put(HASH_NAME, org.getId(), org); } @Override public void updateOrganization(Organization org) { hashOperations.put(HASH_NAME, org.getId(), org); } @Override public void deleteOrganization(String organizationId) { hashOperations.delete(HASH_NAME, organizationId); } @Override public Organization findOrganization(String organizationId) { return (Organization) hashOperations.get(HASH_NAME, organizationId); } }4、使用Redis存储和读取数据
@Component public class OrganizationRestTemplateClient { @Autowired RestTemplate restTemplate; @Autowired OrganizationRedisRepository orgRedisRepo; private static final Logger logger = LoggerFactory.getLogger(OrganizationRestTemplateClient.class); private Organization checkRedisCache(String organizationId) { try { return orgRedisRepo.findOrganization(organizationId); } catch (Exception ex) { logger.error("Error encountered while trying to retrieve organization {} check Redis Cache.Exception {}", organizationId, ex); return null; } } private void cacheOrganizationObject(Organization org) { try { orgRedisRepo.saveOrganization(org); } catch (Exception ex) { logger.error("Unable to cache organization {} in Redis.Exception {}", org.getId(), ex); } } public Organization getOrganization(String organizationId) { logger.debug("In Licensing Service.getOrganization: {}", UserContext.getCorrelationId()); Organization org = checkRedisCache(organizationId); if (org != null) { logger.debug("I have successfully retrieved an organization {} from the redis cache: {}", organizationId, org); return org; } logger.debug("Unable to locate organization from teh redis cache:{}", organizationId); // 在与缓存进行交互时,要特别注意异常处理。为了提高弹性,如果无法与Redi服务器通信, // 我们绝对不会让整个调用失败。相反,我们会记录异常,并让调用转到组织服务 ResponseEntity<Organization> restExchange = restTemplate.exchange( "http://zuulservice/api/organization/v1/organizations/{organizationId}", HttpMethod.GET, null, Organization.class, organizatioId ); org = restExchange.getBody(); if (org != null) { cacheOrganizationObject(org); } return org; } }定义自定义通道
1、自定义input通道
public interface CustomChannles { // @Input是方法级别的注解,它定义了通道的名称 // 通过@Input注解公开的每个通道必须返回SubscribableChannel类 @Input("inboundOrgChanges") SubscribableChannel orgs(); } public interface CustomChannels { // 定义output通道,必须返回MessageChannel类 @Output("outboundOrg") MessageChannel outboundOrg(); } spring: cloud: stream: bindings: #使用自定义的input通道 inboundOrgChanges: destination: orgChangeTopic content-type: application/json group: licensingGroup // 缓存管理类 @EnableBinding(CustomChannels.class) // 使用自定义的通道class public class OrganizationChangeHandler { @Autowired private OrganizationRedisRepository organizationRedisRepository; @StreamListener("inboundOrgChanges") // 监听自定义的input通道 public void loggerSink(OrganizationChangeMode orgChange) { switch(orgChange.getAction()) { case "UPDATE": case "DELETE" organizationRedisRepository.deleteOrganization(orgChange.getOrganizationId(); break; } } }