Go项目(rocketmq)
创始人
2025-06-01 11:20:37
0

文章目录

  • 简介
  • 场景
  • 技术选型
  • rocketmq
    • 概念
    • 消息类型
  • go-client
  • 集成
    • CreateOrder
    • inventory
    • 库存归还

简介

  • 这篇简单介绍下 mq 的应用场景和 racketmq 技术

场景

  • 消息队列是一种“先进先出”的数据结构,应用场景主要包含以下3个方面
  • 一、应用解耦
    • 系统的耦合性越高,容错性就越低
    • 以电商应用为例,用户创建订单后,如果耦合调用(接连调用各服务并等待返回状态码)库存系统、物流系统、支付系统,任何一个子系统出了故障或者因为升级等原因暂时不可用,都会造成下单操作异常,影响用户使用体验
    • 使用消息队列解耦,即使某个系统发生故障,需要几分钟才能修复,但这并不影响订单服务
  • 二、流量削峰
    • 应用系统如果遇到请求流量的瞬间猛增,有可能会将系统压垮
    • 有了消息队列可以将大量请求缓存起来,分散到很长一段时间处理,这样可以大大提到系统的稳定性和用户体验
  • 三、数据分发
    • Producer 不需要关心谁来使用数据,只需要将数据发送到消息队列,Consumer 直接在消息队列中直接获取数据即可
    • 比如某个系统下线了(维护),我并不需要着急改 A 的逻辑,还是照常产生消息到 MQ,消不消费是 D 的问题
      1
  • 总而言之,有了 MQ,系统之间松散了,干起活来不用那么着急了
  • 当然,并不是没有缺点
    • 系统可用性降低:引入的外部依赖越多,系统稳定性越差;一旦MQ宕机,就会对业务造成影响。如何保证MQ的高可用?
    • 系统复杂度提高:MQ的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过MQ进行异步调用;如何保证消息没有被重复消费?怎么处理消息丢失情况?怎么保证消息传递的顺序性?
    • 一致性问题:A 系统处理完业务,通过MQ给 B、C、D 三个系统发消息数据,如果B系统、C系统处理成功,D系处理失败,如何保证消息处理的一致性?(分布式事务)

技术选型

  • 常见的 MQ 技术
    1
    2
  • 中小型软件公司,建议选 RabbitMQ,一方面,erlang语言天生具备高并发的特性,而且他的管理界面用起来十分方便;另一方面,中小型软件公司数据量没那么大,选消息中间件,应首选功能比较完备的,所以 kafka 排除(如果只是做日志,闭眼睛选这个);但是,虽然 RabbitMQ 是开源的,然而国内少有开发 erlang 的程序员,所幸它的社区十分活跃,可以解决开发过程中常见的 bug
  • 大型软件公司,根据具体使用在 rocketmq 和 kafka 之间二选一;一方面,大型软件公司,具备足够的资金搭建分布式环境,也具备足够大的数据量;针对 rocketMQ,大型软件公司也可以抽出人力进行定制化开发,毕竟国内有能力改 JAVA 源码的人,还是相当多的
  • 我们这里选择 rocketmq,因为它对分布式架构的支持很好,能极大提高可用性
    • MQ 技术学哪个都一样,因为功能是类似的,很容易转换
    • 阿里的有些技术会放弃维护,但 rocketmq 已经交给 Apache 管理,所以不必担心,未来发展趋势看好

rocketmq

  • 安装
    • 准备了 docker compose 脚本,避免环境问题
      version: '3.5'
      services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
      
    • rmqconsole 类似 kibana,访问 http://ip:8080 可以通过界面管理 MQ
    • 要挂载的 conf 目录,broker.conf;brokerIP1 一定要改成自己虚拟机的 IP
      # Licensed to the Apache Software Foundation (ASF) under one or more
      # contributor license agreements.  See the NOTICE file distributed with
      # this work for additional information regarding copyright ownership.
      # The ASF licenses this file to You under the Apache License, Version 2.0
      # (the "License"); you may not use this file except in compliance with
      # the License.  You may obtain a copy of the License at
      #
      #     http://www.apache.org/licenses/LICENSE-2.0
      #
      #  Unless required by applicable law or agreed to in writing, software
      #  distributed under the License is distributed on an "AS IS" BASIS,
      #  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      #  See the License for the specific language governing permissions and
      #  limitations under the License.# 所属集群名字
      brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
      # 在 broker-b.properties 使用: broker-b
      brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
      brokerId=0# nameServer地址,分号分割
      # namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
      # 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
      brokerIP1=192168.109.128# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
      defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
      autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
      autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
      listenPort=10911# 删除文件时间点,默认凌晨4点
      deleteWhen=04# 文件保留时间,默认48小时
      fileReservedTime=120# commitLog 每个文件的大小默认1G
      mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
      mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
      # redeleteHangedFileInterval=120000
      # 检测物理文件磁盘空间
      diskMaxUsedSpaceRatio=88
      # 存储路径
      # storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
      # commitLog 存储路径
      # storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
      # 消费队列存储
      # storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
      # 消息索引存储路径
      # storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
      # checkpoint 文件存储路径
      # storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
      # abort 文件存储路径
      # abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
      # 限制的消息大小
      maxMessageSize=65536# flushCommitLogLeastPages=4
      # flushConsumeQueueLeastPages=2
      # flushCommitLogThoroughInterval=10000
      # flushConsumeQueueThoroughInterval=60000# Broker 的角色
      # - ASYNC_MASTER 异步复制Master
      # - SYNC_MASTER 同步双写Master
      # - SLAVE
      brokerRole=ASYNC_MASTER# 刷盘方式
      # - ASYNC_FLUSH 异步刷盘
      # - SYNC_FLUSH 同步刷盘
      flushDiskType=ASYNC_FLUSH# 发消息线程池数量
      # sendMessageThreadPoolNums=128
      # 拉消息线程池数量
      # pullMessageThreadPoolNums=128
      
    • 启动成功后可以先体验一下发送和消费消息,这里都是基于 topic 的
      1
    • 安装上面的做法启动如果还报 Java 异常,先 Ctrl-C 重启一下试试,别着急查

概念

  • Producer:消息的发送者;举例:发信者
  • Consumer:消息接收者;举例:收信者
  • Broker:暂存和传输消息(数据是存在这的);举例:邮局
  • NameServer:管理Broker;举例:各个邮局的管理机构
  • Topic:区分消息的种类(主题);一个发送者可以发送消息给一个或者多个 Topic;一个消息的接收者可以订阅一个或者多个 Topic 消息
  • 这张图展示了各组件的关系,broker 相当于服务器
    2

消息类型

  • 按照发送特点分
    • 同步发送
      • 同步发送,线程阻塞,投递 completes 后阻塞结束
      • 如果发送失败,会在默认的超时时间3秒内进行重试,最多重试2次
      • 投递 completes 不代表发送成功,要 check SendResult.sendStatus 来判断是否投递成功
      • SendStatus 里面有发送状态的枚举,同步的消息投递会有一个状态返回值
        public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
        }
        
      • 注:发送同步消息且 Ack 为 SEND_OK,只代表该消息成功写入了 MQ 中,并不代表该消息被Consumer 消费了
    • 异步发送
      • 异步调用,当前线程一定要等待异步线程回调结束再关闭 Producer,因为是异步的,不会阻塞,提前关闭 producer 会导致未回调链接就断开了
      • 异步消息不 retry,投递失败回调 onException() 方法,只有同步消息才会 retry,源码参考DefaultMQProducerImpl.class
      • 异步发送一般用于链路耗时较长,对 RT 响应时间较为敏感的业务场景,例如用户视频上传后启动转码服务,转码完成后发送转码结果(向MQ发送)等,不必一直等待转码结果是否发送成功,会有回调链,保证消息已发送(不丢失)
    • 单向发送
      • 消息不可靠,性能高,只负责往服务器(broker)发送一条消息,不会重试也不关心是否发送成功
      • 此方式发送消息的过程耗时非常短,一般在微秒级别
    • 概括对比一下
      1
  • 按照使用功能特点分
    • 或者说按照使用场景分、按照需求分
    • 普通消息
      • 普通消息是我们在业务开发中用到的最多的消息类型,生产者需要关注消息发送成功即可,消费者消费到消息即可,不需要保证消息的顺序,所以消息可以大规模并发地发送和消费,吞吐量很高,适合大部分场景
      • 每个 broker 都相当于一个 queue,broker 内保证 FIFO,但一个 topic 可能在多个 broker 上,所以从整体来看是默认无序的
    • 顺序消息
      • 分为分区顺序消息和全局顺序消息,全局顺序消息比较容易理解,也就是哪条消息先进入,哪条消息就会先被消费,符合我们的 FIFO,但很多时候全局消息的实现代价很大,所以就出现了分区顺序消息
      • 分区顺序消息的概念可以如下图所示
        2
      • 通过对消息的 key 进行 hash,相同 hash 的消息会被分配到同一个分区里面,当然如果要做全局顺序消息,我们的分区只需要一个即可,所以全局顺序消息的代价是比较大的
    • 延时消息
      • 延迟的机制是在服务端(broker)实现的,也就是 Broker 收到了消息,但是经过一段时间以后才发送(才能被消费);有 push/pull 两种消费方式
      • 服务器按照 1-N 定义了如下级别:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
      • 若要发送定时消息,在应用层初始化 Message 消息对象之后(这都是broker里的操作,producer只需要传参),调用Message.setDelayTimeLevel(intlevel) 方法来设置延迟级别,按照序列取相应的延迟级别,例如 level=2,则延迟为 5s
      • 发送消息的时候如果消息设置了 DelayTimeLevel,该消息会被丢到ScheduleMessageService.SCHEDULE_TOPIC 这个 Topic 里面
      • 根据 DelayTimeLevel 选择对应的 queue(broker),不需要每个 broker 都有延时各种 level 的能力
    • 事务消息
      • 看看官方文档吧,上面说的这几种消息类型都有介绍
  • 上面只是从不同角度划分了消息种类,但彼此之间是有交集的,比如某个 topic 下的这条消息可以是异步发送的普通消息,我们只需要关注自己的需求选择合适的方式,不必太关注类型划分

go-client

  • go 操作 rocketmq 的客户端
  • 同步发送 SendSync,普通消息;这里涉及一个重要的概念:NameServer
    package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
    )func main() {// rmqnamesrv 的端口 9876p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}res, err := p.SendSync(context.Background(), primitive.NewMessage("vshop", []byte("this is Roy!")))if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}}
    
    1
    • 注:会返回一个 ID msgIds=C0A87E022710000000005a1e38900001
  • 消费/订阅消息(push)
    • 这里要设置 groupname,因为我们是 consumer 集群,其中一台机器消费了消息,其他就不需要再消费,这就需要这组机器加入到同一个 group,相同 group 的机器再来只会消费下一个消息
      2
    • 只推送一份消息不就行了吗???
    • 订阅 NewPushConsumer
      package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
      )func main() {c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.109.128:9876"}),consumer.WithGroupName("shop"),)if err := c.Subscribe("vshop", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值: %v \n", msgs[i])// Message=[topic=vshop, body=this is Roy!, Flag=0, properties=map[CONSUME_START_TIME:1679116202537 MAX_OFFSET:1 MIN_OFFSE//T:0 UNIQ_KEY:C0A87E022710000000005a1e38900001], TransactionId=]}return consumer.ConsumeSuccess, nil}); err != nil {fmt.Println("读取消息失败")}_ = c.Start()// 不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
      }
      
    • 通过主线程 sleep 让 consumer 不退出,只要给这个 topic push 了消息,这边就能收到
    • 被消费了的消息还是能看到,detail 中会看到 consumerGroup
      3
  • 发送延时消息
    • 可以启动 consumer 观察是否延时了;注:topic 要对应上,一切基于 topic
      package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
      )func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}msg := primitive.NewMessage("vshop", []byte("this is a delay message3"))// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.WithDelayTimeLevel(4)res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}//使用场景:支付的时候, 淘宝, 12306, 购票, 超时归还 - 定时执行逻辑//我可以去写一个轮询, 轮询的问题: 1. 多久执行一次轮询 30分钟//在12:00执行过一次, 下一次执行就是在 12:30的时候 但是12:01的时候下了单, 12:31就应该超时 但现在13:00时候才能超时//那我1分钟执行一次, 比如我的订单量没有这么大,1分钟执行一次, 其中29次查询都是无用, 而且你还还会轮询mysql//rocketmq的延迟消息, 1. 时间一到就执行, 2. 消息中包含了订单编号,你只查询这种订单编号
      }
      
    • 后续完善订单服务的超时归还库存要使用延时消息
  • 发送事务消息
    • 底层原理就是上一篇中提到的基于可靠消息的最终一致性方案,可以先回顾一下
    • SendMessageInTransaction 会发送半消息,然后阻塞等待本地事务的结果
    • 这里模拟了三种返回结果,注意理解
      package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
      )type OrderListener struct{}func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {fmt.Println("开始执行本地逻辑")time.Sleep(time.Second * 10)fmt.Println("执行本地事务结束")// 1.执行本地事务成功// return primitive.CommitMessageState // expect:成功发送commit消息,控制台能看到消息// 2.执行本地事务失败// return primitive.RollbackMessageState 	// expect:成功发送rollback消息,half message被删除,控制台看不到消息// 3.本地执行逻辑无缘无故失败 代码异常 宕机fmt.Println("执行本地逻辑失败")return primitive.UnknowState // 模拟不返回状态;等待一定时间后,开始 check
      }// 即使服务挂掉,下次启动后还是会接着回查,严谨
      // 因为 MQ 里存着 half message,没有 commit,也没有 rollback,于是就会拿着 ID 回查对应事务
      func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {fmt.Println("本地事务状态未知,rocketmq 回查")time.Sleep(time.Second * 5)return primitive.CommitMessageState // 回查本地事务发现已成功执行,可以 commit
      }func main() {p, err := rocketmq.NewTransactionProducer(&OrderListener{},producer.WithNameServer([]string{"192.168.109.128:9876"}),)if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}// 底层先发送了 half message;success后再去执行本地事务 ExecuteLocalTransaction,// 等本地事务结束// 		1. 根据返回状态发送 commit/rollback//		2. 一直没返回状态,调用 CheckLocalTransaction 检测本地事务的状态,再次 commit/rollback// 这里会阻塞,情况1时,阻塞到已发送完整消息/已删除半消息,情况2时,阻塞到本地事务返回 UnknowState,剩下的交给回查// 具体细节还需要看源码res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TransTopic", []byte("this is successful transaction message1")))fmt.Println("state:", res.State) // 1 2 3if err != nil {fmt.Printf("返回状态失败: %s\n", err)} else {fmt.Printf("返回状态成功: %s\n", res.String())}time.Sleep(time.Hour)if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
      }
      
    • 后续会用到事务消息实现分布式事务
    • 这里要保证订阅者消费了消息(响应了请求),具体怎么实现后续会介绍
  • Tips:任何系统,日志都需要单独设计并测试各种可能情况,是排查问题的关键

集成

  • 接下来将分布式事务和超时退还机制集成到订单微服务,在实践中深化理解
  • 先设计大致的架构,考虑库存扣减中可能遇到的情况,或者说分析数据不一致的可能原因
    • 在订单服务本地,要操作:新建订单信息、新建订单商品信息、删除购物车记录,这部分可以放在 MySQL 本地事务保证数据一致性
    • 扣减库存成功
      • 本地事务失败,能否在事务 rollback 中调用库存归还呢?不可以,服务挂掉、服务器宕机、本地代码异常等会导致无法执行到归还库存
    • 扣减库存失败,直接回滚本地事务,但这就要把调用库存服务放在事务中,如果遇到网络拥塞或库存服务宕机,重试次数达到上限,本地事务无法执行成功
    • 这部分就是把服务挂掉、网络拥塞、服务器宕机、本地代码异常这些问题尝试放到各个步骤中,看架构能否解决问题或存在哪些不足
  • 尝试用上一篇中介绍的分布式事务方案解决问题
    • tcc(虽然合适,但实现较为复杂,go语言还没有完善的框架)
      1
    • 基于可靠消息的最终一致性方案
      • 前面说过,需要保证订阅者消费了消息,因为本地 commit 后不能再回滚,库存服务必须执行成功
        2
      • 一般情况下,可以通过解决代码 bug 保证消息被消费,即使宕机,启动后还是可以继续消费
      • 但是库存服务特殊,因为有库存不足的情况!或者概括为:这个方案对于资源有限制的场景是需要改造的,不能直接套用
    • 改进
      • 只需调整一下业务逻辑,如图,先发送半消息准备归还库存,再调用库存服务(扣减),如果库存不足,后续的本地事务不会开始,从而解决了资源有限制的问题
        3
      • 如果本地事务执行成功,则 rollback 调 reback 的 half message;如果失败,就 commit,执行库存归还
      • 同样有回查,如果能查到已支付订单(或未超时),说明不需要库存归还(rollback),如果没有订单信息(或已超时),commit 归还库存
      • 这里还应该发送延时消息,也就是实现超时归还机制;但对同一订单,库存服务只能消费归还消息(回查commit的)或延时消息(同一时刻出现这两个commit的几率不大)
    • 接下来具体实现这一改进方案

CreateOrder

  • 在 order_srv/handler/order.go 函数 CreateOrder 中发送事务消息
    // 更新订单表
    order := model.OrderInfo{OrderSn:      GenerateOrderSn(req.UserId), // 一定要放在这里生成,归还逻辑会用到Address:      req.Address,SignerName:   req.Name,SingerMobile: req.Mobile,Post:         req.Post,User:         req.UserId,
    }
    // 应该在消息中具体指明一个订单的具体的商品的扣减情况
    jsonString, _ := json.Marshal(order)
    // 1. 先发送库存归还半消息,再调用 ExecuteLocalTransaction
    _, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString))
    if err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")
    }
    if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)
    }return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
    
  • 这个函数的大部分逻辑移到 ExecuteLocalTransaction
  • 还需要实现指向结构体 OrderListener 的另一个方法 CheckLocalTransaction,用于回查
    • Q:查到订单号,也应该查看订单状态呀,可能是未支付未超时,直接回滚岂不是没机会归还了?
    • 后续添加发送延时消息的逻辑,到时再梳理逻辑
  • 这部分需要看代码仔细理解,并结合前面的理论知识,比较考验逻辑思维

inventory

  • 库存服务部分 inventory_srv/handler/inventory.go
  • 首先,需要在 main.go 订阅订单服务生成的 topic
    if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {fmt.Println("读取消息失败")
    }
    
  • 因为逻辑较为复杂,库存归还函数 AutoReback 放在 handler 中单独定义,重点是解决重复归还的问题
  • 主要是保证归还接口的幂等性,方案是:新建一张表, 这张表记录了详细的订单扣减细节,以及归还细节
  • 在 model 设计并生成表
    // 便于返回商品详情
    type GoodsDetail struct {Goods int32Num   int32
    }
    type GoodsDetailList []GoodsDetailtype StockSellDetail struct {OrderSn string          `gorm:"type:varchar(200);index:idx_order_sn,unique;"`	// 要指定索引名称Status  int32           `gorm:"type:varchar(200)"` //1 表示已扣减 2. 表示已归还Detail  GoodsDetailList `gorm:"type:varchar(200)"`
    }func (StockSellDetail) TableName() string {return "stockselldetail"
    }
    
  • 测试表
    package mainimport ("crypto/md5""encoding/hex""fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger""gorm.io/gorm/schema""io""log""shop_srvs/inventory_srv/model""os""time"
    )func genMd5(code string) string{Md5 := md5.New()_, _ = io.WriteString(Md5, code)return hex.EncodeToString(Md5.Sum(nil))
    }func main() {dsn := "root:root@tcp(192.168.109.128:3306)/shop_inventory_srv?charset=utf8mb4&parseTime=True&loc=Local"newLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), // io writerlogger.Config{SlowThreshold: time.Second,   // 慢 SQL 阈值LogLevel:      logger.Info, // Log levelColorful:      true,         // 禁用彩色打印},)// 全局模式db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{NamingStrategy: schema.NamingStrategy{SingularTable: true,},Logger: newLogger,})if err != nil {panic(err)}//_ = db.AutoMigrate(&model.Inventory{}, &model.StockSellDetail{})//插入一条数据//orderDetail := model.StockSellDetail{//	OrderSn: "imooc-bobby",//	Status:  1,//	Detail:  []model.GoodsDetail{{1,2},{2,3}},//}//db.Create(&orderDetail)var sellDetail model.StockSellDetaildb.Where(model.StockSellDetail{OrderSn:"vshop-roy"}).First(&sellDetail)fmt.Println(sellDetail.Detail)
    }
    
  • 接下来定义自动归还的逻辑
    • Q:consumer.ConsumeRetryLater 的底层逻辑是怎样的?如何重新消费这条消息?
    • 推理:应该是收集 return 了 ConsumeRetryLater 的消息,重新调用 AutoReback
  • 测试修改的代码
    • 让本地事务执行出错(返回 codes.Internal),测试库存归还,通过 UT 也可以完成,但是需要 mock,会麻烦一些
    • 让本地事务返回 primitive.UnknowState,测试回查函数;注:如果我们的业务逻辑正常,回查函数一般只会在极端情况下才会用到
    • Q:未知状态包括哪些?本地事务执行时间过长是否会触发回查?还是说必须返回一个异常?可能不是最后 return 的,而是本地事务的代码出问题,ExecuteLocalTransaction 报错,也会回查
    • 触发回查后还会等待一段时间,才执行 CheckLocalTransaction

库存归还

  • 还是放在本地事务的代码中,发送延时消息
  • 这里要判断,可能 30min 内已支付,在 main 监听 topic,取消订单逻辑放在 order/handler
    • 操作订单表 TRADE_CLOSED
    • 归还库存,直接给 order_reback 这个 topic 发消息就行
    • 那购物车表、商品订单表怎么归还呢?TODO
  • 这里要注意,不要使用 p.Shutdown 关闭 Producer,会报错
    • 通过 NewPushConsumer 追踪源码,可以看到里面有个函数 GetOrNewRocketMQClient
    • 通过 NewProducer/NewTransactionProducer,还是用了这个函数
    • 源码用的 clientMap 是一个协程同步的 map,传入的 ClientID 可以看到是进程号
      func (c *rmqClient) ClientID() string {id := c.option.ClientIP + "@"if c.option.InstanceName == "DEFAULT" {id += strconv.Itoa(os.Getpid())} else {id += c.option.InstanceName}if c.option.UnitName != "" {id += "@" + c.option.UnitName}return id
      }
      
    • 也就是说,任何地方创建的 C/P,因为没有指定 ID,都是进程号,这个 Map 就会 Load 已经存在的这个 client,只要在任意地方 Shutdown 了,都会导致全部的 C/P 关闭
      • 也就是说,一个 client 下可以有多个 C/P
      • 这种实现便于统一管理,也有应用场景
    • 我们这里解决报错的办法有两个:1.不用 Shutdown,2.在新建时,options 中指定 ClientID
  • 项目代码方面,这章主要是给实现分布式事务的 code 加了些注释
    • git commit

相关内容

热门资讯

婚礼司仪主持的台词大全 婚礼司... 下面有请今天的双方主婚人到前面就座  有请咱们今天亲爱的娘家客人到前面就座  有请证婚人、介绍人到前...
最新或2023(历届)春节放假...  春节是中国最大的传统节日,在外拼搏了一年的人们都从各地赶回家里与亲人团圆。短短的团聚时光,聊着这一...
《为了这片土地》电影观后感 为...   【篇一】  《为了这片土地》是一部主旋律电影,讲述了王桂兰三十多年如一日扎根农村,以改变家乡贫穷...
银行合规警示教育观后感(精选)... 【篇一】  在国有商业银行的改革和发展取得显著成绩的同时,基层机构案件濒发的问题却始终没有得到有效的...
影片《作风建设永远在路上》观后... 【篇一】  由中纪委宣传部与央视联合摄制的专题片《作风建设永远在路上》,在央视黄金时段强档推出。12...
最新或2023(历届)公司联欢...  男:尊敬的各位领导、各位来宾,  女:亲爱的同事们  合:大家下午好!  男:光阴似箭,岁月如梭,...
最新或2023(历届)度表彰大...  各位领导,各位同事,大家下午好:  今天我们欢聚一堂,在这里隆重召开一年一度的总结表彰大会。召开大...
年度表彰大会主持词大全 公司年...  各位领导,各位同事,大家下午好:  今天我们欢聚一堂,在这里隆重召开一年一度的总结表彰大会。召开大...
最新或2023(历届)六一儿童... 六一儿童节主持词开场白:  主持人甲:弹去五月的风尘,迎来六月的阳光。  主持人乙:我们的心儿像怒放...
最新或2023(历届)公司知识...  尊敬的各位领导、各位来宾、参赛选手们:大家晚上好!  还有两天就是我们的传统节日——中秋节了。我们...
最新或2023(历届)组织工作... 同志们:  现在开会。  这次会议的主要任务是,传达全市组织工作会议精神,总结专年工作,部署今年任务...
最新或2023(历届)大学毕业... 尊敬的各位领导,各位同学:  你们好!(敬礼,或是鞠躬)  昨天,你们怀着希望和期待,迈进了xx学院...
最新或2023(历届)消夏晚会... 尊敬的各位领导,各位来宾:  乙:xx村的父老乡亲们  合:大家好!  甲:又是一个郁郁葱葱的盛夏,...
公司知识竞赛主持词范文 党史知...  尊敬的各位领导、各位来宾、参赛选手们:大家晚上好!  还有两天就是我们的传统节日——中秋节了。我们...
老师工作失职检讨书范文 说老师... 一、我的专业素质太低。  我在大学所学的内容只是本专业的内容,但却没有想到,在登上讲台以后,我还要面...
最新或2023(历届)婚礼司仪...  红杏枝头春意闹,玉栏桥上伊人来,身披着洁白的婚纱,头上戴着美丽的鲜花,沐浴在幸福甜蜜之中的佳人在庄...
最新或2023(历届)给老婆的... 最新或2023(历届)给老婆的喝酒检讨书一:  亲爱的老婆大人:  关于我昨晚喝醉酒的事情,现在向您...
给女朋友写认错检讨书范文 给女... 给女朋友写认错检讨书一:  爱的吾爱的亲爱的挚爱的永爱的最爱的全宇宙超级无敌乖乖女—  我在电脑前苦...
员工工作违纪检讨书范文(最新或... 篇一  尊敬的领导:  xx月xx号,我没有与本班人员协调好换班使得岗位主操人员都没到岗上班。今天,...
违规发放津补贴检讨书范文 违规... 违规发放津补贴检讨书一:  根据x财字【201x】21号文件精神和主管部门有关要求,我校接到通知后,...