AMQP
定义
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。
概念
- 连接(Connection)一个网络连接,比如TCP/IP套接字连接。
- 会话(Session)端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
- 信道(Channel)多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
- 客户端(Client)AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
- 服务器(Server)接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
- 端点(Peer)AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
- 搭档(Partner)当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。
- 片段集(Assembly)段的有序集合,形成一个逻辑工作单元。
- 段(Segment)帧的有序集合,形成片段集中一个完整子单元。
- 帧(Frame)AMQP传输的一个原子单元。一个帧是一个段中的任意分片。
- 控制(Control)单向指令,AMQP规范假设这些指令的传输是不可靠的。
- 命令(Command)需要确认的指令,AMQP规范规定这些指令的传输是可靠的。
- 异常(Exception)在执行一个或者多个命令时可能发生的错误状态。
- 类(Class)一批用来描述某种特定功能的AMQP命令或者控制
- 消息头(Header)描述消息数据属性的一种特殊段。
- 消息体(Body)包含应用程序数据的一种特殊段。消息体段对于服务器来说完全透明——服务器不能查看或者修改消息体。
- 消息内容(Content)包含在消息体段中的的消息数据。
- 交换器(Exchange)服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- 交换器类型(Exchange Type)基于不同路由语义的交换器类。
- 消息队列(Message Queue)一个命名实体,用来保存消息直到发送给消费者。
- 绑定器(Binding)消息队列和交换器之间的关联。
- 绑定器关键字(Binding Key)绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
- 持久存储(Durable)一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
- 临时存储(Transient)一种服务器资源,当服务器重启时,保存的消息数据会丢失。
- 持久化(Persistent)服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
- 非持久化(Non-Persistent)服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
- 消费者(Consumer)一个从消息队列中请求消息的客户端应用程序。
- 生产者(Producer)一个向交换器发布消息的客户端应用程序。
- 虚拟主机(Virtual Host)一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。
基本流程
- 客户端连接到消息队列服务器,打开一个channel。
- 客户端声明一个exchange,并设置相关属性。
- 客户端声明一个queue,并设置相关属性。
- 客户端使用routing key,在exchange和queue之间建立好绑定关系。
- 客户端投递消息到exchange,exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。
基本模型
RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储. RabbitMQ提供了四种Exchange:fanout, direct, topic, header.
- fanout 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。广播,速度最快。
- direct 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
- topic 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
- 符号#匹配一个或多个词
- 符号*匹配不多不少一个词。
- 简单队列
一个生产者对应一个消费者。
- 工作队列
一个生产者对应多个消费者。适用于将耗时的任务分配给多个服务并行去处理。
默认执行Round-robin(轮询)算法来分配任务。
- 发布/订阅
fanout
- RPC
Tips
ack
- 目的 如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
- 机制 ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
- 注意事项 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个”内存泄漏”是致命的。
prefetch
-
定义 消费者消息预读取,用于限制信道或者连接上的未确认消息数量,也可以理解为每次分发给消费者的最大消息数
-
协议和RabbitMq的区别
global的值 prefetch_count 在AMQP 0-9-1的含义 prefetch_count 在RabbitMQ中的含义 false 同一个信道上的消费者共享 单独应用于信道上的每个新消费者 true 所有消费者基于同一个连接共享 同一个信道上的消费者共享
安装(docker-compose)
version: '3.1'
services:
rabbit1:
image: bijukunjummen/rabbitmq-server:3.7.0
hostname: rabbit1
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=myrabbitmq
- RABBITMQ_DEFAULT_PASS=myrabbitmq
rabbit2:
image: bijukunjummen/rabbitmq-server:3.7.0
hostname: rabbit2
links:
- rabbit1
environment:
- CLUSTERED=true
- CLUSTER_WITH=rabbit1
- RAM_NODE=true
ports:
- "5673:5672"
- "15673:15672"
rabbit3:
image: bijukunjummen/rabbitmq-server:3.7.0
hostname: rabbit3
links:
- rabbit1
- rabbit2
environment:
- CLUSTERED=true
- CLUSTER_WITH=rabbit1
ports:
- "5674:5672"
DEMO(js)
Consumer
var amqp = require('amqplib/callback_api');
var args = process.argv.slice(2);
if (args.length == 0) {
console.log("Usage: receive_logs_topic.js <facility>.<severity>");
process.exit(1);
}
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';
channel.assertExchange(exchange, 'topic', {
durable: false
});
channel.assertQueue('', {
exclusive: true
}, function(error2, q) {
if (error2) {
throw error2;
}
console.log(' [*] Waiting for logs. To exit press CTRL+C');
args.forEach(function(key) {
channel.bindQueue(q.queue, exchange, key);
});
channel.consume(q.queue, function(msg) {
console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
}, {
noAck: true
});
});
});
});
Producer
var amqp = require('amqplib/callback_api');
var readline = require('readline');
amqp.connect('amqp://localhost', function(error0, connection) {
if (error0) {
throw error0;
}
connection.createChannel(function(error1, channel) {
if (error1) {
throw error1;
}
var exchange = 'topic_logs';
const rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
rl.on('line', line => {
var args = line.split(' ');
var key = (args.length > 0) ? args[0] : 'anonymous.info';
var msg = (args.length > 0) ? args.slice(1).join(' ') : 'Hello World!';
channel.assertExchange(exchange, 'topic', {
durable: false
});
channel.publish(exchange, key, Buffer.from(msg));
console.log(" [x] Sent %s:'%s'", key, msg);
});
});
});
使用场景
-
异步处理
将原来串行的处理中非必须的流程并行化,可以提高处理的时间。
-
应用解耦
引入消息队列解耦的同时,还能在消费者系统出现故障的时候进行对旧数据的恢复。
-
流量削峰
用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.
对比其它MQ的优劣
特性 | ActiveMQ | RabbitMQ | RocketMQ | Kafka |
---|---|---|---|---|
开发语言 | Java | Erlang | Java | Scala |
单机吞吐量 | 万级 | 万级 | 10万级 | 10万级 |
时效性 | ms | us | ms | ms |
可用性 | 主从架构 | 主从架构 | 分布式架构 | 分布式架构 |
功能特性 | 完备 | 完备 高性能 低延时 | 较为完善 | 较为简单 只支持基础的MQ功能 |