解耦,异步(加快速度), 削峰填谷.
优点就是解耦,异步(加快速度), 削峰.
缺点就是
系统可用性降低, 如果MQ 宕机,所有依赖MQ的业务都会停摆.
系统复杂的上升, 引入MQ必将带来一系列问题,比如重复消费, 消息丢失,消息顺序,消息堆积.
异步会带来一致性问题, 如果异步调用失败, 数据可能会出现不一致问题.
MQ消息堆积的原因很简单: 生产者速度>消费者, 这时候我们有三种方案:
加快消费者处理速度,
加硬件, 加软件,优化性能瓶颈
减慢生产者处理速度
熔断,降级,
加大队列存储量,避免死信
使用惰性队列, 就是将队列溢出的信息持久化, 当然会降低时效性.
性能:阿里支撑,经受住淘宝,天猫双 11 重重考验; 性能高; 可靠性好 ; 可用性高易扩展功能:功能完善,如: 事务消息,消息重试,私信队列,定时消息等;跨平台易用: 跨语言,多协议接入 (支持 HTTP,MQTT,TCP 协议,支持 Restful 风格HTTP 收发消息)引进的就是 RocketMQ 的阿里云和 VIP 服务;
生产者
消费者
Broker, 这个是处理事情的MQ节点角色
NameServer, 这个是服务注册中心. 也即是RocketMQ中的管理节点.nameserver只需要保持最终一致性.
Topic, 每一个topic代表一类消息,一个生产者可以发消息给一个或多个topic,一个消费者也可以订阅一个或多个topic.
MessageQueue, 一个topic下可以有多个queue, 一个queue只能对应一个Consumer .主要是为了一个topic多个consumer的问题.
Tag, Topic下可以有多种消息属性, 就可以使用tag概念, 主要是让消费者可以选择过滤.
Offset, 其实就是我们常说的循环队列中的双指针, 常说的是consumerOffset, 始终不大于BrokerOffset指针的值.
不会。每条消息都会持久化到commitLog中,消费者消费消息后只是更新消息的消费进度而已。 默认是72小时后删除文件,可以通过配置文件的filereservetime属性更改这个过期时间。
相当于循环队列中, 不删数据, 只改变双指针的值. 定时删除.
push和pull都有,push是通过DefaultMQPushConsumer实现的,pull是通过DefaultMQPullConsumer实现的,但是push的底层还是通过pull(长轮询机制)实现的。
也就是当 broker收到消费者的pull请求后,会先去消息队列中看一下是否有消息,如果有消息会直接返回给consumer,如果没有消息,broker不会断开连接,等待5秒后会再检查是否有消息,不管有没有消息都会返回给消费者。
主要是push的话有慢消费问题, 还有需要建立长连接, 消费者多的话, 会对broker节点资源会造成压力.
如果broker主动推送消息的话有可能push速度快,消费速度慢的情况,那么就会造成消息在consumer端堆积过多,同时又不能被其他consumer消费的情况。而pull的方式可以根据当前自身情况来pull,不会造成过多的压力而造成瓶颈。所以采取了pull的方式。
消费模型由Consumer决定,消费维度为Topic。
集群消费 :每条消息只会被 consumer 集群内的任意一个 consumer 实例消费一次. 一条消息只会被同Group中的一个Consumer消费, 多个Group同时消费一个Topic时,每个Group都会有一个Consumer消费到数据,
使用集群消费的时候,consumer 的消费进度是存储在 broker 上,consumer 自身是不存储消费进度的。好处在于,当 consumer 集群是扩大或缩小时,由于消费进度统一在broker上,消息重复的概率会被大大降低了。
广播消费消息将对一个Consumer Group下的各个Consumer实例都消费一遍。
consumer 的消费进度是存储在各个 consumer 实例上,这就容易造成消息重复。还有很重要的一点,对于广播消费来说,是不会进行消费失败重投的,所以在 consumer 端消费逻辑处理时,需要额外关注消费失败的情况。
虽然广播消费能保证集群内每个 consumer 实例都能消费消息,但是消费进度的维护、不具备消息重投的机制大大影响了实际的使用。
在实际使用中,更推荐使用集群消费,因为集群消费不仅拥有消费进度存储的可靠性,还具有消息重投的机制。
同步消息发送 : 发送方发送消息后,收到服务端响应后才发送下一条消息.
异步消息发送 : 发送一条消息后,不等服务端返回就可以继续发送消息或者后续任务处理。发送方通过回调接口接收服务端响应,并处理响应结果。
单向消息发送 : 发送方发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不需要应答。
producer的发送端 : 发送失败会重试(默认3次, 重试的时候会发送到其他Broker上), 重试失败会进行业务兜底(降级策略), 大于重试次数, 会扔到死信队列, 默认三天后删除.
broker的持久化机制 : broker收到后先保存到内存, 立刻响应成功给producer;可将异步刷盘改为同步刷盘, 再加上broker的主从同步保证高可用.
以及consumer的消费端 : 完成业务操作后才能响应成功给broker
总得来说,RocketMq具有以下几个优势:
吞吐量高:单机吞吐量可达十万级可用性高:分布式架构消息可靠性高:经过参数优化配置,消息可以做到0丢失(参考 其他面试题)功能支持完善:MQ功能较为完善,还是分布式的,扩展性好支持10亿级别的消息堆积:不会因为堆积导致性能下降源码是java:方便我们查看源码了解它的每个环节的实现逻辑,并针对不同的业务场景进行扩展可靠性高:天生为金融互联网领域而生,对于要求很高的场景,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况稳定性高:RoketMQ在上可能更值得信赖,这些业务场景在阿里双11已经经历了多次考验
1)发送时消息重复消费:当一条消息发送到broker并完成持久化,如果这时候出现网络闪断或者broker宕机,导致broker对生产者的应答失败,那么生产者会尝试重发消息,导致消费者收到重复的消息。
2)消费消息时重复:当消费者已经完成消费,但是响应给broker时出现网络闪断,为了保证消息至少被消费一次,broker会尝试再次投递消息。
3) 负载均衡时消息重复:当broker或客户端重启、扩容或缩容时,会触发Rebalance,此时消费者可能会消费到重复的消息。
为了避免重复消费, 需要做好幂等, 我们一般在业务中使用MVCC(就是version方案)或者唯一id方案去重(状态或者状态表, Redis也可以, 根据场景判断).
如果由MQ处理,会影响吞吐量和高可用,所以RocketMQ不解决消息重复的问题。
全局有序:整个 RocketMQ 系统的所有消息严格按照队列先入先出顺序进行消费。局部有序:只保证一部分关键消息的消费顺序。 一般保证局部有序就可以.
思路: RocketMQ 默认是轮询Queue, 如果要保证局部有序, 我们只需要指定这一组消息都指定到某个队列上即可.
落地: RocketMQ 中可以在发送者发送消息时指定一个 MessageSelector 对象,让这个对象来决定消息发入哪一个 MessageQueue。这样就可以保证一组有序的消息能够发到同一个 MessageQueue 里.
此外: 全局消息有序的方式对整个Topic 的消息吞吐影响是非常大的,基本上就没有用MQ的必要了。
追问:怎么保证消息发到同一个queue?Rocket MQ给我们提供了MessageQueueSelector接口,可以自己重写里面的接口,实现自己的算法,举个最简单的例子:判断i % 2 == 0,那就都放到queue1里,否则放到queue2里。
首先要分析一下消息堆积可能造成的原因
1、如果是机器本身的原因,比如消费者组有几个消费者服务挂掉了,剩下少量消费者消费能力不足导致的消费积压,那就正常重新启动,然后慢慢再去消费积压的消息, 如果对时效要求较高, 这种情况一般是有临时扩容队列和消费者的预案.
2、如果是生产者端由业务暴增引起的生产过快,而消费者端消费能力不足,这个时候就可以采取生产者端限流或者进行消费者扩容;这个时候要注意,如果生产者只是短期暴增或者消息的业务不是很重要可以采用限流,如果是长期暴增真正的业务量上涨就必须要进行消费者扩容。
3、比如说消费者挂了,然后broker堆积了很多消息,然后可以先把堆积的消息读到别的地方比如mysql或者es然后去后续进行处理,然后把RocketMQ堆积的消息删掉,启动消费者保障消费者正常消费,这里要注意的是删除堆积消息之前,需要停止mq。
几种常见的思路:
如果是秒杀场景的消息激增, 那么可以通过增长消息队列的容量(或者新增队列)来临时处理, 等峰值过去, 就会自然过渡ok.
如果消息已经堆积较多, 而生产者那边无法限流, 消费者这边又暂时无法添加硬件, 也可以使用惰性队列进行落盘, 等到了夜晚后拿出重跑.
如果消息不重要, 可以熔断(快速失败).
追问:堆积时间过长, 消息会超时吗?RocketMQ中的消息只会在commitLog被删除的时候才会消失,不会超时。也就是说未被消费的消息不会存在超时删除这情况。
如图所示,消息生产者发送消息到broker,都是会按照顺序存储在CommitLog文件中,每个commitLog文件的大小为1G
CommitLog-存储所有的消息元数据,包括Topic、QueueId以及message
CosumerQueue-消费逻辑队列:存储消息在CommitLog的offset
IndexFile-索引文件:存储消息的key和时间戳等信息,使得RocketMq可以采用key和时间区间来查询消息
也就是说,rocketMq将消息均存储在CommitLog中,并分别提供了CosumerQueue和IndexFile两个索引,来快速检索消息
RocketMq采用文件系统进行消息的存储,相对于关系型数据库的方式性能更高.
RocketMq与Kafka在写消息与发送消息上,继续沿用了Kafka的这两个方面:顺序写和零拷贝
1)顺序写顺序写比随机性效率高几千倍. 所以对于收到的消息,采用顺序写写入commit Log文件.2)零拷贝比如:读取文件,再用socket发送出去这一过程, 用传统方式实现:先读取、再发送,实际会经过以下四次复制1、将磁盘文件,读取到操作系统内核缓冲区Read Buffer2、将内核缓冲区的数据,复制到应用程序缓冲区Application Buffer3、将应用程序缓冲区Application Buffer中的数据,复制到socket网络发送缓冲区4、将Socket buffer的数据,复制到网卡,由网卡进行网络传输传统方式,读取磁盘文件并进行网络发送,经过的四次数据copy是非常繁琐的
这里使用了零拷贝,也就是说,直接由内核缓冲区Read Buffer将数据复制到网卡,省去第二步和第三步的复制, 使得RocketMq读消息的性能有一个质的提升..
此外,还需要再提一点,零拷贝技术采用了MappedByteBuffer内存映射技术,采用这种技术有一些限制,其中有一条就是传输的文件不能超过2G,这也就是为什么RocketMq的存储消息的文件CommitLog的大小规定为1G的原因
小结:RocketMq采用文件系统存储消息,并采用顺序写写入消息,使用零拷贝发送消息,极大得保证了RocketMq的高性能.
在不开启容错的情况下,轮询队列进行发送,如果失败了,重试的时候过滤失败的Broker如果开启了容错策略,会通过RocketMQ的预测机制来预测一个Broker是否可用。如果上次失败的Broker可用那么还是会选择该Broker的队列如果上述情况失败,则随机选择一个进行发送在发送消息的时候会记录一下调用的时间与是否报错,根据该时间去预测broker的可用时间。其实就是send消息的时候queue的选择。
RocketMQ有4种部署类型
1)单机, 不推荐. 可用差.
2)多Master模式集群, 配置简单, 性能最高, 某节点宕机重启不会影响RocketMQ服务缺点:如果某节点宕机, 会导致该节点存在未被消费的消息在节点恢复之前不能被消费.
3)多Master多Slave模式
给每个master配置一个slave结点,是线上方案. 实际上往往又根据主从同步方案再分为两种.
异步复制, 主从消息一致只会有毫秒级的延迟, 性能较好,但可能会有少量消息丢失
同步双写, 只有master和slave都写成功以后,才会向客户端返回成功消息无延迟,服务可用性与数据可用性都非常高, 缺点就是会降低消息写入的效率,并影响系统的吞吐量
Consumer和queue会优先平均分配,如果Consumer少于queue的个数,则会存在部分Consumer消费多个queue的情况;
如果Consumer等于queue的个数,那就是一个Consumer消费一个queue;
如果Consumer个数大于queue的个数,那么会有部分Consumer空余出来,白白的浪费了。
追问:如果Consumer和Queue不对等,上线了多台也在短时间内无法消费完堆积的消息怎么办?准备一个临时的topic, queue的数量是堆积的几倍, queue分布到多Broker中; 上线一台Consumer做消息的搬运工,把原来Topic中的消息挪到新的Topic里,不做业务逻辑处理,只是挪过去上线N台Consumer同时消费临时Topic中的数据
对于生产者来说,生产者发送消息时,会轮询目标topic下所有的消息队列,轮询发送即可.
对于消费者来说,不同的consumer会通过订阅topic会绑定到对应的MessageQueue中,因为消息是均匀地落在消息队列上的,消费者也会均匀地消费到消息。
consumer绑定topic的策略有平均分配策略、轮询分配、一致性hash等等。
而需要注意的是,当consumer的数量大于队列的数量的话,根据Rocket MQ的机制,多出来的Consumer 不会去消费数据,因此建议consumer的数量小于或者等于queue的数量,避免不必要的浪费
Broker会向所有的NameServer上注册自己的信息,而不是某一个,是全部。多线程的方式向所有NameServer上注册。
如图所示, 一句话总结: 一个topic, 多个broker, 主从同步, 异步落盘.
此外, 对NameServer 来说, 我们采用集群化部署。
Broker集群会将所有的broker基本信息、topic信息以及两者之间的映射关系,轮询存储在每个NameServer中。 集群化保证NameServer的高可用.
Broker 定期(30s) 发送一次心跳给NameServer , 发送的信息种包含
1)broker的基本信息(ip port等)2)主题topic的地址信息3)broker集群信息4)存活的broker信息5)filter 过滤器
生产者的请求会根据topic询问NameServer找到对应的broker.生产者会做一个本地缓存,定期(每隔30s)拉取一次最新路由信息. 因为不是实时的, 所以生产上不建议手动变更broker列表.
里面比较典型的设计模式有单例、工厂、策略、门面模式。单例工厂无处不在,策略印象深刻比如发消息和消费消息的时候queue的负载均衡就是N个策略算法类,有随机、hash等,这也是能够快速扩容天然支持集群的必要原因之一。持久化做的也比较完善,采取的CommitLog来落盘,同步异步两种方式。
这个是rocketMq的集群架构图,里面包含了四个主要部分:NameServer集群,Producer集群,Cosumer集群以及Broker集群
NameServer 担任路由消息的提供者。生产者或消费者能够通过NameServer查找各Topic相应的Broker IP列表分别进行发送消息和消费消息。nameServer由多个无状态的节点构成,节点之间无任何信息同步
broker会定期向NameServer以发送心跳包的方式,轮询向所有NameServer注册以下元数据信息:
1)broker的基本信息(ip port等)
2)主题topic的地址信息
3)broker集群信息
4)存活的broker信息
5)filter 过滤器
也就是说,每个NameServer注册的信息都是一样的,而且是当前系统中的所有broker的元数据信息
Producer负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要
Broker,消息中转角色,负责存储消息、转发消息。在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备
Consumer负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费
RocketMq的工作流程如下:1)首先启动NameServer。NameServer启动后监听端口,等待Broker、Producer以及Consumer连上来2)启动Broker。启动之后,会跟所有的NameServer建立并保持一个长连接,定时发送心跳包。心跳包中包含当前Broker信息(ip、port等)、Topic信息以及Borker与Topic的映射关系3)创建Topic。创建时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic4)Producer发送消息。启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic所在的Broker;然后从队列列表中轮询选择一个队列,与队列所在的Broker建立长连接,进行消息的发送5)Consumer消费消息。跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,进行消息的消费
分布式系统中的事务可以使用TCC(Try、Confirm、Cancel)、2pc来解决分布式系统中的消息原子性
RocketMQ 4.3+提供分布事务功能,通过 RocketMQ 事务消息能达到分布式事务的最终一致
RocketMQ实现方式:
Half Message:预处理消息,当broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中
检查事务状态:Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向消息发送者确认事务执行状态(提交、回滚、未知),如果是未知,Broker会定时去回调在重新检查。
超时:如果超过回查次数,默认回滚消息。
也就是他并未真正进入Topic的queue,而是用了临时queue来放所谓的half message,等提交事务后才会真正的将half message转移到topic下的queue。