首页>>后端>>java->RocketMQ特性——Broker是如何存储事务消息的?

RocketMQ特性——Broker是如何存储事务消息的?

时间:2023-12-07 本站 点击:0

在Broker中,事务消息的初始化是通过BrokerController.initialTransaction()方法执行的。

privatevoidinitialTransaction(){this.transactionalMessageService=ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID,TransactionalMessageService.class);if(null==this.transactionalMessageService){this.transactionalMessageService=newTransactionalMessageServiceImpl(newTransactionalMessageBridge(this,this.getMessageStore()));LOG.warn("Loaddefaulttransactionmessagehookservice:{}",TransactionalMessageServiceImpl.class.getSimpleName());}this.transactionalMessageCheckListener=ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID,AbstractTransactionalMessageCheckListener.class);if(null==this.transactionalMessageCheckListener){this.transactionalMessageCheckListener=newDefaultTransactionalMessageCheckListener();LOG.warn("Loaddefaultdiscardmessagehookservice:{}",DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService=newTransactionalMessageCheckService(this);}

这里有三个核心的初始化变量

TransactionalMessageService

事务消息主要处理服务。默认实现类是TransactionalMessageServiceImpl也可以自己定义事务消息处理实现类,通过ServiceProvider.loadClass()方法进行加载。TransactionalMessageService类定义如下。内部属性已加注释标明。

publicinterfaceTransactionalMessageService{//用于保存Half事务消息PutMessageResultprepareMessage(MessageExtBrokerInnermessageInner);CompletableFuture<PutMessageResult>asyncPrepareMessage(MessageExtBrokerInnermessageInner);//删除事务消息booleandeletePrepareMessage(MessageExtmessageExt);//提交事务消息OperationResultcommitMessage(EndTransactionRequestHeaderrequestHeader);//回滚事务消息OperationResultrollbackMessage(EndTransactionRequestHeaderrequestHeader);voidcheck(longtransactionTimeout,inttransactionCheckMax,AbstractTransactionalMessageCheckListenerlistener);//打开事务消息booleanopen();//关闭事务消息voidclose();}

transactionalMessageCheckListener

事务消息回查监听器

transactionalMessageCheckService

事务消息回查服务,启动一个线程定时检查超时的Half消息是否需要回查。

处理事务消息

当初始化完成之后,Broker就可以处理事务消息了。

Broker存储事务消息的是org.apache.rocketmq.broker.processor.SendMessageProcessor,这和普通消息其实是一样的。 但是有两点针对事务消息的特殊处理:

第一处:

org.apache.rocketmq.broker.processor.SendMessageProcessor#sendMessage中:

//获取扩展字段的值,若是该值为true则为事务消息StringtraFlag=oriProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);booleansendTransactionPrepareMessage=false;if(Boolean.parseBoolean(traFlag)&&!(msgInner.getReconsumeTimes()>0&&msgInner.getDelayTimeLevel()>0)){//判断当前Broker配置是否支持事务消息if(this.brokerController.getBrokerConfig().isRejectTransactionMessage()){response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("thebroker["+this.brokerController.getBrokerConfig().getBrokerIP1()+"]sendingtransactionmessageisforbidden");returnresponse;}sendTransactionPrepareMessage=true;}

if(sendTransactionPrepareMessage){//保存Half信息putMessageResult=this.brokerController.getTransactionalMessageService().prepareMessage(msgInner);}else{putMessageResult=this.brokerController.getMessageStore().putMessage(msgInner);}

第二处:

存储事务消息前的预处理,对应方法是org.apache.rocketmq.broker.transaction.queue.TransactionalMessageBridge#parseHalfMessageInner

privateMessageExtBrokerInnerparseHalfMessageInner(MessageExtBrokerInnermsgInner){//将原消息的topic保存在扩展字段中MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_REAL_TOPIC,msgInner.getTopic());//将原消息的QueueId保存在扩展字段中MessageAccessor.putProperty(msgInner,MessageConst.PROPERTY_REAL_QUEUE_ID,String.valueOf(msgInner.getQueueId()));//将原消息的SysFlag保存在扩展字段中msgInner.setSysFlag(MessageSysFlag.resetTransactionValue(msgInner.getSysFlag(),MessageSysFlag.TRANSACTION_NOT_TYPE));//修改topic的值为RMQ_SYS_TRANS_HALF_TOPICmsgInner.setTopic(TransactionalMessageUtil.buildHalfTopic());//修改Queueid为0msgInner.setQueueId(0);msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));returnmsgInner;}

完成上述步骤之后,调用DefaultMessageStole.putMessage()方法将其保存到CommitLog中。

CommitLog存储成功之后,通过org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend()方法对其进行处理。

finalinttranType=MessageSysFlag.getTransactionValue(msgInner.getSysFlag());switch(tranType){//PreparedandRollbackmessageisnotconsumed,willnotentertheconsumequeuecaseMessageSysFlag.TRANSACTION_PREPARED_TYPE:caseMessageSysFlag.TRANSACTION_ROLLBACK_TYPE:queueOffset=0L;break;caseMessageSysFlag.TRANSACTION_NOT_TYPE:caseMessageSysFlag.TRANSACTION_COMMIT_TYPE:default:break;}

这里的逻辑是这样的,当读到的消息类型为事务消息时,设置当前消息的位点值为0,而不是设置真实的位点。这样该位点就不会建立ConsumeQueue索引,也不会被消费。


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:/java/18711.html