当生产者发送事务消息后没有结束(提交或回滚)该事务,或者事务状态为未知时,就需要依靠 Broker 的事务消息检查线程来回查消息状态。
// Defaults for the transactional check-back. The units are presumably milliseconds — confirm
// against the configuration class these fields belong to.
// NOTE(review): onWaitEnd() below does not read these fields directly; it goes through
// brokerController.getBrokerConfig() getters.
private long transactionCheckInterval = 60 * 1000;
private long transactionTimeOut = 6 * 1000;
private int transactionCheckMax = 15;

/**
 * Runs one round of the prepare (half) message check.
 *
 * <p>Reads the transaction timeout and the maximum check count from the broker configuration,
 * delegates to the transactional message service, and logs the start time and the elapsed
 * time of the round.
 */
protected void onWaitEnd() {
    final long checkTimeout = this.brokerController.getBrokerConfig().getTransactionTimeOut();
    final int maxCheckTimes = this.brokerController.getBrokerConfig().getTransactionCheckMax();

    final long startedAt = System.currentTimeMillis();
    log.info("Begin to check prepare message, begin time:{}", startedAt);

    this.brokerController.getTransactionalMessageService().check(
        checkTimeout,
        maxCheckTimes,
        this.brokerController.getTransactionalMessageCheckListener());

    final long elapsed = System.currentTimeMillis() - startedAt;
    log.info("End to check prepare message, consumed time:{}", elapsed);
}

获取准备topic的消息队列
String topic = MixAll.RMQ_SYS_TRANS_HALF_TOPIC;

/**
 * Builds the set of message queues of {@code topic} that live on this broker.
 *
 * <p>One {@link MessageQueue} is produced per read queue of the topic configuration, with
 * queue ids {@code 0 .. readQueueNums - 1} and this broker's name.
 *
 * @param topic the topic whose queues are enumerated
 * @return the queues of the topic; empty when no topic configuration is available or the
 *         configuration has no read queues
 */
public Set<MessageQueue> fetchMessageQueues(String topic) {
    final Set<MessageQueue> queues = new HashSet<>();

    final TopicConfig config = selectTopicConfig(topic);
    if (config == null) {
        return queues;
    }

    // A non-positive read-queue count simply yields zero iterations.
    int queueId = 0;
    while (queueId < config.getReadQueueNums()) {
        queues.add(new MessageQueue(topic, brokerController.getBrokerConfig().getBrokerName(), queueId));
        queueId++;
    }
    return queues;
}

没有时需要创建,默认只有一个队列
/**
 * Looks up the configuration of {@code topic}, creating it when it does not exist yet.
 *
 * @param topic the topic to look up
 * @return the existing configuration, or whatever
 *         {@code createTopicInSendMessageBackMethod} returns for a freshly created
 *         readable and writable topic
 */
private TopicConfig selectTopicConfig(String topic) {
    final TopicConfig existing = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);
    if (existing != null) {
        return existing;
    }

    // The literal 1 is presumably the queue count and the trailing 0 a system flag — confirm
    // against the topic config manager's signature.
    final int readWritePerm = PermName.PERM_WRITE | PermName.PERM_READ;
    return this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        topic, 1, readWritePerm, 0);
}

针对每个队列进行检查,获取准备队列对应的操作队列,没有时创建
/**
 * Returns the op queue paired with the given half-message queue.
 *
 * <p>The pairing is cached in {@code opQueueMap}. On a miss, a queue on the op topic with the
 * same broker name and queue id is created and stored.
 *
 * @param messageQueue the half-message queue
 * @return the cached or newly created op queue
 */
private MessageQueue getOpQueue(MessageQueue messageQueue) {
    final MessageQueue cached = opQueueMap.get(messageQueue);
    if (cached != null) {
        return cached;
    }

    final MessageQueue created = new MessageQueue(
        TransactionalMessageUtil.buildOpTopic(),
        messageQueue.getBrokerName(),
        messageQueue.getQueueId());
    opQueueMap.put(messageQueue, created);
    return created;
}

获取准备队列以及操作队列的提交偏移量或者队列的最小偏移量
/**
 * Returns the offset from which the transactional consumer group should read {@code mq}.
 *
 * @param mq the queue to query
 * @return the offset recorded for the transactional consumer group, or the store's minimum
 *         offset of the queue when the recorded offset is {@code -1}
 */
public long fetchConsumeOffset(MessageQueue mq) {
    final long recorded = brokerController.getConsumerOffsetManager().queryOffset(
        TransactionalMessageUtil.buildConsumerGroup(), mq.getTopic(), mq.getQueueId());

    // Only the exact value -1 triggers the fallback; other values are returned unchanged.
    return recorded != -1
        ? recorded
        : store.getMinOffsetInQueue(mq.getTopic(), mq.getQueueId());
}

/**
 * Returns the consumer group name used for transactional message bookkeeping.
 *
 * @return {@code MixAll.CID_SYS_RMQ_TRANS}
 */
public static String buildConsumerGroup() {
    return MixAll.CID_SYS_RMQ_TRANS;
}

判断两个偏移量是否合法
// Read the current offsets of the half queue and its op queue.
long halfOffset = transactionalMessageBridge.fetchConsumeOffset(messageQueue);
long opOffset = transactionalMessageBridge.fetchConsumeOffset(opQueue);
log.info("Before check, the queue={} msgOffset={} opOffset={}", messageQueue, halfOffset, opOffset);

// If either offset is negative, the smaller of the two is negative: skip this queue.
if (Math.min(halfOffset, opOffset) < 0) {
    log.error("MessageQueue: {} illegal offset read: {}, op offset: {},skip this queue", messageQueue,
        halfOffset, opOffset);
    continue;
}

根据队列偏移量获取操作消息
/**
 * Pulls op messages for the queue id of {@code mq} through the transactional message bridge.
 *
 * @param mq     the op queue; only its queue id is used
 * @param offset the offset to start pulling from
 * @param nums   the number of messages requested
 * @return the bridge's pull result, passed through unchanged
 */
private PullResult pullOpMsg(MessageQueue mq, long offset, int nums) {
    final int opQueueId = mq.getQueueId();
    return transactionalMessageBridge.getOpMessage(opQueueId, offset, nums);
}

每次最多获取32个消息
public PullResult getOpMessage(int queueId, long offset, int nums) { String group = TransactionalMessageUtil.buildConsumerGroup(); String topic = TransactionalMessageUtil.buildOpTopic(); SubscriptionData sub = new SubscriptionData(topic, "*"); return getMessage(group, topic, queueId, offset, nums, sub); } private PullResult getMessage(String group, String topic, int queueId, long offset, int nums, SubscriptionData sub) { GetMessageResult getMessageResult = store.getMessage(group, topic, queueId, offset, nums, null); if (getMessageResult != null) { PullStatus pullStatus = PullStatus.NO_NEW_MSG; List<MessageExt> foundList = null; switch (getMessageResult.getStatus()) { case FOUND: pullStatus = PullStatus.FOUND; foundList = decodeMsgList(getMessageResult); this.brokerController.getBrokerStatsManager().incGroupGetNums(group, topic, getMessageResult.getMessageCount()); this.brokerController.getBrokerStatsManager().incGroupGetSize(group, topic, getMessageResult.getBufferTotalSize()); this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount()); this.brokerController.getBrokerStatsManager().recordDiskFallBehindTime(group, topic, queueId, this.brokerController.getMessageStore().now() - foundList.get(foundList.size() - 1) .getStoreTimestamp()); break; case NO_MATCHED_MESSAGE: pullStatus = PullStatus.NO_MATCHED_MSG; LOGGER.warn("No matched message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", getMessageResult.getStatus(), topic, group, offset); break; case NO_MESSAGE_IN_QUEUE: pullStatus = PullStatus.NO_NEW_MSG; LOGGER.warn("No new message. GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", getMessageResult.getStatus(), topic, group, offset); break; case MESSAGE_WAS_REMOVING: case NO_MATCHED_LOGIC_QUEUE: case OFFSET_FOUND_NULL: case OFFSET_OVERFLOW_BADLY: case OFFSET_OVERFLOW_ONE: case OFFSET_TOO_SMALL: pullStatus = PullStatus.OFFSET_ILLEGAL; LOGGER.warn("Offset illegal. 
GetMessageStatus={}, topic={}, groupId={}, requestOffset={}", getMessageResult.getStatus(), topic, group, offset); break; default: assert false; break; } return new PullResult(pullStatus, getMessageResult.getNextBeginOffset(), getMessageResult.getMinOffset(), getMessageResult.getMaxOffset(), foundList); } else { LOGGER.error("Get message from store return null. topic={}, groupId={}, requestOffset={}", topic, group, offset); return null; } }如果有消息的话就解码具体消息,没消息的话返回无新消息
/**
 * Decodes every buffer of a store read into a {@link MessageExt}.
 *
 * <p>Buffers for which the decoder yields {@code null} are skipped, so the returned list never
 * contains {@code null} elements. The store result is always released, even when decoding
 * throws.
 *
 * @param getMessageResult the store read result holding the raw message buffers
 * @return the decoded messages in buffer order; possibly empty, never {@code null}
 */
private List<MessageExt> decodeMsgList(GetMessageResult getMessageResult) {
    List<MessageExt> foundList = new ArrayList<>();
    try {
        List<ByteBuffer> messageBufferList = getMessageResult.getMessageBufferList();
        for (ByteBuffer bb : messageBufferList) {
            MessageExt msgExt = MessageDecoder.decode(bb);
            // Presumably the decoder returns null for a buffer it cannot parse (confirm
            // against MessageDecoder). Dropping such entries keeps callers that dereference
            // every element from failing.
            if (msgExt != null) {
                foundList.add(msgExt);
            }
        }
    } finally {
        getMessageResult.release();
    }
    return foundList;
}

当有消息时获取消息内容解析出具体准备消息的队列偏移量,与准备队列的偏移量进行比较,小于的话说明该准备消息已经操作完成,放进操作下标完成集合中,否则的话就放进准备队列下标与操作队列下标的map集合中。
/**
 * Pulls up to 32 op messages starting at {@code pullOffsetOfOp} and sorts them into the two
 * bookkeeping collections.
 *
 * <p>Each op message body is parsed as the queue offset of a half message. For messages
 * tagged {@code TransactionalMessageUtil.REMOVETAG}:
 * <ul>
 *   <li>if that half offset is below {@code miniOffset}, the op message's own queue offset is
 *       appended to {@code doneOpOffset};</li>
 *   <li>otherwise {@code removeMap} gets an entry half offset → op queue offset.</li>
 * </ul>
 * Messages with any other tag are logged as errors and ignored.
 *
 * @param removeMap      receives half offset → op offset entries
 * @param opQueue        the op queue to pull from
 * @param pullOffsetOfOp the op queue offset to start pulling from
 * @param miniOffset     the threshold compared against each parsed half offset
 * @param doneOpOffset   receives op offsets whose half offset is below {@code miniOffset}
 * @return the pull result, or {@code null} when the pull itself returned {@code null}
 */
private PullResult fillOpRemoveMap(HashMap<Long, Long> removeMap, MessageQueue opQueue, long pullOffsetOfOp,
    long miniOffset, List<Long> doneOpOffset) {
    final PullResult pullResult = pullOpMsg(opQueue, pullOffsetOfOp, 32);
    if (pullResult == null) {
        return null;
    }

    final PullStatus status = pullResult.getPullStatus();
    if (status == PullStatus.OFFSET_ILLEGAL || status == PullStatus.NO_MATCHED_MSG) {
        log.warn("The miss op offset={} in queue={} is illegal, pullResult={}", pullOffsetOfOp, opQueue,
            pullResult);
        // Move the op queue's consume offset to the next offset reported by the pull.
        transactionalMessageBridge.updateConsumeOffset(opQueue, pullResult.getNextBeginOffset());
        return pullResult;
    }
    if (status == PullStatus.NO_NEW_MSG) {
        log.warn("The miss op offset={} in queue={} is NO_NEW_MSG, pullResult={}", pullOffsetOfOp, opQueue,
            pullResult);
        return pullResult;
    }

    final List<MessageExt> opMessages = pullResult.getMsgFoundList();
    if (opMessages == null) {
        log.warn("The miss op offset={} in queue={} is empty, pullResult={}", pullOffsetOfOp, opQueue,
            pullResult);
        return pullResult;
    }

    for (MessageExt opMessage : opMessages) {
        final Long halfQueueOffset = getLong(new String(opMessage.getBody(), TransactionalMessageUtil.charset));
        log.info("Topic: {} tags: {}, OpOffset: {}, HalfOffset: {}", opMessage.getTopic(),
            opMessage.getTags(), opMessage.getQueueOffset(), halfQueueOffset);

        if (!TransactionalMessageUtil.REMOVETAG.equals(opMessage.getTags())) {
            log.error("Found a illegal tag in opMessageExt= {} ", opMessage);
            continue;
        }

        if (halfQueueOffset < miniOffset) {
            doneOpOffset.add(opMessage.getQueueOffset());
        } else {
            removeMap.put(halfQueueOffset, opMessage.getQueueOffset());
        }
    }

    log.debug("Remove map: {}", removeMap);
    log.debug("Done op list: {}", doneOpOffset);
    return pullResult;
}

判断单个准备消息队列是否已经执行了60s,超过时间则执行下一个,判断该准备队列下标是否已经有对应的操作消息队列下标,有的话说明该准备消息已经得到确认
// Scan of one half-message queue, starting at halfOffset.
//   i                   - the half-queue offset currently being examined
//   newOffset           - set to i + 1 whenever i advances past an entry, or to the next begin
//                         offset when the loop jumps over an illegal offset
//   getMessageNullCount - counts reads that returned no message, to bound the retries
// single thread
int getMessageNullCount = 1;
long newOffset = halfOffset;
long i = halfOffset;
while (true) {
    // Stop working on this queue once the processing time limit has been exceeded.
    if (System.currentTimeMillis() - startTime > MAX_PROCESS_TIME_LIMIT) {
        log.info("Queue={} process time reach max={}", messageQueue, MAX_PROCESS_TIME_LIMIT);
        break;
    }
    if (removeMap.containsKey(i)) {
        // removeMap already has an entry for this half offset: drop the entry and fall
        // through to the common "advance by one" step at the bottom of the loop.
        log.info("Half offset {} has been committed/rolled back", i);
        removeMap.remove(i);
    } else {
        GetResult getResult = getHalfMsg(messageQueue, i);
        MessageExt msgExt = getResult.getMsg();
        if (msgExt == null) {
            // No message at this offset: give up after too many empty reads.
            if (getMessageNullCount++ > MAX_RETRY_COUNT_WHEN_HALF_NULL) {
                break;
            }
            if (getResult.getPullResult().getPullStatus() == PullStatus.NO_NEW_MSG) {
                log.debug("No new msg, the miss offset={} in={}, continue check={}, pull result={}", i,
                    messageQueue, getMessageNullCount, getResult.getPullResult());
                break;
            } else {
                // Any other status: jump to the next begin offset reported by the pull and retry.
                log.info("Illegal offset, the miss offset={} in={}, continue check={}, pull result={}",
                    i, messageQueue, getMessageNullCount, getResult.getPullResult());
                i = getResult.getPullResult().getNextBeginOffset();
                newOffset = i;
                continue;
            }
        }

        // Messages that must be discarded or skipped are handed to the listener and passed over.
        if (needDiscard(msgExt, transactionCheckMax) || needSkip(msgExt)) {
            listener.resolveDiscardMsg(msgExt);
            newOffset = i + 1;
            i++;
            continue;
        }
        // A message stored at or after startTime ends the scan of this queue for this round.
        if (msgExt.getStoreTimestamp() >= startTime) {
            log.debug("Fresh stored. the miss offset={}, check it later, store={}", i,
                new Date(msgExt.getStoreTimestamp()));
            break;
        }

        // Time elapsed since the message's born timestamp; negative when the born timestamp
        // is ahead of this machine's clock.
        long valueOfCurrentMinusBorn = System.currentTimeMillis() - msgExt.getBornTimestamp();
        // The immunity window defaults to transactionTimeout and may be overridden per message
        // through the PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS user property.
        long checkImmunityTime = transactionTimeout;
        String checkImmunityTimeStr = msgExt.getUserProperty(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
        if (null != checkImmunityTimeStr) {
            checkImmunityTime = getImmunityTime(checkImmunityTimeStr, transactionTimeout);
            if (valueOfCurrentMinusBorn < checkImmunityTime) {
                // Still inside the per-message window: advance only when
                // checkPrepareQueueOffset returns true; otherwise fall through to the
                // isNeedCheck decision below.
                if (checkPrepareQueueOffset(removeMap, doneOpOffset, msgExt)) {
                    newOffset = i + 1;
                    i++;
                    continue;
                }
            }
        } else {
            // No per-message window: a message younger than the default window ends the scan.
            if ((0 <= valueOfCurrentMinusBorn) && (valueOfCurrentMinusBorn < checkImmunityTime)) {
                log.debug("New arrived, the miss offset={}, check it later checkImmunity={}, born={}", i,
                    checkImmunityTime, new Date(msgExt.getBornTimestamp()));
                break;
            }
        }

        List<MessageExt> opMsg = pullResult.getMsgFoundList();
        // A check-back is needed when any of the following holds:
        //   1. no op messages were pulled and the message is older than its immunity window;
        //   2. op messages were pulled and the last one was born more than transactionTimeout
        //      after startTime;
        //   3. the elapsed time is negative (born timestamp ahead of the local clock).
        // NOTE(review): opMsg.get(opMsg.size() - 1) would throw on an empty, non-null list —
        // confirm the pull never yields one.
        boolean isNeedCheck = (opMsg == null && valueOfCurrentMinusBorn > checkImmunityTime)
            || (opMsg != null && (opMsg.get(opMsg.size() - 1).getBornTimestamp() - startTime > transactionTimeout))
            || (valueOfCurrentMinusBorn <= -1);

        if (isNeedCheck) {
            // If the put-back fails, retry the same offset on the next iteration (i is unchanged).
            if (!putBackHalfMsgQueue(msgExt, i)) {
                continue;
            }
            listener.resolveHalfMsg(msgExt);
        } else {
            // Not yet decidable: pull the next batch of op messages and re-examine the same offset.
            pullResult = fillOpRemoveMap(removeMap, opQueue, pullResult.getNextBeginOffset(), halfOffset, doneOpOffset);
            log.info("The miss offset:{} in messageQueue:{} need to get more opMsg, result is:{}", i,
                messageQueue, pullResult);
            continue;
        }
    }
    // Common path: this offset has been handled, move on to the next one.
    newOffset = i + 1;
    i++;
}