否则的话获取准备消息
/**
 * Pulls one half (prepared) message from {@code messageQueue} at {@code offset}.
 *
 * <p>The returned holder is never {@code null}. Its message is only set when the pull
 * produced at least one message; otherwise only the pull result (if any) is recorded.
 *
 * @param messageQueue the half-message queue to read from
 * @param offset       the queue offset to read at
 * @return a holder with the pull result and, when available, the first pulled message
 */
private GetResult getHalfMsg(MessageQueue messageQueue, long offset) {
    GetResult getResult = new GetResult();
    PullResult result = pullHalfMsg(messageQueue, offset, PULL_MSG_RETRY_NUMBER);
    if (result == null) {
        // Whether the pull can yield null is not visible from here; guard rather than dereference.
        return getResult;
    }
    getResult.setPullResult(result);
    List<MessageExt> messageExts = result.getMsgFoundList();
    // An empty list must be treated like "nothing found": get(0) would throw otherwise.
    if (messageExts == null || messageExts.isEmpty()) {
        return getResult;
    }
    getResult.setMsg(messageExts.get(0));
    return getResult;
}
每次取一个消息
/**
 * Reads up to {@code nums} half messages from the queue id of {@code mq}, starting at {@code offset},
 * by delegating to the transactional message bridge.
 */
private PullResult pullHalfMsg(MessageQueue mq, long offset, int nums) {
    final int queueId = mq.getQueueId();
    return transactionalMessageBridge.getHalfMessage(queueId, offset, nums);
}

/**
 * Fetches half messages for the given queue id using the consumer group and half topic
 * produced by {@code TransactionalMessageUtil}, subscribing with the {@code "*"} expression.
 *
 * @param queueId queue id to read
 * @param offset  starting queue offset
 * @param nums    maximum number of messages requested
 * @return whatever {@code getMessage} returns for these arguments
 */
public PullResult getHalfMessage(int queueId, long offset, int nums) {
    final String consumerGroup = TransactionalMessageUtil.buildConsumerGroup();
    final String halfTopic = TransactionalMessageUtil.buildHalfTopic();
    final SubscriptionData matchAll = new SubscriptionData(halfTopic, "*");
    return getMessage(consumerGroup, halfTopic, queueId, offset, nums, matchAll);
}
判断消息是否为空,为空的话需要判断一下是否还要继续查找,默认不需要继续查找,只判断一次。当返回状态为没有新消息时也是忽略该消息队列,查询下一个,否则就是非法下标,进行下一次查询。获取到消息后,判断检查次数是否达到上限,默认15次。
/**
 * Decides whether a half message has been checked often enough to be discarded.
 *
 * <p>When the message is kept, its check-times property is rewritten: to 1 if it was absent,
 * otherwise to the recorded value plus one.
 *
 * @param msgExt              the half message being examined
 * @param transactionCheckMax the check count at which the message is discarded
 * @return {@code true} if the recorded count has reached {@code transactionCheckMax}
 */
private boolean needDiscard(MessageExt msgExt, int transactionCheckMax) {
    final String recorded = msgExt.getProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES);
    int nextCount = 1;
    if (recorded != null) {
        final int soFar = getInt(recorded);
        if (soFar >= transactionCheckMax) {
            return true;
        }
        nextCount = soFar + 1;
    }
    msgExt.putUserProperty(MessageConst.PROPERTY_TRANSACTION_CHECK_TIMES, String.valueOf(nextCount));
    return false;
}
检查消息是否已经过期,默认是72小时。
/**
 * Tells whether a half message is older than the store's configured file reserved time.
 *
 * <p>The configured value is multiplied by {@code 3600L * 1000} before being compared with
 * the message age in milliseconds.
 *
 * @param msgExt the half message being examined
 * @return {@code true} if the message age exceeds the reserved time and it should be skipped
 */
private boolean needSkip(MessageExt msgExt) {
    final long ageMillis = System.currentTimeMillis() - msgExt.getBornTimestamp();
    final long reservedMillis =
        transactionalMessageBridge.getBrokerController().getMessageStoreConfig().getFileReservedTime() * 3600L * 1000;
    if (ageMillis <= reservedMillis) {
        return false;
    }
    log.info("Half message exceed file reserved time ,so skip it.messageId {},bornTime {}",
        msgExt.getMsgId(), msgExt.getBornTimestamp());
    return true;
}
满足上面两个条件的话就移除该消息,给新的准备消息队列下标加一。
/**
 * Handles a half message that is being discarded; the only action taken here is an error log.
 *
 * @param msgExt the discarded half message
 */
public void resolveDiscardMsg(MessageExt msgExt) {
    log.error(
        "MsgExt:{} has been checked too many times, so discard it",
        msgExt);
}
判断消息的存储时间是否小于检查开始时间,大于的话代表是刚产生的消息,有可能本地事务还没有执行完,所以进行下一个消息队列的检查。检查消息是否设置了免疫时间,也就是说在这个区间内可以不用检查消息是否被确认
/**
 * Resolves the check-immunity period for a half message.
 *
 * @param checkImmunityTimeStr the textual immunity value taken from the message
 * @param transactionTimeout   fallback used when the parsed value is {@code -1}
 * @return {@code transactionTimeout} when parsing yields {@code -1}, otherwise the parsed
 *         value multiplied by 1000
 */
private long getImmunityTime(String checkImmunityTimeStr, long transactionTimeout) {
    final long parsed = getLong(checkImmunityTimeStr);
    return parsed == -1 ? transactionTimeout : parsed * 1000;
}
当在免疫期内,检查消息是否设置了对应的准备队列下标,设置的话据判断是否已经有操作消息与之对应
/**
 * Handles a half message by looking at its prepared-queue-offset property.
 *
 * <ul>
 *   <li>No property: the message is put back to the half queue and that result is returned.</li>
 *   <li>Property parses to {@code -1}: returns {@code false}.</li>
 *   <li>Offset present in {@code removeMap}: the entry is removed, its value appended to
 *       {@code doneOpOffset}, and {@code true} is returned.</li>
 *   <li>Otherwise: the message is put back to the half queue and that result is returned.</li>
 * </ul>
 *
 * @param removeMap    map keyed by half-queue offset; the matched entry is removed
 * @param doneOpOffset list that receives the value of the removed entry
 * @param msgExt       the half message being examined
 */
private boolean checkPrepareQueueOffset(HashMap<Long, Long> removeMap, List<Long> doneOpOffset, MessageExt msgExt) {
    final String rawOffset = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET);
    if (rawOffset == null) {
        return putImmunityMsgBackToHalfQueue(msgExt);
    }

    final long preparedOffset = getLong(rawOffset);
    if (preparedOffset == -1) {
        return false;
    }
    if (!removeMap.containsKey(preparedOffset)) {
        return putImmunityMsgBackToHalfQueue(msgExt);
    }

    final long matchedOpOffset = removeMap.remove(preparedOffset);
    doneOpOffset.add(matchedOpOffset);
    return true;
}
都没有的话就再把该消息存储一遍,设置对应的准备消息队列偏移量的属性,重新存储成功后就对准备消息队列下标偏移量加一进行下一次循环。
/*
 * Three related methods follow on the next lines.
 *
 * putImmunityMsgBackToHalfQueue(messageExt):
 *   Builds a fresh inner message via transactionalMessageBridge.renewImmunityHalfMessageInner and
 *   stores it with transactionalMessageBridge.putMessage, returning that call's boolean result.
 *
 * renewImmunityHalfMessageInner(msgExt):
 *   Starts from renewHalfMessageInner(msgExt), then sets PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET
 *   on the copy: the value already carried by msgExt when present, otherwise msgExt's own queue
 *   offset. The properties string is re-serialised afterwards so it includes that property.
 *
 * renewHalfMessageInner(msgExt):
 *   Copies topic, body, queue id, message id, sys flag, tags, tags code, properties, born
 *   timestamp/host and store host from msgExt into a new MessageExtBrokerInner, and calls
 *   setWaitStoreMsgOK(false). Statement order matters: setTags runs before
 *   MessageAccessor.setProperties, and the tags code is computed from the copy's tags.
 */
private boolean putImmunityMsgBackToHalfQueue(MessageExt messageExt) { MessageExtBrokerInner msgInner = transactionalMessageBridge.renewImmunityHalfMessageInner(messageExt); return transactionalMessageBridge.putMessage(msgInner); } public MessageExtBrokerInner renewImmunityHalfMessageInner(MessageExt msgExt) { MessageExtBrokerInner msgInner = renewHalfMessageInner(msgExt); String queueOffsetFromPrepare = msgExt.getUserProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET); if (null != queueOffsetFromPrepare) { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET, String.valueOf(queueOffsetFromPrepare)); } else { MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_TRANSACTION_PREPARED_QUEUE_OFFSET, String.valueOf(msgExt.getQueueOffset())); } msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties())); return msgInner; } public MessageExtBrokerInner renewHalfMessageInner(MessageExt msgExt) { MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(msgExt.getTopic()); msgInner.setBody(msgExt.getBody()); msgInner.setQueueId(msgExt.getQueueId()); msgInner.setMsgId(msgExt.getMsgId()); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setTags(msgExt.getTags()); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(msgInner.getTags())); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(msgExt.getStoreHost()); msgInner.setWaitStoreMsgOK(false); return msgInner; 
}没设置免疫期的话就判断当前时间间隔是否超过事务超时时间,没超过的话就检查下一个消息队列的消息数据。判断是否需要检查事务状态,第一没找到操作消息并且不在免疫期内,第二最后一个操作消息的生成时间已经超过事务执行超时时间,第三是生产者和broker时间不一致时会导致broker当前时间小于消息的生产时间。不需要检查的话就继续查询操作消息等信息,进行下一次循环。如果需要检查的话就先把准备消息存储一份,和前面那个免疫期一样,之所以再存储是因为这个消息需要等producer确认,检查不可能都阻塞到这里吧,万一没成功呢,肯定是要异步的,但是要异步的话,那这个准备消息到底算是有没有确认呢?肯定要确认,所以必须再加一个消息来代替原来的消息。
/**
 * Writes {@code msgExt} back to the half queue and, on success, updates it in place with the
 * queue offset, commit-log offset and message id reported by the store.
 *
 * @param msgExt the half message to re-append; mutated on success
 * @param offset the offset the message was read from (used for logging only)
 * @return {@code true} only when the store reports {@code PUT_OK}
 */
private boolean putBackHalfMsgQueue(MessageExt msgExt, long offset) {
    final PutMessageResult stored = putBackToHalfQueueReturnResult(msgExt);
    final boolean failed = stored == null || stored.getPutMessageStatus() != PutMessageStatus.PUT_OK;
    if (failed) {
        log.error("PutBackToHalfQueueReturnResult write failed, topic: {}, queueId: {}, msgId: {}",
            msgExt.getTopic(), msgExt.getQueueId(), msgExt.getMsgId());
        return false;
    }

    // Point the in-memory message at its freshly written copy.
    msgExt.setQueueOffset(stored.getAppendMessageResult().getLogicsOffset());
    msgExt.setCommitLogOffset(stored.getAppendMessageResult().getWroteOffset());
    msgExt.setMsgId(stored.getAppendMessageResult().getMsgId());
    log.debug(
        "Send check message, the offset={} restored in queueOffset={} commitLogOffset={} newMsgId={} realMsgId={} topic={}",
        offset,
        msgExt.getQueueOffset(),
        msgExt.getCommitLogOffset(),
        msgExt.getMsgId(),
        msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX),
        msgExt.getTopic());
    return true;
}
重新放入准备消息并等待返回结果,只有当保存成功才可以确认替换成功
/**
 * Rebuilds {@code messageExt} as a new inner half message and stores it, returning the store's result.
 *
 * @param messageExt the half message to re-append
 * @return the put result, or {@code null} when rebuilding or storing threw an exception
 *         (the exception is logged at warn level)
 */
private PutMessageResult putBackToHalfQueueReturnResult(MessageExt messageExt) {
    try {
        final MessageExtBrokerInner renewed = transactionalMessageBridge.renewHalfMessageInner(messageExt);
        return transactionalMessageBridge.putMessageReturnResult(renewed);
    } catch (Exception e) {
        log.warn("PutBackToHalfQueueReturnResult error", e);
        return null;
    }
}
添加检查消息状态的任务
/**
 * Schedules a transaction-state check for {@code msgExt} on {@code executorService}.
 *
 * <p>Any exception thrown by {@code sendCheckMessage} is logged inside the task and not rethrown.
 *
 * @param msgExt the half message whose producer should be asked for the transaction state
 */
public void resolveHalfMsg(final MessageExt msgExt) {
    final Runnable checkTask = new Runnable() {
        @Override
        public void run() {
            try {
                sendCheckMessage(msgExt);
            } catch (Exception e) {
                LOGGER.error("Send check message error!", e);
            }
        }
    };
    executorService.execute(checkTask);
}
如果循环过程中没有break或者continue的话就给准备消息的新下标加一,最后一个消息队列执行完毕后判断准备消息队列的新下标是否与原下标相等,不等的话就更新对应的偏移量。
/*
 * The leading `if` is an excerpt from a method not shown here: it commits the half-queue
 * offset only when newOffset differs from halfOffset.
 *
 * updateConsumeOffset(mq, offset):
 *   Delegates to the broker controller's consumer offset manager, calling commitOffset with the
 *   address parsed from storeHost, the consumer group from
 *   TransactionalMessageUtil.buildConsumerGroup(), the queue's topic and queue id, and the offset.
 */
if (newOffset != halfOffset) { transactionalMessageBridge.updateConsumeOffset(messageQueue, newOffset); } public void updateConsumeOffset(MessageQueue mq, long offset) { this.brokerController.getConsumerOffsetManager().commitOffset( RemotingHelper.parseSocketAddressAddr(this.storeHost), TransactionalMessageUtil.buildConsumerGroup(), mq.getTopic(), mq.getQueueId(), offset); }计算已经确认的操作消息队列的下标已经递增执行到了哪一个下标,最后判断新旧下标是否相等,不等则更新偏移量。操作消息需要一个一个来确认,就算后面的下标已经确认了,但是前面的没确认,偏移量依然是前面的下标,因为要根据操作消息队列的偏移量来查询操作消息,根据这个消息来判断准备消息有没有被确认。偏移量不依次确认的话就会导致前面的偏移量还没确认,后面的就给提交偏移量了。举个例子,发了33个消息偏移量为(0,。。。,32),准备消息队列提交偏移量为1和操作消息队列提交偏移量为0,操作消息队里已经有33个确认消息(32,0,1,。。。,31),因为每次只会查32个操作消息,所以只会获取到(32,0,1,。。。,30),得到的已确认偏移量为【1,31】,但是初始是0,这个时候不判断递增的话,就会直接提交为31,导致认为这个32未确认,因为下一次获取操作消息就会取偏移量31之后的,所以就需要生产者在来确认一次,所以要选择递增来保证正确性。
/**
 * Advances {@code oldOffset} across the run of consecutive offsets found in {@code doneOffset}.
 *
 * <p>The list is sorted in place, then walked from its smallest element: each element equal to
 * the current candidate advances the candidate by one; the first mismatch stops the walk.
 *
 * @param doneOffset completed offsets; sorted in place by this call
 * @param oldOffset  the offset to start from
 * @return the first offset at or after {@code oldOffset} not covered by the consecutive run
 */
private long calculateOpOffset(List<Long> doneOffset, long oldOffset) {
    Collections.sort(doneOffset);
    long next = oldOffset;
    for (Long done : doneOffset) {
        // Long vs long: compared numerically after unboxing.
        if (done != next) {
            break;
        }
        next++;
    }
    return next;
}

long newOpOffset = calculateOpOffset(doneOpOffset, opOffset);
if (newOpOffset != opOffset) {
    transactionalMessageBridge.updateConsumeOffset(opQueue, newOpOffset);
}
broker组装回查请求头,还原以前的topic和queueId,把唯一key设置为消息id和事务id。
/**
 * Builds a check-transaction-state request for {@code msgExt} and sends it to a producer channel
 * of the message's producer group.
 *
 * <p>Before sending, the message's topic and queue id are replaced with the values stored in its
 * real-topic and real-queue-id properties, and its store size is set to 0. If no channel is
 * available, a warning is logged and nothing is sent.
 *
 * @param msgExt the half message to check; mutated as described above
 * @throws Exception propagated from property parsing or from the client call
 */
public void sendCheckMessage(MessageExt msgExt) throws Exception {
    final CheckTransactionStateRequestHeader header = new CheckTransactionStateRequestHeader();
    header.setCommitLogOffset(msgExt.getCommitLogOffset());
    header.setOffsetMsgId(msgExt.getMsgId());
    header.setMsgId(msgExt.getUserProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX));
    // The transaction id is the same value as the unique client message id set just above.
    header.setTransactionId(header.getMsgId());
    header.setTranStateTableOffset(msgExt.getQueueOffset());

    msgExt.setTopic(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_TOPIC));
    msgExt.setQueueId(Integer.parseInt(msgExt.getUserProperty(MessageConst.PROPERTY_REAL_QUEUE_ID)));
    msgExt.setStoreSize(0);

    final String groupId = msgExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    final Channel channel = brokerController.getProducerManager().getAvaliableChannel(groupId);
    if (channel == null) {
        LOGGER.warn("Check transaction failed, channel is null. groupId={}", groupId);
        return;
    }
    brokerController.getBroker2Client().checkProducerTransactionState(groupId, channel, header, msgExt);
}
根据组名获取存活的生产者通讯通道
/**
 * Picks an active and writable channel of the given producer group.
 *
 * <p>Starts at an index derived from {@code positiveAtomicCounter} and probes successive
 * channels (wrapping around), up to {@code GET_AVALIABLE_CHANNEL_RETRY_COUNT + 1} probes in total.
 * Every probed channel is tested before moving on. The previous loop computed the state of the
 * final candidate but never looked at it, so a usable channel could be discarded.
 *
 * @param groupId producer group name
 * @return a channel that is active and writable, or {@code null} when the group is unknown,
 *         has no channels, or none of the probed channels is usable
 */
public Channel getAvaliableChannel(String groupId) {
    HashMap<Channel, ClientChannelInfo> channelClientChannelInfoHashMap = groupChannelTable.get(groupId);
    if (channelClientChannelInfoHashMap == null) {
        log.warn("Check transaction failed, channel table is empty. groupId={}", groupId);
        return null;
    }

    // Snapshot the keys so probing works on a stable, indexable list.
    List<Channel> channelList = new ArrayList<Channel>(channelClientChannelInfoHashMap.keySet());
    int size = channelList.size();
    if (0 == size) {
        log.warn("Channel list is empty. groupId={}", groupId);
        return null;
    }

    // NOTE(review): assumes the counter never yields a negative value — confirm.
    int index = positiveAtomicCounter.incrementAndGet() % size;
    for (int attempt = 0; attempt <= GET_AVALIABLE_CHANNEL_RETRY_COUNT; attempt++) {
        Channel channel = channelList.get(index);
        if (channel.isActive() && channel.isWritable()) {
            return channel;
        }
        index = (index + 1) % size;
    }
    return null;
}
检查事务状态,请求码为CHECK_TRANSACTION_STATE = 39
/**
 * Sends a one-way CHECK_TRANSACTION_STATE request carrying {@code messageExt} to a producer channel.
 *
 * <p>Failures of the one-way invocation are logged and not rethrown. The exception itself is
 * passed to the logger as the trailing argument: the message has only two placeholders, so the
 * previous third argument ({@code e.getMessage()}) never made it into the log.
 *
 * @param group         producer group (used for logging)
 * @param channel       channel to the producer
 * @param requestHeader header of the check request
 * @param messageExt    the half message, encoded as the request body
 * @throws Exception propagated from building or encoding the request
 */
public void checkProducerTransactionState(
    final String group,
    final Channel channel,
    final CheckTransactionStateRequestHeader requestHeader,
    final MessageExt messageExt) throws Exception {
    RemotingCommand request =
        RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
    request.setBody(MessageDecoder.encode(messageExt, false));
    try {
        this.brokerController.getRemotingServer().invokeOneway(channel, request, 10);
    } catch (Exception e) {
        log.error("Check transaction failed because invoke producer exception. group={}, msgId={}",
            group, messageExt.getMsgId(), e);
    }
}
生产者接收消息检查事务状态,把唯一key设置为事务id,查找生产者实例信息
/**
 * Client-side handler for a broker's check-transaction-state request.
 *
 * <p>Decodes the header and the message from the request, copies the unique client message id
 * (when non-empty) into the message's transaction id, looks up the producer by the message's
 * producer group and hands the check over to it. Each failure to resolve a step is logged.
 *
 * @param ctx     channel context of the incoming request
 * @param request the incoming remoting command
 * @return always {@code null}
 * @throws RemotingCommandException if the custom header cannot be decoded
 */
public RemotingCommand checkTransactionState(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final CheckTransactionStateRequestHeader requestHeader =
        (CheckTransactionStateRequestHeader) request.decodeCommandCustomHeader(CheckTransactionStateRequestHeader.class);
    final ByteBuffer byteBuffer = ByteBuffer.wrap(request.getBody());
    final MessageExt messageExt = MessageDecoder.decode(byteBuffer);
    if (messageExt == null) {
        log.warn("checkTransactionState, decode message failed");
        return null;
    }

    final String transactionId = messageExt.getProperty(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
    if (null != transactionId && !"".equals(transactionId)) {
        messageExt.setTransactionId(transactionId);
    }

    final String group = messageExt.getProperty(MessageConst.PROPERTY_PRODUCER_GROUP);
    if (group == null) {
        log.warn("checkTransactionState, pick producer group failed");
        return null;
    }

    final MQProducerInner producer = this.mqClientFactory.selectProducer(group);
    if (producer == null) {
        log.debug("checkTransactionState, pick producer by group[{}] failed", group);
        return null;
    }

    final String addr = RemotingHelper.parseChannelRemoteAddr(ctx.channel());
    producer.checkTransactionState(addr, messageExt, requestHeader);
    return null;
}
组装任务,提交到线程池中执行,执行本地事务,判断返回状态,最后执行结束事务方法,基本上和正常结束流程一致。
/**
 * Producer-side transaction check: wraps the work in a Runnable and submits it to {@code checkExecutor}.
 *
 * <p>The task captures the broker address, the message, the check request header and this
 * producer's group. When run, it obtains a {@code TransactionCheckListener} via
 * {@code checkListener()} and a {@code TransactionListener} via {@code getCheckListener()}:
 * <ul>
 *   <li>If both are null, a warning is logged and nothing is sent.</li>
 *   <li>Otherwise the state starts as {@code UNKNOW}; the {@code TransactionCheckListener} is
 *       preferred when present, else the {@code TransactionListener} is used. Any Throwable from
 *       the listener is logged and kept for the remark. The innermost {@code else} cannot be
 *       reached, because the enclosing {@code if} already guarantees one listener is non-null.</li>
 * </ul>
 *
 * <p>{@code processTransactionState} then builds an {@code EndTransactionRequestHeader} from the
 * check header (commit-log offset, tran-state-table offset, transaction id), the producer group
 * and the message's unique key (falling back to the message id), maps the local state to a
 * commit / rollback / not-type flag, and sends it with {@code endTransactionOneway}; an exception
 * from that call is logged and not rethrown.
 *
 * <p>NOTE(review): a listener returning {@code null} would make the {@code switch} throw a
 * NullPointerException inside the task — confirm listeners never return null.
 * NOTE(review): the last argument {@code 3000} of {@code endTransactionOneway} is presumably a
 * timeout in milliseconds — confirm.
 *
 * @param addr   address of the broker that requested the check
 * @param msg    the half message being checked
 * @param header the check request header received from the broker
 */
public void checkTransactionState(final String addr, final MessageExt msg, final CheckTransactionStateRequestHeader header) { Runnable request = new Runnable() { private final String brokerAddr = addr; private final MessageExt message = msg; private final CheckTransactionStateRequestHeader checkRequestHeader = header; private final String group = DefaultMQProducerImpl.this.defaultMQProducer.getProducerGroup(); @Override public void run() { TransactionCheckListener transactionCheckListener = DefaultMQProducerImpl.this.checkListener(); TransactionListener transactionListener = getCheckListener(); if (transactionCheckListener != null || transactionListener != null) { LocalTransactionState localTransactionState = LocalTransactionState.UNKNOW; Throwable exception = null; try { if (transactionCheckListener != null) { localTransactionState = transactionCheckListener.checkLocalTransactionState(message); } else if (transactionListener != null) { log.debug("Used new check API in transaction message"); localTransactionState = transactionListener.checkLocalTransaction(message); } else { log.warn("CheckTransactionState, pick transactionListener by group[{}] failed", group); } } catch (Throwable e) { log.error("Broker call checkTransactionState, but checkLocalTransactionState exception", e); exception = e; } this.processTransactionState( localTransactionState, group, exception); } else { log.warn("CheckTransactionState, pick transactionCheckListener by group[{}] failed", group); } } private void processTransactionState( final LocalTransactionState localTransactionState, final String producerGroup, final Throwable exception) { final EndTransactionRequestHeader thisHeader = new EndTransactionRequestHeader(); thisHeader.setCommitLogOffset(checkRequestHeader.getCommitLogOffset()); thisHeader.setProducerGroup(producerGroup); thisHeader.setTranStateTableOffset(checkRequestHeader.getTranStateTableOffset()); thisHeader.setFromTransactionCheck(true); String uniqueKey = 
message.getProperties().get(MessageConst.PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX); if (uniqueKey == null) { uniqueKey = message.getMsgId(); } thisHeader.setMsgId(uniqueKey); thisHeader.setTransactionId(checkRequestHeader.getTransactionId()); switch (localTransactionState) { case COMMIT_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_COMMIT_TYPE); break; case ROLLBACK_MESSAGE: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_ROLLBACK_TYPE); log.warn("when broker check, client rollback this transaction, {}", thisHeader); break; case UNKNOW: thisHeader.setCommitOrRollback(MessageSysFlag.TRANSACTION_NOT_TYPE); log.warn("when broker check, client does not know this transaction state, {}", thisHeader); break; default: break; } String remark = null; if (exception != null) { remark = "checkLocalTransactionState Exception: " + RemotingHelper.exceptionSimpleDesc(exception); } try { DefaultMQProducerImpl.this.mQClientFactory.getMQClientAPIImpl().endTransactionOneway(brokerAddr, thisHeader, remark, 3000); } catch (Exception e) { log.error("endTransactionOneway exception", e); } } }; this.checkExecutor.submit(request); }