分布式事务解决方案中,最大努力通知机制如何实现?

2026-05-23 20:271阅读0评论SEO基础
  • 内容介绍
  • 文章标签
  • 相关推荐

本文共计3528个文字,预计阅读时间需要15分钟。

分布式事务解决方案中,最大努力通知机制如何实现?

分布式事务理论:分布式事务+什么是最大努力通知+最大努力通知也是一种解决分布式事务的方案,下面是一个是值得的例子:交互流程:1、账户系统调用充值系统接口;2、充值系统完成充值操作。

分布式事务理论:分布式事务

什么是最大努力通知

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

交互流程:

  • 1、账户系统调用充值系统接口
  • 2、充值系统完成支付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进行重复通知
  • 3、账户系统接收到充值结果通知修改充值状态。
  • 4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

通过上边的例子我们总结最大努力通知方案的目标:

目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

具体包括:

1、有一定的消息重复通知机制。

因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

2、消息校对机制。

如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同?

1、解决方案思想不同

可靠消息一致性:发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。

最大努力通知:发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

分布式事务解决方案中,最大努力通知机制如何实现?

2、两者的业务应用场景不同

可靠消息一致性:关注的是交易过程的事务一致,以异步的方式完成交易。

最大努力通知:关注的是交易后的通知事务,即将交易结果可靠的通知出去。

3、技术解决方向不同

可靠消息一致性:要解决消息从发出到接收的一致性,即消息发出并且被接收到。

最大努力通知:无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

解决方案

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。

方案1:

本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:

  • 1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。 注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果(后边会讲)。

  • 2、接收通知方监听 MQ。

  • 3、接收通知方接收消息,业务处理完成回应ack。

  • 4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。

  • 5、接收通知方可通过消息校对接口来校对消息的一致性。

方案2:

本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:

交互流程如下:

  • 1、发起通知方将通知发给MQ。 使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。

  • 2、通知程序监听 MQ,接收MQ的消息。 方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。 通知程序若没有回应ack则MQ会重复通知。

  • 3、通知程序通过互联网接口协议(如rocketmq.apache.org/dowloading/releases/

  • 2)、解压并启动 启动nameserver:

Windows系统: set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqnamesrv.cmd Centos系统: 进入rocketMQ解压目录下的bin文件夹 nohup sh bin/mqnamesrv & 日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/namesrv.log

启动broker:

Windows系统: set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true Centos系统: 进入rocketMQ解压目录下的bin文件夹 nohup sh bin/mqbroker & 日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/broker.log

创建

rocket-notifymsg-demo-bank1:银行1,操作张三账户, 连接数据库bank1 rocket-notifymsg-demo-pay:银行2,操作充值记录,连接数据库bank1_pay

引入maven依赖:

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.10.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <exclusions> <exclusion> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.18</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.74</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR8</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.mybatis.generator</groupId> <artifactId>mybatis-generator-maven-plugin</artifactId> <version>1.3.6</version> <configuration> <configurationFile> ${basedir}/src/main/resources/generator/generatorConfig.xml </configurationFile> <overwrite>true</overwrite> <verbose>true</verbose> </configuration> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.41</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper</artifactId> <version>4.1.5</version> </dependency> </dependencies> </plugin> </plugins> </build>

rocket-notifymsg-demo-pay

rocket-notifymsg-demo-pay实现如下功能:

  • 1、充值接口
  • 2、充值完成要通知
  • 3、充值结果查询接口

application.properties

spring.application.name=notify-msg-pay server.port=8094 spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1_pay?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC spring.datasource.username = root spring.datasource.password = mysql rocketmq.producer.group = producer_notifymsg_pay rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.cn.itcast.wanxintx.effortdemo = debug

Controller

@RestController @RequestMapping("/account") public class AccountPayController { @Autowired private AccountPayService accountPayService; //充值 @GetMapping(value = "/paydo") public AccountPay pay(AccountPay accountPay){ //生成事务编号 String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo); return accountPayService.insertAccountPay(accountPay); } //查询充值结果 @GetMapping(value = "/payResult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo){ return accountPayService.getAccountPay(txNo); } }

Service

@Service @Slf4j public class AccountPayService { @Autowired private AccountPayMapper accountPayMapper; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 插入充值记录 * @param accountPay * @return */ public AccountPay insertAccountPay(AccountPay accountPay) { int success = accountPayMapper.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success"); if(success>0){ //发送通知,使用普通消息发送通知 accountPay.setResult("success"); rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay); return accountPay; } return null; } /** * 查询充值记录,接收通知方调用此方法来查询充值结果 * @param txNo * @return */ public AccountPay getAccountPay(String txNo) { AccountPay accountPay = accountPayMapper.findByIdTxNo(txNo); return accountPay; } }

Mapper

public interface AccountPayMapper extends Mapper<AccountPay> { int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Long pay_amount, @Param("result") String result); AccountPay findByIdTxNo(@Param("txNo") String txNo); } <mapper namespace="com.yibo.notifypay.mapper.AccountPayMapper"> <resultMap id="BaseResultMap" type="com.yibo.notifypay.domain.entity.AccountPay"> <!-- WARNING - @mbg.generated --> <id column="id" jdbcType="VARCHAR" property="id" /> <result column="account_no" jdbcType="VARCHAR" property="accountNo" /> <result column="pay_amount" jdbcType="BIGINT" property="payAmount" /> <result column="result" jdbcType="VARCHAR" property="result" /> </resultMap> <insert id="insertAccountPay"> insert into account_pay(id,account_no,pay_amount,result) values(#{id},#{accountNo},#{payAmount},#{result}) </insert> <select id="findByIdTxNo" resultType="BaseResultMap"> select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo} </select> </mapper>

rocket-notifymsg-demo-bank1

rocket-notifymsg-demo-bank1实现如下功能:

  • 1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。
  • 2、主动查询充值系统,根据充值结果完成账户金额修改。

application.properties

spring.application.name=notify-msg-bank1 server.port=8096 spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC spring.datasource.username = root spring.datasource.password = mysql rocketmq.producer.group = producer_notifymsg_pay rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.cn.itcast.wanxintx.effortdemo = debug

Controller

@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; //主动查询充值结果 @GetMapping(value = "/payresult/{txNo}") public AccountPay result(@PathVariable("txNo") String txNo){ AccountPay accountPay = accountInfoService.queryPayResult(txNo); return accountPay; } }

Service

@Service @Slf4j public class AccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; @Autowired private PayClient payClient; /** * 更新账户金额 * @param accountChange */ @Transactional public void updateAccountBalance(AccountChangeEvent accountChange) { //幂等校验 if(accountInfoMapper.isExistTx(accountChange.getTxNo())>0){ return ; } int i = accountInfoMapper.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount()); //插入事务记录,用于幂等控制 accountInfoMapper.addTx(accountChange.getTxNo()); } /** * 远程调用查询充值结果 * @param tx_no * @return */ public AccountPay queryPayResult(String tx_no) { //远程调用 AccountPay payresult = payClient.payresult(tx_no); if("success".equals(payresult.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号 accountChangeEvent.setAmount(payresult.getPayAmount());//金额 accountChangeEvent.setTxNo(payresult.getId());//充值事务号 updateAccountBalance(accountChangeEvent); } return payresult; } }

Consumer

@Component @Slf4j @RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1") public class NotifyMsgConsumer implements RocketMQListener<AccountPay> { @Autowired private AccountInfoService accountInfoService; @Override public void onMessage(AccountPay accountPay) { log.info("接收到消息:{}", JSON.toJSONString(accountPay)); if("success".equals(accountPay.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); accountInfoService.updateAccountBalance(accountChangeEvent); } log.info("处理消息完成:{}", JSON.toJSONString(accountPay)); } }

Feign

@FeignClient(value = "notify-msg-pay") public interface PayClient { //远程调用充值系统的接口查询充值结果 @GetMapping(value = "account/payResult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo); }

Mapper

public interface AccountInfoMapper extends Mapper<AccountInfo> { //修改账户金额 int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Long amount); //查询幂等记录,用于幂等控制 int isExistTx(String txNo); //添加事务记录,用于幂等控制 int addTx(String txNo); } <mapper namespace="com.yibo.notify.mapper.AccountInfoMapper"> <resultMap id="BaseResultMap" type="com.yibo.notify.domain.entity.AccountInfo"> <!-- WARNING - @mbg.generated --> <id column="id" jdbcType="BIGINT" property="id" /> <result column="account_name" jdbcType="VARCHAR" property="accountName" /> <result column="account_no" jdbcType="VARCHAR" property="accountNo" /> <result column="account_password" jdbcType="VARCHAR" property="accountPassword" /> <result column="account_balance" jdbcType="BIGINT" property="accountBalance" /> </resultMap> <!-- 修改账户金额 --> <update id="updateAccountBalance"> update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo} </update> <!-- 查询幂等记录,用于幂等控制 --> <select id="isExistTx"> select count(1) from de_duplication where tx_no = #{txNo} </select> <!-- 添加事务记录,用于幂等控制 --> <insert id="addTx"> insert into de_duplication values(#{txNo},now()) </insert> </mapper>

总结

最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务。

最大努力通知方案需要实现如下功能:

  • 1、消息重复通知机制。
  • 2、消息校对机制。

github源码地址:github.com/jjhyb/distributed-transaction

参考: www.cnblogs.com/zeussbook/p/11799017.html

本文共计3528个文字,预计阅读时间需要15分钟。

分布式事务解决方案中,最大努力通知机制如何实现?

分布式事务理论:分布式事务+什么是最大努力通知+最大努力通知也是一种解决分布式事务的方案,下面是一个是值得的例子:交互流程:1、账户系统调用充值系统接口;2、充值系统完成充值操作。

分布式事务理论:分布式事务

什么是最大努力通知

最大努力通知也是一种解决分布式事务的方案,下边是一个是充值的例子:

交互流程:

  • 1、账户系统调用充值系统接口
  • 2、充值系统完成支付处理向账户系统发起充值结果通知,若通知失败,则充值系统按策略进行重复通知
  • 3、账户系统接收到充值结果通知修改充值状态。
  • 4、账户系统未接收到通知会主动调用充值系统的接口查询充值结果。

通过上边的例子我们总结最大努力通知方案的目标:

目标:发起通知方通过一定的机制最大努力将业务处理结果通知到接收方。

具体包括:

1、有一定的消息重复通知机制。

因为接收通知方可能没有接收到通知,此时要有一定的机制对消息重复通知。

2、消息校对机制。

如果尽最大努力也没有通知到接收方,或者接收方消费消息后要再次消费,此时可由接收方主动向通知方查询消息信息来满足需求。

最大努力通知与可靠消息一致性有什么不同?

1、解决方案思想不同

可靠消息一致性:发起通知方需要保证将消息发出去,并且将消息发到接收通知方,消息的可靠性关键由发起通知方来保证。

最大努力通知:发起通知方尽最大的努力将业务处理结果通知为接收通知方,但是可能消息接收不到,此时需要接收通知方主动调用发起通知方的接口查询业务处理结果,通知的可靠性关键在接收通知方。

分布式事务解决方案中,最大努力通知机制如何实现?

2、两者的业务应用场景不同

可靠消息一致性:关注的是交易过程的事务一致,以异步的方式完成交易。

最大努力通知:关注的是交易后的通知事务,即将交易结果可靠的通知出去。

3、技术解决方向不同

可靠消息一致性:要解决消息从发出到接收的一致性,即消息发出并且被接收到。

最大努力通知:无法保证消息从发出到接收的一致性,只提供消息接收的可靠性机制。可靠机制是,最大努力的将消息通知给接收方,当消息无法被接收方接收时,由接收方主动查询消息(业务处理结果)。

解决方案

通过对最大努力通知的理解,采用MQ的ack机制就可以实现最大努力通知。

方案1:

本方案是利用MQ的ack机制由MQ向接收通知方发送通知,流程如下:

  • 1、发起通知方将通知发给MQ。 使用普通消息机制将通知发给MQ。 注意:如果消息没有发出去可由接收通知方主动请求发起通知方查询业务执行结果(后边会讲)。

  • 2、接收通知方监听 MQ。

  • 3、接收通知方接收消息,业务处理完成回应ack。

  • 4、接收通知方若没有回应ack则MQ会重复通知。 MQ会按照间隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知间隔 (如果MQ采用rocketMq,在broker中可进行配置),直到达到通知要求的时间窗口上限。

  • 5、接收通知方可通过消息校对接口来校对消息的一致性。

方案2:

本方案也是利用MQ的ack机制,与方案1不同的是应用程序向接收通知方发送通知,如下图:

交互流程如下:

  • 1、发起通知方将通知发给MQ。 使用可靠消息一致方案中的事务消息保证本地事务与消息的原子性,最终将通知先发给MQ。

  • 2、通知程序监听 MQ,接收MQ的消息。 方案1中接收通知方直接监听MQ,方案2中由通知程序监听MQ。 通知程序若没有回应ack则MQ会重复通知。

  • 3、通知程序通过互联网接口协议(如rocketmq.apache.org/dowloading/releases/

  • 2)、解压并启动 启动nameserver:

Windows系统: set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqnamesrv.cmd Centos系统: 进入rocketMQ解压目录下的bin文件夹 nohup sh bin/mqnamesrv & 日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/namesrv.log

启动broker:

Windows系统: set ROCKETMQ_HOME=[rocketmq服务端解压路径] start [rocketmq服务端解压路径]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true Centos系统: 进入rocketMQ解压目录下的bin文件夹 nohup sh bin/mqbroker & 日志目录:{rocketMQ解压目录}/logs/rocketmqlogs/broker.log

创建

rocket-notifymsg-demo-bank1:银行1,操作张三账户, 连接数据库bank1 rocket-notifymsg-demo-pay:银行2,操作充值记录,连接数据库bank1_pay

引入maven依赖:

<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.10.RELEASE</version> <relativePath/> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-jdbc</artifactId> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-openfeign</artifactId> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId> <exclusions> <exclusion> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.alibaba.nacos</groupId> <artifactId>nacos-client</artifactId> <version>1.3.1</version> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.1.1</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper-spring-boot-starter</artifactId> <version>2.1.5</version> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.18</version> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.18.12</version> <scope>provided</scope> </dependency> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.74</version> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Hoxton.SR8</version> <type>pom</type> <scope>import</scope> </dependency> <dependency> <groupId>com.alibaba.cloud</groupId> <artifactId>spring-cloud-alibaba-dependencies</artifactId> <version>2.2.1.RELEASE</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> <plugin> <groupId>org.mybatis.generator</groupId> <artifactId>mybatis-generator-maven-plugin</artifactId> <version>1.3.6</version> <configuration> <configurationFile> ${basedir}/src/main/resources/generator/generatorConfig.xml </configurationFile> <overwrite>true</overwrite> <verbose>true</verbose> </configuration> <dependencies> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>5.1.41</version> </dependency> <dependency> <groupId>tk.mybatis</groupId> <artifactId>mapper</artifactId> <version>4.1.5</version> </dependency> </dependencies> </plugin> </plugins> </build>

rocket-notifymsg-demo-pay

rocket-notifymsg-demo-pay实现如下功能:

  • 1、充值接口
  • 2、充值完成要通知
  • 3、充值结果查询接口

application.properties

spring.application.name=notify-msg-pay server.port=8094 spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1_pay?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC spring.datasource.username = root spring.datasource.password = mysql rocketmq.producer.group = producer_notifymsg_pay rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.cn.itcast.wanxintx.effortdemo = debug

Controller

@RestController @RequestMapping("/account") public class AccountPayController { @Autowired private AccountPayService accountPayService; //充值 @GetMapping(value = "/paydo") public AccountPay pay(AccountPay accountPay){ //生成事务编号 String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo); return accountPayService.insertAccountPay(accountPay); } //查询充值结果 @GetMapping(value = "/payResult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo){ return accountPayService.getAccountPay(txNo); } }

Service

@Service @Slf4j public class AccountPayService { @Autowired private AccountPayMapper accountPayMapper; @Autowired private RocketMQTemplate rocketMQTemplate; /** * 插入充值记录 * @param accountPay * @return */ public AccountPay insertAccountPay(AccountPay accountPay) { int success = accountPayMapper.insertAccountPay(accountPay.getId(), accountPay.getAccountNo(), accountPay.getPayAmount(), "success"); if(success>0){ //发送通知,使用普通消息发送通知 accountPay.setResult("success"); rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay); return accountPay; } return null; } /** * 查询充值记录,接收通知方调用此方法来查询充值结果 * @param txNo * @return */ public AccountPay getAccountPay(String txNo) { AccountPay accountPay = accountPayMapper.findByIdTxNo(txNo); return accountPay; } }

Mapper

public interface AccountPayMapper extends Mapper<AccountPay> { int insertAccountPay(@Param("id") String id, @Param("accountNo") String accountNo, @Param("payAmount") Long pay_amount, @Param("result") String result); AccountPay findByIdTxNo(@Param("txNo") String txNo); } <mapper namespace="com.yibo.notifypay.mapper.AccountPayMapper"> <resultMap id="BaseResultMap" type="com.yibo.notifypay.domain.entity.AccountPay"> <!-- WARNING - @mbg.generated --> <id column="id" jdbcType="VARCHAR" property="id" /> <result column="account_no" jdbcType="VARCHAR" property="accountNo" /> <result column="pay_amount" jdbcType="BIGINT" property="payAmount" /> <result column="result" jdbcType="VARCHAR" property="result" /> </resultMap> <insert id="insertAccountPay"> insert into account_pay(id,account_no,pay_amount,result) values(#{id},#{accountNo},#{payAmount},#{result}) </insert> <select id="findByIdTxNo" resultType="BaseResultMap"> select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo} </select> </mapper>

rocket-notifymsg-demo-bank1

rocket-notifymsg-demo-bank1实现如下功能:

  • 1、监听MQ,接收充值结果,根据充值结果完成账户金额修改。
  • 2、主动查询充值系统,根据充值结果完成账户金额修改。

application.properties

spring.application.name=notify-msg-bank1 server.port=8096 spring.cloud.nacos.discovery.server-addr=127.0.0.1:8848 spring.datasource.driver-class-name = com.mysql.cj.jdbc.Driver spring.datasource.url = jdbc:mysql://localhost:3306/trade_bank1?useUnicode=true&useAffectedRows=true&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC spring.datasource.username = root spring.datasource.password = mysql rocketmq.producer.group = producer_notifymsg_pay rocketmq.name-server = 127.0.0.1:9876 logging.level.root = info logging.level.org.springframework.web = info logging.level.cn.itcast.wanxintx.effortdemo = debug

Controller

@RestController @Slf4j public class AccountInfoController { @Autowired private AccountInfoService accountInfoService; //主动查询充值结果 @GetMapping(value = "/payresult/{txNo}") public AccountPay result(@PathVariable("txNo") String txNo){ AccountPay accountPay = accountInfoService.queryPayResult(txNo); return accountPay; } }

Service

@Service @Slf4j public class AccountInfoService { @Autowired private AccountInfoMapper accountInfoMapper; @Autowired private PayClient payClient; /** * 更新账户金额 * @param accountChange */ @Transactional public void updateAccountBalance(AccountChangeEvent accountChange) { //幂等校验 if(accountInfoMapper.isExistTx(accountChange.getTxNo())>0){ return ; } int i = accountInfoMapper.updateAccountBalance(accountChange.getAccountNo(), accountChange.getAmount()); //插入事务记录,用于幂等控制 accountInfoMapper.addTx(accountChange.getTxNo()); } /** * 远程调用查询充值结果 * @param tx_no * @return */ public AccountPay queryPayResult(String tx_no) { //远程调用 AccountPay payresult = payClient.payresult(tx_no); if("success".equals(payresult.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(payresult.getAccountNo());//账号 accountChangeEvent.setAmount(payresult.getPayAmount());//金额 accountChangeEvent.setTxNo(payresult.getId());//充值事务号 updateAccountBalance(accountChangeEvent); } return payresult; } }

Consumer

@Component @Slf4j @RocketMQMessageListener(topic = "topic_notifymsg",consumerGroup = "consumer_group_notifymsg_bank1") public class NotifyMsgConsumer implements RocketMQListener<AccountPay> { @Autowired private AccountInfoService accountInfoService; @Override public void onMessage(AccountPay accountPay) { log.info("接收到消息:{}", JSON.toJSONString(accountPay)); if("success".equals(accountPay.getResult())){ //更新账户金额 AccountChangeEvent accountChangeEvent = new AccountChangeEvent(); accountChangeEvent.setAccountNo(accountPay.getAccountNo()); accountChangeEvent.setAmount(accountPay.getPayAmount()); accountChangeEvent.setTxNo(accountPay.getId()); accountInfoService.updateAccountBalance(accountChangeEvent); } log.info("处理消息完成:{}", JSON.toJSONString(accountPay)); } }

Feign

@FeignClient(value = "notify-msg-pay") public interface PayClient { //远程调用充值系统的接口查询充值结果 @GetMapping(value = "account/payResult/{txNo}") public AccountPay payresult(@PathVariable("txNo") String txNo); }

Mapper

public interface AccountInfoMapper extends Mapper<AccountInfo> { //修改账户金额 int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Long amount); //查询幂等记录,用于幂等控制 int isExistTx(String txNo); //添加事务记录,用于幂等控制 int addTx(String txNo); } <mapper namespace="com.yibo.notify.mapper.AccountInfoMapper"> <resultMap id="BaseResultMap" type="com.yibo.notify.domain.entity.AccountInfo"> <!-- WARNING - @mbg.generated --> <id column="id" jdbcType="BIGINT" property="id" /> <result column="account_name" jdbcType="VARCHAR" property="accountName" /> <result column="account_no" jdbcType="VARCHAR" property="accountNo" /> <result column="account_password" jdbcType="VARCHAR" property="accountPassword" /> <result column="account_balance" jdbcType="BIGINT" property="accountBalance" /> </resultMap> <!-- 修改账户金额 --> <update id="updateAccountBalance"> update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo} </update> <!-- 查询幂等记录,用于幂等控制 --> <select id="isExistTx"> select count(1) from de_duplication where tx_no = #{txNo} </select> <!-- 添加事务记录,用于幂等控制 --> <insert id="addTx"> insert into de_duplication values(#{txNo},now()) </insert> </mapper>

总结

最大努力通知方案是分布式事务中对一致性要求最低的一种,适用于一些最终一致性时间敏感度低的业务。

最大努力通知方案需要实现如下功能:

  • 1、消息重复通知机制。
  • 2、消息校对机制。

github源码地址:github.com/jjhyb/distributed-transaction

参考: www.cnblogs.com/zeussbook/p/11799017.html