因为从broker也需要提供拉取消息的功能,所以需要把主broker的topic信息,订阅组信息,消息消费偏移量信息复制过来,当broker启动后会执行定时程序,一分钟同步一次
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask syncAll slave exception", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); public void syncAll() { this.syncTopicConfig(); this.syncConsumerOffset(); this.syncDelayOffset(); this.syncSubscriptionGroupConfig(); }同步topic配置信息,当两边的版本号不一致则替换为主broker的
private void syncTopicConfig() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { TopicConfigSerializeWrapper topicWrapper = this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak); if (!this.brokerController.getTopicConfigManager().getDataVersion() .equals(topicWrapper.getDataVersion())) { this.brokerController.getTopicConfigManager().getDataVersion() .assignNewOne(topicWrapper.getDataVersion()); this.brokerController.getTopicConfigManager().getTopicConfigTable().clear(); this.brokerController.getTopicConfigManager().getTopicConfigTable() .putAll(topicWrapper.getTopicConfigTable()); this.brokerController.getTopicConfigManager().persist(); log.info("Update slave topic config from master, {}", masterAddrBak); } } catch (Exception e) { log.error("SyncTopicConfig Exception, {}", masterAddrBak, e); } } }请求码为GET_ALL_TOPIC_CONFIG = 21
public TopicConfigSerializeWrapper getAllTopicConfig( final String addr) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_TOPIC_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(true, addr), request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return TopicConfigSerializeWrapper.decode(response.getBody(), TopicConfigSerializeWrapper.class); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }主broker打包信息返回
private RemotingCommand getAllTopicConfig(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(GetAllTopicConfigResponseHeader.class); // final GetAllTopicConfigResponseHeader responseHeader = // (GetAllTopicConfigResponseHeader) response.readCustomHeader(); String content = this.brokerController.getTopicConfigManager().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No topic in this broker, client: {}", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No topic in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }同步消费偏移量
private void syncConsumerOffset() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { ConsumerOffsetSerializeWrapper offsetWrapper = this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak); this.brokerController.getConsumerOffsetManager().getOffsetTable() .putAll(offsetWrapper.getOffsetTable()); this.brokerController.getConsumerOffsetManager().persist(); log.info("Update slave consumer offset from master, {}", masterAddrBak); } catch (Exception e) { log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e); } } }请求码为GET_ALL_CONSUMER_OFFSET = 43
public ConsumerOffsetSerializeWrapper getAllConsumerOffset( final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_CONSUMER_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return ConsumerOffsetSerializeWrapper.decode(response.getBody(), ConsumerOffsetSerializeWrapper.class); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }打包返回存储并持久化
private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getConsumerOffsetManager().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("get all consumer offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No consumer offset in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }同步延迟消息信息
private void syncDelayOffset() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { String delayOffset = this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak); if (delayOffset != null) { String fileName = StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController .getMessageStoreConfig().getStorePathRootDir()); try { MixAll.string2File(delayOffset, fileName); } catch (IOException e) { log.error("Persist file Exception, {}", fileName, e); } } log.info("Update slave delay offset from master, {}", masterAddrBak); } catch (Exception e) { log.error("SyncDelayOffset Exception, {}", masterAddrBak, e); } } }请求码为GET_ALL_DELAY_OFFSET = 45
public String getAllDelayOffset( final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException, UnsupportedEncodingException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_DELAY_OFFSET, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return new String(response.getBody(), MixAll.DEFAULT_CHARSET); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }打包返回直接持久化
private RemotingCommand getAllDelayOffset(ChannelHandlerContext ctx, RemotingCommand request) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); if (!(this.brokerController.getMessageStore() instanceof DefaultMessageStore)) { log.error("Delay offset not supported in this messagetore, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("Delay offset not supported in this messagetore"); return response; } String content = ((DefaultMessageStore) this.brokerController.getMessageStore()).getScheduleMessageService().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("Get all delay offset from master error.", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No delay offset in this broker, client: {} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No delay offset in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }同步订阅组配置信息
private void syncSubscriptionGroupConfig() { String masterAddrBak = this.masterAddr; if (masterAddrBak != null) { try { SubscriptionGroupWrapper subscriptionWrapper = this.brokerController.getBrokerOuterAPI() .getAllSubscriptionGroupConfig(masterAddrBak); if (!this.brokerController.getSubscriptionGroupManager().getDataVersion() .equals(subscriptionWrapper.getDataVersion())) { SubscriptionGroupManager subscriptionGroupManager = this.brokerController.getSubscriptionGroupManager(); subscriptionGroupManager.getDataVersion().assignNewOne( subscriptionWrapper.getDataVersion()); subscriptionGroupManager.getSubscriptionGroupTable().clear(); subscriptionGroupManager.getSubscriptionGroupTable().putAll( subscriptionWrapper.getSubscriptionGroupTable()); subscriptionGroupManager.persist(); log.info("Update slave Subscription Group from master, {}", masterAddrBak); } } catch (Exception e) { log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e); } } }请求码为GET_ALL_SUBSCRIPTIONGROUP_CONFIG = 201
public SubscriptionGroupWrapper getAllSubscriptionGroupConfig( final String addr) throws InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException, MQBrokerException { RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ALL_SUBSCRIPTIONGROUP_CONFIG, null); RemotingCommand response = this.remotingClient.invokeSync(addr, request, 3000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { return SubscriptionGroupWrapper.decode(response.getBody(), SubscriptionGroupWrapper.class); } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }打包返回存储并持久化
private RemotingCommand getAllSubscriptionGroup(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); String content = this.brokerController.getSubscriptionGroupManager().encode(); if (content != null && content.length() > 0) { try { response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET)); } catch (UnsupportedEncodingException e) { log.error("", e); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("UnsupportedEncodingException " + e); return response; } } else { log.error("No subscription group in this broker, client:{} ", ctx.channel().remoteAddress()); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("No subscription group in this broker"); return response; } response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }