简介
RocketMQ是阿里基于开源思想做的一款产品,它作为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。
概述
RocketMQ由四个部分组成:NameServer Cluster,Broker Cluster,Producer Cluster,Consumer Cluster,他们都可以水平扩展避免单点故障。
阅读说明
在本书中用类比的方式描述了消息队列中间价RocketMQ抽象的工作原理和性质,便于读者理解。
工作流程
NameServer Cluster:名称服务集群。
它就是一个服务注册中心。NameServer就类似于商场中的商品查询机器,它里面存储着所有商品的分类,每个分类所在的货架号等信息。
- NameServer稳定性很高。假设一个商场中有多台商品查询机器,它们之间相互独立。即使一台机器挂了,顾客依然可以从其它机器上查询商品;但若所有的查询机器都挂了,那么已经查询到信息的顾客仍然可以记住之前查询到的商品所在货架,但若想添加商品或新来的顾客就无法获得服务了。
- NameServer所对应的压力不会太大。假设每个货架都有感知自己商品变化的功能,每隔30秒该货架就要向商品查询机器发送自己的商品信息(种类,数量等等,也就是RocketMQ中的Topic),我们美名其曰把这个消息叫做“心跳”。每个货架每隔30秒就要向查询机器发送自己的“心跳”,一方面为了更新机器中自己的信息,另一方面告诉机器自己还“活着”。
Broker Cluster:代理服务集群。
它可能是整个消息中间件中最重要的模块了,Broker的作用类似于“货架”。试想一个商场要是没有货架和商品的话其它的东西还有什么意义呢?
- 商场的工作人员将货物放置在某个货架上,当顾客满怀期待的来到该货架时,货架就将顾客需要的商品交付给他,这就是传说中的“PULL”模式;
- 而当工作人员把商品摆好之后,看了下记录的手册,发现A顾客之前“订阅”了该商品,于是主动通知A来取(消费)该商品,这就是“PUSH”模式。
如上图所示,每个Broker可以有一台备份货架,上面存储的内容与主货架一致,当主货架发生意外时,可以切换到备用货架。
那么问题又来了,店员发现某个货架“挂了”,于是赶紧去恢复它,那如何知道它之前的商品有哪些呢?
幸运的是,RocketMQ早已替我们想到了这一点,它将所有商品的信息记录在一张表上,我们只需要翻阅这张“表”(硬盘),就可以知道它之前的所有商品信息。
另外,我们可能会注意到,在商场购买商品时,货架上的商品往往是“有序摆放”的。原因有很多,比如店员可能向先卖掉排在前面的商品(比如它快过期了...)RocketMQ也可以保证商品被购买的有序性,不过这个有序性是局部的,也就是对于单个货架的,因为我们不可能对不同货架上的商品进行排序,这也不符合常理。
Producer Cluster:生产者集群。
当店员要进行补货时,先去商品查询机器(NameServer)查询要补货的商品(Topic)在哪些broker中(这里要建立一个长连接,店员为了防止每次都去查询机器查询(这样可能会妨碍其他顾客使用),就每30秒查询一次,并将商品和货架的关系更新到本地内存中)。当店员知道了要补货的商品所在货架后,就可以去对应货架补货了(这里也建立一个长连接,店员每30秒发送一个“心跳”包,确保让货架知道自己没有下班,货架也会每10秒扫描一次在它这里注册的店员,若2分钟内该店员都没有“心跳”,则他可能是”下班“了)。
Consumer Cluster:消费者集群。
顾客的行为与店员是类似的。当顾客要买某个商品时先去查询机器查询,同时要跟该机器建立长连接(心跳机制与生产者的相同)。顾客知道了对应的货架,可以去主动取得商品(对应于“PULL”模式),当然,它也可以订阅该商品,当商品到货时等着货架通知他。
可靠性保证
- 保证消息有序
默认的消息发送时,一个Topic可以对应于多个队列,也就是说店员可以申请多个货架来放同一个类型的商品,如果我们想让这些商品全部顺序摆放的话,这显然无法达到。
- 全局顺序消息:
要保证全局顺序消息,店员首先需要保证申请的货架只有一个,然后一件一件地将商品摆放上去,这样就可保证快过期的商品放在前面了。
2. 部分顺序消息:
如果我们要保证部分消息有序,实际上有两种做法。
第一种是将原本属于一个Topic的商品分为多个Topic,然后每个Topic只申请一个队列。例如店员现在想要添加饮料,但饮料有很多,比如咖啡,可乐等等。现在店员想要保证咖啡和咖啡之间有顺序,可乐和可乐之间有顺序,咖啡喝可乐之间无所谓,那我们可以分别为咖啡和可乐定义两个Topic。
但这样有一个缺点,就是我们有很多很多种饮料,比如可乐,果汁,咖啡,牛奶等等,这会让Topic异常的多,维护起来并不容易,于是引出了第二种解决方案:
现在咖啡和可乐依然属于一个Topic,但店员提前跟顾客商量好了,我们的咖啡只放在一号货架上,可乐只放在二号货架,你可不要拿错了!于是同一业务ID的消息被发送到了同一个Message,从而保证了它们的有序性。
- 消息重复问题
现在我们假设店员脑子不太好使,老是健忘。她刚刚往货架上放了一件商品,但她却忘记了?!?!于是她就想,我到底放没放商品呢?算了,管它放没放,我再去放一件吧!于是商品出现了重复。但这也有一个好处,就是商品不会丢失,因为店员会一直放同一件商品知道她确认自己放成功了。
但这对顾客来说可能不太友好。比如顾客跟店员商量好自己要1号,2号,3号商品,且要求它们的顺序为123,顾客高高兴兴地来到货架,发现第一件商品正好是他想要的1号,于是他取走了,等到他想取他的第二件商品时,发现居然还是1???顾客一脸懵逼,他仔细一想,觉得可能是店员一不小心多放了一件一号,于是他把重复的1号商品放到一边,果然,下一件是他想要的2号。
但是顾客要怎么知道这个1号商品是重复的呢?
第一种方法,我们假设顾客是条金鱼,只有七秒钟的记忆,他取走了1号商品,回过头来拿第二件商品时已经不记得自己拿到的商品有哪些了,于是商场就要付出代价:顾客拿到的重复商品都不要钱!也就是说顾客多次取到同样的商品和他只取到一件商品付出的代价对他来说是相同的,这样就避免了重复消费,也保证了消费逻辑的幂等性。
第二种方法,顾客这回是带脑子来消费的,他每取到一件商品就记录下来,这样如果他以后再取到重复的商品就可以不消费了。
- 动态扩展
我们要建设的是一个服务稳定,并且可扩展的商场,于是我们要考虑到各种扩容情况:
1.NameServer横向扩展(查询机器)
在一个商场中,查询机器的可靠性是最重要的,假设一个商场只有一个查询机器,不仅会造成所有顾客都去该机器查询商品,万一它不小心坏了,所有顾客都无法完成购物消费,所以我们要对NameServer进行横向扩展,建立多个对等的实体,顾客可以根据自己的选择到某个NameServer上查询,多个NameServer上的信息应该是一致的。
2.Broker扩展(货架)
当我们的商场规模不断扩大,商品太多时,导致一个货架上的商品太多,但是我们有钱啊!我们可以购买更多的货架(机器),让新生产出的商品放到新的货架上,从而达到负载均衡的效果。但是新增加一个货架对顾客而言是不可见的,也就是说顾客不知道这个货架是否存在,顾客能做的只是去查询机器查询商品,因为每个新的货架都要去该机器上注册,这样它才能被顾客所找到。
- 消息发送
RocketMQ可以保证生产者的消息一定被broker所接收到,那么它是如何保证的呢? 在TCP/IP协议中,发送方每发送一条消息,接收方收到后必须返回一个ACK,只有发送方收到ACK了才确定该消息成功被接受者接收。RocketMQ的做法与它类似,当Broker收到消息后会返回给生产者一个ACK,生产者若一定时间内没收到ACK,则判定消息丢失,重发该消息。
- 顺序消费
消息队列的一大特性就是可以保证消息被消费者有序的消费,RocketMQ是这样做的:顾客某天做饭的时候没有酱油了,于是他来到我们的商场想买一瓶酱油,那么我们就给他一瓶酱油;过两天顾客觉得这个酱油很不错,想多买点存着,于是他来买了五瓶酱油。这五瓶酱油卖出去之后就与商场无关了,顾客把它们存起来,一瓶一瓶地使用(这代表消息消费的有序性,且这个顺序与他购买酱油时取的顺序一致,因为放在货架前面的保质期短)。我们规定“一瓶酱油使用完”这个动作叫做“消费成功”,假设顾客没有消费成功第一瓶酱油,那么他就要不断地重试,直到用完了以后才能用下一瓶酱油,这样就保证了消息被顺序消费。
高可用性
- 同步刷盘与异步刷盘
为了保证货架的高可用性,RocketMQ会将商品记录下来,若货架被偷,可以根据商品记录恢复该货架。实际上,这里有两种记录方案,对应的也就是同步刷盘和异步刷盘。
1.货架每添加一件商品就添加一条记录。这样的好处就是数据的一致性高,恢复起来数据丢失地最少,但缺点就是太浪费资源,因为需要一个人每时每刻地守在货架旁边等着记录新的商品数据。 2.货架上每放置n个商品或每隔一段时间就派人去记录下来当前的商品信息。这样带来的好处就是店员明显没那么忙了,但带来的缺点就是一旦货架被偷,可能会造成较多商品信息的丢失。我们可以根据自己的需要(可以容忍的消息丢失数量)来设定n的值。 说到这里就不得不跳出商场这个“喻体”,来说说“本体”了。消息生产者发送的消息,broker收到后在做必要的校验和检查之后的第一件事就是写入磁盘,写入成功之后返回应答给生产者。因此,可以确认每条发送结果为成功的消息服务器都是写入磁盘的。 写入磁盘,不意味着数据落到磁盘设备上,毕竟我们还隔着一层os,os对写有缓冲。只有执行“刷盘”操作,将缓冲区刷入磁盘,才是真的实现“持久化”。
- 主从同步
Broker为了保证高可用性,提供了一系列机制。
现在我们来设想一个极端的情况:假设每个货架上的商品是“可被损坏的”,比如突然这个货架里的商品都被偷走了,或者货架上的商品被一个调皮的小孩弄坏了,此时顾客满怀欣喜的来到商场,却发现要买的商品被“损坏”了,他心里十分不爽,决定以后再也不来这家商场了,这就造成了很差的用户体验。于是RocketMQ提供了“主从同步机制”(HA),这里主又叫“Master”,从又叫“Slave”。我们可以用4种方式部署集群:
- .单Master模式
该模式就是单机模式,若该节点挂掉,整个“商场”的服务将不可用。
- 多Master模式
该模式有点像Java的J.U.C包中ConcurrentHashMap的思想,即把数据“分段”。单个节点挂掉后,受到影响的只有这台机器上的商品,这保证了大部分商品可继续售卖。
- 多Master多Slave(异步复制模式)
试想每个货架都有一个备份货架,平时顾客购买商品时都从Master货架上取,然后异步更新到Slave货架上,当突然有一天Master不可用了,顾客仍然可以在Slave货架上买到一样的商品。但由于是弱一致性,这会丢失少量的信息。
- 多Master多Slave(同步复制模式)
形式与3一样,只不过Master更新时同时更新Slave,保证了强一致性,这样若Master突然挂掉,其对应的Slave货架仍然可以完美的替代它,不会出现任何信息丢失,只不过性能会比异步复制低10%左右。