博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
消息队列
阅读量:6072 次
发布时间:2019-06-20

本文共 8669 字,大约阅读时间需要 28 分钟。

消息队列

ActiveMQ

RabbitMQ

Kafka

RocketMQ

1.什么是消息队列

可以把消息队列比作一个存放消息的容器,当我们需要使用消息的时候可以取出消息供自己使用。消息队列是分布式系统中重要的组件,使用消息队列主要是为了通过异步处理提高系统性能和削峰、降低系统耦合性。

队列Queue是一种先进先出的数据结构,所以消费消息时也是按照先进先出的顺序来消费的。比如生产者发送消息1、2、3…,消费者就会按照1、2、3...的顺序来消费。但是偶尔也会出现消息被消费顺序不对的情况,比如某个消息消费失败,一个queue多个consumer,我们一定要保证消息被消费的顺序正确。

除此之外,考虑如何保证消息不被重复消费?如何保证消息的可靠性传输(如何处理消息丢失的问题)?等。使用消息队列不是十全十美,它会让系统可用性降低、复杂度提高,需要保障一致性等问题。

2.为什么要用消息队列

两点好处:

  1. 通过异步处理提高系统性能(削峰、减少响应所需时间)
  2. 降低系统合性

(结合项目来回答)

参考:《大型网站技术架构》第四章和第七章均有提到消息队列对应用性能及扩展性的提升

具体:

(1)通过异步处理提高系统性能(削峰、减少响应所需时间)

在不使用消息队列服务器的时候,用户的请求数据直接写入数据库,在高并发的情况下数据库压力剧增,使得响应速度变慢。

006tNc79ly1g1v00q9ma3j311q0fsdnh.jpg

但是在使用消息队列之后,用户的请求数据发送给消息队列之后立即返回,再由消息队列的消费者进程从消息队列中获取数据,异步写入数据库。由于消息队列服务器处理速度快于数据库(消息队列也比数据库有更好的伸缩性),因此响应速度得到大幅改善。

以上分析我们可以得出消息队列具有很好的削峰作用的功能——即通过异步处理,将短时间高并发产生的事

务消息存储在消息队列中,从而削平高峰期的并发事务。 举例:在电子商务一些秒杀、促销活动中,合理使用消息队列可以有效抵御促销活动刚开始大量订单涌入对系统的冲击。

因为用户请求数据写入消息队列之后就立即返回给用户了,但是请求数据在后续的业务校验、写数据库等操作中可能失败。因此使用消息队列进行异步处理之后,需要适当修改业务流程进行配合,比如用户在提交订单之后订单数据写入消息队列,不能立即返回用户订单提交成功,需要在消息队列的订单消费者进程真正处理完该订单之后,甚至出库后,再通过电子邮件或短信通知用户订单成功,以免交易纠纷。这就类似我们平时手机订火车票和电影票。

(2)降低系统耦合性

如果模块之间不存在直接调用,那么新增模块或者修改模块就对其他模块影响较小,这样系统的可扩展

性无疑更好一些。

我们最常见的事件驱动架构类似生产者消费者模式,在大型网站中通常用利用消息队列实现事件驱动结构。

006tNc79ly1g1uzxztaxjj31120eq42y.jpg

消息队列是利用发布-订阅模式工作,消息发送者(生产者)发布消息,一个或多个消息接受者(消费者)订阅

消息。 从图可以看到消息发送者(生产者)和消息接受者(消费者)之间没有直接耦合,消息发送者将消息发送
至分布式消息队列,即结束对消息的处理。消息接受者从分布式消息队列获取该消息后进行后续处理,并不需要知道该消息从何而来。对新增业务,只要对该类消息感兴趣,即可订阅该消息,对原有系统和业务没有任何影响,从而实现网站业务的可扩展性设计。

消息接受者对消息进行过滤、处理、包装后,构造成一个新的消息类型,将消息继续发送出去,等待其他消息接

受者订阅该消息。因此基于事件(消息对象)驱动的业务架构可以是一系列流程。

另外为了避免消息队列服务器宕机造成消息丢失,会将成功发送到消息队列的消息存储在消息生产者服务器上,等消息真正被消费者服务器处理后才删除消息。在消息队列服务器宕机后,生产者服务器会选择分布式消息队列服务器集群中的其他服务器发布消息。

JMS提供2种消息模式:(1)发布/订阅:Topic,可以重复消费(常用)(2)点对点:Queue,不可重复消费。

AMQP提供5种消息模式:(1) direct exchange (2) fanout exchange (3) topic change (4) headers exchange (5) system exchange。本质来讲,后四种和 JMS的pub/sub模型没有太大差别,仅是在路由机制上做了更详细的划分。

3.使用消息队列会带来的问题

  1. 系统可用性降低:在加入MQ之前,不用考虑消息丢失或者MQ挂掉等情况,但加入MQ后就需要考虑了。
  2. 系统复杂性提高:加入MQ后,需要保证消息没有被重复消费、处理消息丢失的情况、保证消息传递的顺序性等。
  3. 一致性问题:虽然消息队列可以实现异步,提高系统的响应速度,但是,万一消息的真正消费者没有正确消费消息怎么办?这就会导致数据不一致的情况。

4.JMS VS AMQP

4.1 JMS

JMS简介

JMS(JAVA Message Service,java消息服务)是java的消息服务,JMS的客户端之间可以通过JMS服务进行异步的消息传输。JMS API是一个消息服务的标准或者说是规范,允许应用程序组件基于JavaEE平台创建、发送、接收和读取消息。它使分布式通信耦合度更低,消息服务更加可靠以及异步性。

ActiveMQ 就是基于 JMS 规范实现的。

JMS两种消息模型

1.点到点模型(P2P)

006tNc79ly1g1v0qe5n9yj317808yacm.jpg

(Queue)作为消息通信载体;满足生产者与消费者模式,一条消息只能被一个消费者使用,未被消费的消息在队列中保留直到被消费或超时。比如:我们生产者发送100条消息的话,两个消费者来消费一般情况下两个消费者会按照消息发送的顺序各自消费一半(也就是你一个我一个的消费。)

2.发布/订阅模型(Pub/Sub)

006tNc79ly1g1v0r2zwmmj30qs0auaev.jpg

发布订阅模型(Pub/Sub) 使用主题(Topic)作为消息通信载体,类似于广播模式;发布者发布一条消息,该消息通过主题传递给所有的订阅者,在一条消息广播之后才订阅的用户则是收不到该条消息的

JMS 五种不同的消息正文格式

JMS定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  • StreamMessage Java原始值的数据流
  • MapMessage 一套名称-值对
  • TextMessage 一个字符串对象
  • ObjectMessage 一个序列化的 Java对象
  • BytesMessage 一个字节的数据流
  • Message (只有消息头和属性)

4.2 AMQP

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议(二进制应用层协议),是应用层协议的一个开放标准,为面向消息的中间件设计,兼容 JMS。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品,不同的开发语言等条件的限制。

RabbitMQ 就是基于 AMQP 协议实现的。

4.3 JMS VS AMQP

对比项 JMS AMQP
定义 Java API 协议
跨语言
跨平台
支持消息模型 2种:(1)Peer-2-Peer (2)Pub/sub 5种:(1)direct exchange (2)fanout exchange (3)topic change (4)headers exchange (5)system exchange
支持消息类型 支持多种消息类型 byte[] (二进制)

总结:

  1. AMQP 为消息定义了线路层(wire-level protocol)的协议,而JMS所定义的是API规范。在 Java 体系中,多个client均可以通过JMS进行交互,不需要应用修改代码,但是其对跨平台的支持较差。而AMQP天然具有跨平台、跨语言特性。
  2. JMS 支持TextMessage、MapMessage 等复杂的消息类型;而 AMQP 仅支持 byte[] 消息类型(复杂的类型可
    序列化后发送)。
  3. 由于Exchange 提供的路由算法,AMQP可以提供多样化的路由方式来传递消息到消息队列,而 JMS 仅支持 队
    列 和 主题/订阅 方式两种。

5.常见的消息队列对比

对比项 ActiveMQ RabbitMQ RocketMQ Kafka
吞吐量 万级(性能最差) 万级 十万级甚至百万级 十万级甚至百万级
可用性 高可用。基于主从架构 高可用。基于主从架构 高可用。基于分布式架构 高可用。基于分布式架构,一个数据多个副本,少数及其宕机,不会丢失数据,不会导致不可用
时效性 ms级 基于erlang开发,所以并发能力很强,性能极好,延时很低,达到微妙级 ms级 ms级
功能支持 较完备 较完备 较完备 较简单,主要支持简单的MQ功能,在大数据领域的实时计算以及日志采集被大规模使用,是事实上的标准
消息丢失 丢失可能性非常低 丢失可能性非常低 理论上不会丢失 理论上不会丢失

总结:

  1. ActiveMQ 的社区算是比较成熟,但是较目前来说,ActiveMQ 的性能比较差,而且版本迭代很慢,不推荐使
    用。
  2. RabbitMQ 在吞吐量方面虽然稍逊于 Kafka 和 RocketMQ ,但是由于它基于 erlang 开发,所以并发能力很强,性能极其好,延时很低,达到微秒级。但是也因为 RabbitMQ 基于 erlang 开发,所以国内很少有公司有实力做erlang源码级别的研究和定制。如果业务场景对并发量要求不是太高(十万级、百万级),那这四种消息队列中,RabbitMQ 一定是你的首选。如果是大数据领域的实时计算、日志采集等场景,用 Kafka 是业内标准的,绝对没问题,社区活跃度很高,绝对不会黄,何况几乎是全世界这个领域的事实性规范。
  3. RocketMQ 阿里出品,Java 系开源项目,源代码我们可以直接阅读,然后可以定制自己公司的MQ,并且RocketMQ 有阿里巴巴的实际业务场景的实战考验。RocketMQ 社区活跃度相对较为一般,不过也还可以,文档相对来说简单一些,然后接口这块不是按照标准 JMS 规范走的有些系统要迁移需要修改大量代码。还有就是阿里出台的技术,你得做好这个技术万一被抛弃,社区黄掉的风险,那如果你们公司有技术实力我觉得用RocketMQ 挺好的
  4. kafka 的特点其实很明显,就是仅仅提供较少的核心功能,但是提供超高的吞吐量,ms 级的延迟,极高的可用性以及可靠性,而且分布式可以任意扩展。同时 kafka 最好是支撑较少的 topic 数量即可,保证其超高吞吐量。kafka 唯一的一点劣势是有可能消息重复消费,那么对数据准确性会造成极其轻微的影响,在大数据领域中以及日志采集中,这点轻微影响可以忽略这个特性天然适合大数据实时计算以及日志收集。

参考:《Java工程师面试突击第1季-中华石杉老师》

6.如何保证消息队列高可用?

引入消息队列后,系统的可用性下降。

在生产中,没有人使用单机模式的消息队列。因此,作为一个合格的程序员,应该对消息队列的高可用有很深刻的了解。

面试官会问:你们的消息中间件是如何保证高可用的?

回答:对消息队列的集群模式的深刻了解

RocketMQ为例:它的集群就有:多master模式、多master多salve异步复制模式、多master多salve同步双写模式。多master多salve模式部署架构图:

006tNc79ly1g1vg6go88aj314e0n4doi.jpg

这图乍一看和kafka很像,只是nameserver集群,在kafka中是用zookeeper代替,都是用来保存和发现master和salver的。通信过程如下:producer与nameserver集群中的其中一个节点(随机选择)建立长连接,定期从nameserver获取topic路由信息,并向提供topic服务的broker master建立长连接,且定时向broker发送心跳。producer只能将消息发送到broker master,但是consumer则不一样,它同时和提供topic服务的master和slave建立长连接,既可以从broker master订阅消息,也可以从broker slave订阅消息。

kafka:为了对比说明直接上kafka的拓扑架构图:

006tNc79ly1g1vgjwmc8cj315q0ms7es.jpg

一个典型的kafka集群中包含若干producer(可以是web前端产生的page view,或者是服务器日志,系统cpu,memory等),若干broker(kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干consumer group,以及一个zookeeper集群。kafka通过zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。

RabbitMQ 也有普通集群和镜像集群模式。

要求:在回答高可用的问题时,应该能逻辑清晰的画出自己的mq集群架构或清晰的叙述出来。

7.如何保证消息不被重复消费?

分析:消息队列领域的基本问题。

另一种问法:如何保证消息队列的幂等性?

考察设计能力。可以根据具体的业务场景来回答,没有固定的答案。

为什么会造成重复消费

无论哪种消息队列,造成重复消费原因都是类似的。正常情况下,消费者在消费消息的时候,消费完毕后,会发送一个确认消息给消息队列,消息队列就知道该消息被消费了,就会将该消息从消息队列中删除。

只是不同的消息队列发送的确认信息形式不同,例如:rabbitmq是发送一个ack确认消息,rocketmq是返回一个consume_success成功标志,kafka实际上有个offset的概念,简单说,就是每个消息都有一个offset,kafka消费过消息后,需要提交offset,让消息队列知道自己已经消费过了。

重复消费的原因:因为网络传输等故障,确认消息没有传送到消息队列,导致消息队列不知道自己已经消费过该消息了,再次将该消息分发给其他消费者。

如何解决

针对业务场景来答,分以下几点:

  1. 比如:拿到这个消息做数据库的insert操作。容易。给这个消息做一个唯一主键,若出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
  2. 比如:拿到这个消息做redis的set操作。容易,不用解决。因为无论set几次结果都是一样的,set操作本来就算幂等操作。
  3. 如果上面两种情况不行,上大招。准备一个第三方介质,来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以k-v形式写入redis。那么消费者开始消费前,先去redis中查询有没有消费记录即可。

8.如何保证消息的可靠性传输?(如何处理消息丢失的问题)

每种MQ都要从三个角度来分析:生产者弄丢数据、消息队列弄丢数据、消费者弄丢数据

RabbitMQ

(1)生产者弄丢数据

rabbitmq提供transaction和confirm模式来确保生产者不丢消息。

transaction机制:发送消息前,开启事务(channel.txSelect() ),然后发送消息,如果发送过程中出现什么异常,事务就会回滚(channel.txRollback() ),如果发送成功则提交事务(channel.txCommit() )。缺点:吞吐量下降。

生产上使用confirm模式居多:一旦channel进入confirm模式,所有在该信道上面发布的消息都将会被指派一个唯一的id(从1开始),一旦消息被投递到所有匹配的队列之后,rabbitmq就会发送一个Ack给生产者(包含消息的唯一id),这就使得生产者知道消息已经正确到达目的队列了。如果rabbitmq没能处理该消息,则会发送一个Nack消息给你,你可以进行重试操作。

处理Ack和Nack的代码如下所示:

channel.addConfirmListener(new ConfirmListener(){  @override  public void handleNack(long deliveryTag,boolean multiple) throws IOException{    System.out.println("nack:deliveryTag="+deliveryTag+" multiple:"+multiple);  }  @override  public void handleAck(long deliveryTag, boolean multiple) throws IOException{    System.out.println("ack:deliveryTag="+deliveryTag+" multiple:"+multiple");  }});

(2)消息队列弄丢数据

处理消息队列丢数据的情况,一般是开启持久化磁盘的配置。这个持久化配置可以和confirm机制配合使用,你可以在消息持久化磁盘后,再给生产者发送一个Ack信号。这样,如果消息持久化磁盘之前rabbitmq阵亡了,那么生产者收不到Ack信号,生产者会自动重发。

如何持久化?两步:

  1. 将queue的持久化标志durable设置为true,则代表是一个持久的队列
  2. 发送消息的时候将deliveryMode=2这样设置一行,rabbitmq就算挂了,重启后也能恢复数据

(3)消费者弄丢数据

消费者弄丢数据一般是因为采用了自动确认消息模式。这种模式下,消费者会自动确认收到信息。这时rabbitmq会立即将消息删除,这种情况下如果消费者出现异常而没能处理该消息,就会丢失该消息。

解决方案:手动确认消息即可。

Kafka

先引入一张kafka replication的数据流向图

006tNc79ly1g1vis3br7kj31580kaq93.jpg

producer在发布消息到某个partition时,先通过zookeeper找到该partition的leader,然后无论该topic的replication factor为多少(即该partition有多少个replica),producer只将该消息发送到该partition的leader。leader会将该消息写入其本地的log。每个follower都从leader中pull数据。

针对上述情况,得出如下分析:

(1)生产者弄丢数据

在kafka生产中,基本都有一个leader和多个follwer。follwer会去同步leader的信息。因此,为了避免生产者丢失数据,做如下两点配置:

  1. 第一个配置要在producer端设置acks=all。这个配置保证了:follwer同步完成后,才认为消息发送成功。
  2. 在producer端设置retries=max,一旦写入失败,则无限重试。

(2)消息队列弄丢数据

针对消息队列丢数据的情况,无外乎救赎,数据还没同步,leader就挂了,这时zookeeper会将其他follwer切换为leader,那数据就丢失了。针对这种情况,应该做两个配置:

  1. replication.factor 参数,这个值必须大于1,即要求每个partition必须有至少2个副本。
  2. min.insync.replicas参数,这个值必须大于1,这个是要求一个leader至少感知到有至少一个follower还跟自己保持联系

这两个配置加上上面生产者的配置联合起来用,基本可确保kafka不丢数据。

(3)消费者弄丢数据

这种情况一般是自动提交了offset,然后你处理程序过程中挂了。kafka以为你处理好了。

offset是干嘛的:kafka的topic中的每个消费组消费的下标。就是一条信息对应一个offset下标,每次消费数据的时候如果提交offset,那么下次消费就会从提交的offset+1那里开始消费。比如一个topic中有100条数据,我消费了50条并且提交了,那么此时的kafka服务器记录提交的offset就是49(offset从0开始),那么下次消费的时候offset就从50开始消费。

解决方案:手动提交

ActiveMQ

RocketMQ

待查阅

9.如何保证消息的顺序性?

并非所有的公司都有这种业务需求,但是还是对这个问题要有所复习。

通过某种算法,将需要保持先后顺序的消息放到同于个消息队列中(kafka中就是partition,rabbitmq中就是queue)。然后只用一个消费者去消费该队列。

有人问:那如果为了吞吐量,有多个消费者去消费怎么办?

这个问题,没有固定回答套路。比如我们有一个微博的操作:发微博、写评论、删除微博,三个异步操作。如果是这样一个业务场景,那只要重试就行。比如你一个消费者先执行了写评论的操作,但是微博都还没发,写评论一定是失败的。等一段时间,等另一个消费者,先执行发微博的操作后,再执行,就可以成功。针对这个问题,我的观点是:保证入队有序就行,出队以后的顺序交给消费者自己去保证,没有固定套路。

转载于:https://www.cnblogs.com/shinl00/p/10680657.html

你可能感兴趣的文章
【框架整合】Maven-SpringMVC3.X+Spring3.X+MyBatis3-日志、JSON解析、表关联查询等均已配置好...
查看>>
要想成为高级Java程序员需要具备哪些知识呢?
查看>>
带着问题去学习--Nginx配置解析(一)
查看>>
onix-文件系统
查看>>
java.io.Serializable浅析
查看>>
我的友情链接
查看>>
多线程之线程池任务管理通用模板
查看>>
CSS3让长单词与URL地址自动换行——word-wrap属性
查看>>
CodeForces 580B Kefa and Company
查看>>
开发规范浅谈
查看>>
Spark Streaming揭秘 Day29 深入理解Spark2.x中的Structured Streaming
查看>>
鼠标增强软件StrokeIt使用方法
查看>>
本地连接linux虚拟机的方法
查看>>
某公司面试java试题之【二】,看看吧,说不定就是你将要做的题
查看>>
BABOK - 企业分析(Enterprise Analysis)概要
查看>>
Linux 配置vnc,开启linux远程桌面
查看>>
NLog文章系列——如何优化日志性能
查看>>
Hadoop安装测试简单记录
查看>>
CentOS6.4关闭触控板
查看>>
ThreadPoolExecutor线程池运行机制分析-线程复用原理
查看>>