version: '3.5'
services:rmqnamesrv:image: foxiswho/rocketmq:servercontainer_name: rmqnamesrvports:- 9876:9876volumes:- ./logs:/opt/logs- ./store:/opt/storenetworks:rmq:aliases:- rmqnamesrvrmqbroker:image: foxiswho/rocketmq:brokercontainer_name: rmqbrokerports:- 10909:10909- 10911:10911volumes:- ./logs:/opt/logs- ./store:/opt/store- ./conf/broker.conf:/etc/rocketmq/broker.confenvironment:NAMESRV_ADDR: "rmqnamesrv:9876"JAVA_OPTS: " -Duser.home=/opt"JAVA_OPT_EXT: "-server -Xms256m -Xmx256m -Xmn256m"command: mqbroker -c /etc/rocketmq/broker.confdepends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqbrokerrmqconsole:image: styletang/rocketmq-console-ngcontainer_name: rmqconsoleports:- 8080:8080environment:JAVA_OPTS: "-Drocketmq.namesrv.addr=rmqnamesrv:9876 -Dcom.rocketmq.sendMessageWithVIPChannel=false"depends_on:- rmqnamesrvnetworks:rmq:aliases:- rmqconsolenetworks:rmq:name: rmqdriver: bridge
rmqconsole
类似 kibana,访问 http://ip:8080 可以通过界面管理 MQbrokerIP1
一定要改成自己虚拟机的 IP# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.# 所属集群名字
brokerClusterName=DefaultCluster# broker 名字,注意此处不同的配置文件填写的不一样,如果在 broker-a.properties 使用: broker-a,
# 在 broker-b.properties 使用: broker-b
brokerName=broker-a# 0 表示 Master,> 0 表示 Slave
brokerId=0# nameServer地址,分号分割
# namesrvAddr=rocketmq-nameserver1:9876;rocketmq-nameserver2:9876# 启动IP,如果 docker 报 com.alibaba.rocketmq.remoting.exception.RemotingConnectException: connect to <192.168.0.120:10909> failed
# 解决方式1 加上一句 producer.setVipChannelEnabled(false);,解决方式2 brokerIP1 设置宿主机IP,不要使用docker 内部IP
brokerIP1=192168.109.128# 在发送消息时,自动创建服务器不存在的topic,默认创建的队列数
defaultTopicQueueNums=4# 是否允许 Broker 自动创建 Topic,建议线下开启,线上关闭 !!!这里仔细看是 false,false,false
autoCreateTopicEnable=true# 是否允许 Broker 自动创建订阅组,建议线下开启,线上关闭
autoCreateSubscriptionGroup=true# Broker 对外服务的监听端口
listenPort=10911# 删除文件时间点,默认凌晨4点
deleteWhen=04# 文件保留时间,默认48小时
fileReservedTime=120# commitLog 每个文件的大小默认1G
mapedFileSizeCommitLog=1073741824# ConsumeQueue 每个文件默认存 30W 条,根据业务情况调整
mapedFileSizeConsumeQueue=300000# destroyMapedFileIntervalForcibly=120000
# redeleteHangedFileInterval=120000
# 检测物理文件磁盘空间
diskMaxUsedSpaceRatio=88
# 存储路径
# storePathRootDir=/home/ztztdata/rocketmq-all-4.1.0-incubating/store
# commitLog 存储路径
# storePathCommitLog=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/commitlog
# 消费队列存储
# storePathConsumeQueue=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/consumequeue
# 消息索引存储路径
# storePathIndex=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/index
# checkpoint 文件存储路径
# storeCheckpoint=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/checkpoint
# abort 文件存储路径
# abortFile=/home/ztztdata/rocketmq-all-4.1.0-incubating/store/abort
# 限制的消息大小
maxMessageSize=65536# flushCommitLogLeastPages=4
# flushConsumeQueueLeastPages=2
# flushCommitLogThoroughInterval=10000
# flushConsumeQueueThoroughInterval=60000# Broker 的角色
# - ASYNC_MASTER 异步复制Master
# - SYNC_MASTER 同步双写Master
# - SLAVE
brokerRole=ASYNC_MASTER# 刷盘方式
# - ASYNC_FLUSH 异步刷盘
# - SYNC_FLUSH 同步刷盘
flushDiskType=ASYNC_FLUSH# 发消息线程池数量
# sendMessageThreadPoolNums=128
# 拉消息线程池数量
# pullMessageThreadPoolNums=128
public enum SendStatus {SEND_OK,FLUSH_DISK_TIMEOUT,FLUSH_SLAVE_TIMEOUT,SLAVE_NOT_AVAILABLE,
}
SCHEDULE_TOPIC
这个 Topic 里面SendSync
,普通消息;这里涉及一个重要的概念:NameServerpackage mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {// rmqnamesrv 的端口 9876p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}res, err := p.SendSync(context.Background(), primitive.NewMessage("vshop", []byte("this is Roy!")))if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}}
msgIds=C0A87E022710000000005a1e38900001
groupname
,因为我们是 consumer 集群,其中一台机器消费了消息,其他就不需要再消费,这就需要这组机器加入到同一个 group,相同 group 的机器再来只会消费下一个消息NewPushConsumer
package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/consumer""github.com/apache/rocketmq-client-go/v2/primitive"
)func main() {c, _ := rocketmq.NewPushConsumer(consumer.WithNameServer([]string{"192.168.109.128:9876"}),consumer.WithGroupName("shop"),)if err := c.Subscribe("vshop", consumer.MessageSelector{}, func(ctx context.Context, msgs ...*primitive.MessageExt) (consumer.ConsumeResult, error) {for i := range msgs {fmt.Printf("获取到值: %v \n", msgs[i])// Message=[topic=vshop, body=this is Roy!, Flag=0, properties=map[CONSUME_START_TIME:1679116202537 MAX_OFFSET:1 MIN_OFFSE//T:0 UNIQ_KEY:C0A87E022710000000005a1e38900001], TransactionId=]}return consumer.ConsumeSuccess, nil}); err != nil {fmt.Println("读取消息失败")}_ = c.Start()// 不能让主goroutine退出time.Sleep(time.Hour)_ = c.Shutdown()
}
package mainimport ("context""fmt""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)func main() {p, err := rocketmq.NewProducer(producer.WithNameServer([]string{"192.168.109.128:9876"}))if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}msg := primitive.NewMessage("vshop", []byte("this is a delay message3"))// 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hmsg.WithDelayTimeLevel(4)res, err := p.SendSync(context.Background(), msg)if err != nil {fmt.Printf("发送失败: %s\n", err)} else {fmt.Printf("发送成功: %s\n", res.String())}if err = p.Shutdown(); err != nil {panic("关闭producer失败")}//使用场景:支付的时候, 淘宝, 12306, 购票, 超时归还 - 定时执行逻辑//我可以去写一个轮询, 轮询的问题: 1. 多久执行一次轮询 30分钟//在12:00执行过一次, 下一次执行就是在 12:30的时候 但是12:01的时候下了单, 12:31就应该超时 但现在13:00时候才能超时//那我1分钟执行一次, 比如我的订单量没有这么大,1分钟执行一次, 其中29次查询都是无用, 而且你还还会轮询mysql//rocketmq的延迟消息, 1. 时间一到就执行, 2. 消息中包含了订单编号,你只查询这种订单编号
}
package mainimport ("context""fmt""time""github.com/apache/rocketmq-client-go/v2""github.com/apache/rocketmq-client-go/v2/primitive""github.com/apache/rocketmq-client-go/v2/producer"
)type OrderListener struct{}func (o *OrderListener) ExecuteLocalTransaction(msg *primitive.Message) primitive.LocalTransactionState {fmt.Println("开始执行本地逻辑")time.Sleep(time.Second * 10)fmt.Println("执行本地事务结束")// 1.执行本地事务成功// return primitive.CommitMessageState // expect:成功发送commit消息,控制台能看到消息// 2.执行本地事务失败// return primitive.RollbackMessageState // expect:成功发送rollback消息,half message被删除,控制台看不到消息// 3.本地执行逻辑无缘无故失败 代码异常 宕机fmt.Println("执行本地逻辑失败")return primitive.UnknowState // 模拟不返回状态;等待一定时间后,开始 check
}// 即使服务挂掉,下次启动后还是会接着回查,严谨
// 因为 MQ 里存着 half message,没有 commit,也没有 rollback,于是就会拿着 ID 回查对应事务
func (o *OrderListener) CheckLocalTransaction(msg *primitive.MessageExt) primitive.LocalTransactionState {fmt.Println("本地事务状态未知,rocketmq 回查")time.Sleep(time.Second * 5)return primitive.CommitMessageState // 回查本地事务发现已成功执行,可以 commit
}func main() {p, err := rocketmq.NewTransactionProducer(&OrderListener{},producer.WithNameServer([]string{"192.168.109.128:9876"}),)if err != nil {panic("生成producer失败")}if err = p.Start(); err != nil {panic("启动producer失败")}// 底层先发送了 half message;success后再去执行本地事务 ExecuteLocalTransaction,// 等本地事务结束// 1. 根据返回状态发送 commit/rollback// 2. 一直没返回状态,调用 CheckLocalTransaction 检测本地事务的状态,再次 commit/rollback// 这里会阻塞,情况1时,阻塞到已发送完整消息/已删除半消息,情况2时,阻塞到本地事务返回 UnknowState,剩下的交给回查// 具体细节还需要看源码res, err := p.SendMessageInTransaction(context.Background(), primitive.NewMessage("TransTopic", []byte("this is successful transaction message1")))fmt.Println("state:", res.State) // 1 2 3if err != nil {fmt.Printf("返回状态失败: %s\n", err)} else {fmt.Printf("返回状态成功: %s\n", res.String())}time.Sleep(time.Hour)if err = p.Shutdown(); err != nil {panic("关闭producer失败")}
}
// 更新订单表
order := model.OrderInfo{OrderSn: GenerateOrderSn(req.UserId), // 一定要放在这里生成,归还逻辑会用到Address: req.Address,SignerName: req.Name,SingerMobile: req.Mobile,Post: req.Post,User: req.UserId,
}
// 应该在消息中具体指明一个订单的具体的商品的扣减情况
jsonString, _ := json.Marshal(order)
// 1. 先发送库存归还半消息,再调用 ExecuteLocalTransaction
_, err = p.SendMessageInTransaction(context.Background(),primitive.NewMessage("order_reback", jsonString))
if err != nil {fmt.Printf("发送失败: %s\n", err)return nil, status.Error(codes.Internal, "发送消息失败")
}
if orderListener.Code != codes.OK {return nil, status.Error(orderListener.Code, orderListener.Detail)
}return &proto.OrderInfoResponse{Id: orderListener.ID, OrderSn: order.OrderSn, Total: orderListener.OrderAmount}, nil
if err := c.Subscribe("order_reback", consumer.MessageSelector{}, handler.AutoReback); err != nil {fmt.Println("读取消息失败")
}
// 便于返回商品详情
type GoodsDetail struct {Goods int32Num int32
}
type GoodsDetailList []GoodsDetailtype StockSellDetail struct {OrderSn string `gorm:"type:varchar(200);index:idx_order_sn,unique;"` // 要指定索引名称Status int32 `gorm:"type:varchar(200)"` //1 表示已扣减 2. 表示已归还Detail GoodsDetailList `gorm:"type:varchar(200)"`
}func (StockSellDetail) TableName() string {return "stockselldetail"
}
package mainimport ("crypto/md5""encoding/hex""fmt""gorm.io/driver/mysql""gorm.io/gorm""gorm.io/gorm/logger""gorm.io/gorm/schema""io""log""shop_srvs/inventory_srv/model""os""time"
)func genMd5(code string) string{Md5 := md5.New()_, _ = io.WriteString(Md5, code)return hex.EncodeToString(Md5.Sum(nil))
}func main() {dsn := "root:root@tcp(192.168.109.128:3306)/shop_inventory_srv?charset=utf8mb4&parseTime=True&loc=Local"newLogger := logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), // io writerlogger.Config{SlowThreshold: time.Second, // 慢 SQL 阈值LogLevel: logger.Info, // Log levelColorful: true, // 禁用彩色打印},)// 全局模式db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{NamingStrategy: schema.NamingStrategy{SingularTable: true,},Logger: newLogger,})if err != nil {panic(err)}//_ = db.AutoMigrate(&model.Inventory{}, &model.StockSellDetail{})//插入一条数据//orderDetail := model.StockSellDetail{// OrderSn: "imooc-bobby",// Status: 1,// Detail: []model.GoodsDetail{{1,2},{2,3}},//}//db.Create(&orderDetail)var sellDetail model.StockSellDetaildb.Where(model.StockSellDetail{OrderSn:"vshop-roy"}).First(&sellDetail)fmt.Println(sellDetail.Detail)
}
p.Shutdown
关闭 Producer,会报错 GetOrNewRocketMQClient
clientMap
是一个协程同步的 map,传入的 ClientID 可以看到是进程号func (c *rmqClient) ClientID() string {id := c.option.ClientIP + "@"if c.option.InstanceName == "DEFAULT" {id += strconv.Itoa(os.Getpid())} else {id += c.option.InstanceName}if c.option.UnitName != "" {id += "@" + c.option.UnitName}return id
}