Rabbitmq

Posted by Run-dream Blog on April 25, 2020

AMQP

定义

AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。Erlang中的实现有RabbitMQ等。

概念

  • 连接(Connection)一个网络连接,比如TCP/IP套接字连接。
  • 会话(Session)端点之间的命名对话。在一个会话上下文中,保证“恰好传递一次”。
  • 信道(Channel)多路复用连接中的一条独立的双向数据流通道。为会话提供物理传输介质。
  • 客户端(Client)AMQP连接或者会话的发起者。AMQP是非对称的,客户端生产和消费消息,服务器存储和路由这些消息。
  • 服务器(Server)接受客户端连接,实现AMQP消息队列和路由功能的进程。也称为“消息代理”。
  • 端点(Peer)AMQP对话的任意一方。一个AMQP连接包括两个端点(一个是客户端,一个是服务器)。
  • 搭档(Partner)当描述两个端点之间的交互过程时,使用术语“搭档”来表示“另一个”端点的简记法。
  • 片段集(Assembly)段的有序集合,形成一个逻辑工作单元。
  • 段(Segment)帧的有序集合,形成片段集中一个完整子单元。
  • 帧(Frame)AMQP传输的一个原子单元。一个帧是一个段中的任意分片。
  • 控制(Control)单向指令,AMQP规范假设这些指令的传输是不可靠的。
  • 命令(Command)需要确认的指令,AMQP规范规定这些指令的传输是可靠的。
  • 异常(Exception)在执行一个或者多个命令时可能发生的错误状态。
  • 类(Class)一批用来描述某种特定功能的AMQP命令或者控制
  • 消息头(Header)描述消息数据属性的一种特殊段。
  • 消息体(Body)包含应用程序数据的一种特殊段。消息体段对于服务器来说完全透明——服务器不能查看或者修改消息体。
  • 消息内容(Content)包含在消息体段中的的消息数据。
  • 交换器(Exchange)服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • 交换器类型(Exchange Type)基于不同路由语义的交换器类。
  • 消息队列(Message Queue)一个命名实体,用来保存消息直到发送给消费者。
  • 绑定器(Binding)消息队列和交换器之间的关联。
  • 绑定器关键字(Binding Key)绑定的名称。一些交换器类型可能使用这个名称作为定义绑定器路由行为的模式。
  • 持久存储(Durable)一种服务器资源,当服务器重启时,保存的消息数据不会丢失。
  • 临时存储(Transient)一种服务器资源,当服务器重启时,保存的消息数据会丢失。
  • 持久化(Persistent)服务器将消息保存在可靠磁盘存储中,当服务器重启时,消息不会丢失。
  • 非持久化(Non-Persistent)服务器将消息保存在内存中,当服务器重启时,消息可能丢失。
  • 消费者(Consumer)一个从消息队列中请求消息的客户端应用程序。
  • 生产者(Producer)一个向交换器发布消息的客户端应用程序。
  • 虚拟主机(Virtual Host)一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登录到服务器之后,可以选择一个虚拟主机。

基本流程

  1. 客户端连接到消息队列服务器,打开一个channel。
  2. 客户端声明一个exchange,并设置相关属性。
  3. 客户端声明一个queue,并设置相关属性。
  4. 客户端使用routing key,在exchange和queue之间建立好绑定关系。
  5. 客户端投递消息到exchange,exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

基本模型

RabbitMQ中,所有生产者提交的消息都由Exchange来接受,然后Exchange按照特定的策略转发到Queue进行存储. RabbitMQ提供了四种Exchange:fanout, direct, topic, header.

  • fanout 任何发送到Fanout Exchange的消息都会被转发到与该Exchange绑定(Binding)的所有Queue上。广播,速度最快。
  • direct 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。
  • topic 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。
    • 符号#匹配一个或多个词
    • 符号*匹配不多不少一个词。
  1. 简单队列 一个生产者对应一个消费者。 Simple Queue
  2. 工作队列 一个生产者对应多个消费者。适用于将耗时的任务分配给多个服务并行去处理。 默认执行Round-robin(轮询)算法来分配任务。 Work Queues
  3. 发布/订阅 fanout Publish/Subscribe
  4. RPC img

Tips

ack

  • 目的 如果在处理消息的过程中,消费者的服务器在处理消息的时候出现异常,那么可能这条正在处理的消息就没有完成消息消费,数据就会丢失。为了确保数据不会丢失,RabbitMQ支持消息确定-ACK。
  • 机制 ACK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
  • 注意事项 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中。如果忘记了ACK,那么后果很严重。当Consumer退出时候,Message会一直重新分发。然后RabbitMQ会占用越来越多的内存,由于RabbitMQ会长时间运行,因此这个”内存泄漏”是致命的。

prefetch

  • 定义 消费者消息预读取,用于限制信道或者连接上的未确认消息数量,也可以理解为每次分发给消费者的最大消息数

  • 协议和RabbitMq的区别

    global的值 prefetch_count 在AMQP 0-9-1的含义 prefetch_count 在RabbitMQ中的含义
    false 同一个信道上的消费者共享 单独应用于信道上的每个新消费者
    true 所有消费者基于同一个连接共享 同一个信道上的消费者共享

安装(docker-compose)


version: '3.1'
 
services:
  rabbit1:
    image: bijukunjummen/rabbitmq-server:3.7.0
    hostname: rabbit1
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=myrabbitmq
      - RABBITMQ_DEFAULT_PASS=myrabbitmq
 
  rabbit2:
    image: bijukunjummen/rabbitmq-server:3.7.0
    hostname: rabbit2
    links:
       - rabbit1
    environment:
       - CLUSTERED=true
       - CLUSTER_WITH=rabbit1
       - RAM_NODE=true
    ports:
        - "5673:5672"
        - "15673:15672"
 
  rabbit3:
    image: bijukunjummen/rabbitmq-server:3.7.0
    hostname: rabbit3
    links:
        - rabbit1
        - rabbit2
    environment:
        - CLUSTERED=true
        - CLUSTER_WITH=rabbit1
    ports:
        - "5674:5672"

DEMO(js)

Consumer


var amqp = require('amqplib/callback_api');

var args = process.argv.slice(2);

if (args.length == 0) {
  console.log("Usage: receive_logs_topic.js <facility>.<severity>");
  process.exit(1);
}

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';

    channel.assertExchange(exchange, 'topic', {
      durable: false
    });

    channel.assertQueue('', {
      exclusive: true
    }, function(error2, q) {
      if (error2) {
        throw error2;
      }
      console.log(' [*] Waiting for logs. To exit press CTRL+C');

      args.forEach(function(key) {
        channel.bindQueue(q.queue, exchange, key);
      });

      channel.consume(q.queue, function(msg) {
        console.log(" [x] %s:'%s'", msg.fields.routingKey, msg.content.toString());
      }, {
        noAck: true
      });
    });
  });
});

Producer


var amqp = require('amqplib/callback_api');
var readline = require('readline');

amqp.connect('amqp://localhost', function(error0, connection) {
  if (error0) {
    throw error0;
  }
  connection.createChannel(function(error1, channel) {
    if (error1) {
      throw error1;
    }
    var exchange = 'topic_logs';
  
    const rl = readline.createInterface({
        input: process.stdin,
        output: process.stdout
    });

    rl.on('line', line => {
        var args = line.split(' ');
        var key = (args.length > 0) ? args[0] : 'anonymous.info';
        var msg = (args.length > 0) ? args.slice(1).join(' ') : 'Hello World!';
    
        channel.assertExchange(exchange, 'topic', {
          durable: false
        });
        channel.publish(exchange, key, Buffer.from(msg));
        console.log(" [x] Sent %s:'%s'", key, msg);
    });
  });
});

使用场景

  • 异步处理

    将原来串行的处理中非必须的流程并行化,可以提高处理的时间。

    img

  • 应用解耦

    引入消息队列解耦的同时,还能在消费者系统出现故障的时候进行对旧数据的恢复。

    img

  • 流量削峰

    用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面.

    img

对比其它MQ的优劣

特性 ActiveMQ RabbitMQ RocketMQ Kafka
开发语言 Java Erlang Java Scala
单机吞吐量 万级 万级 10万级 10万级
时效性 ms us ms ms
可用性 主从架构 主从架构 分布式架构 分布式架构
功能特性 完备 完备 高性能 低延时 较为完善 较为简单 只支持基础的MQ功能

参考

RabbitMQ的应用场景以及基本原理介绍

RabbitMQ

使用 docker-compose 部署多机 RabbitMQ 集群

docker-compose安装rabbitmq集群实战版

Kafka、ActiveMQ、RabbitMQ、RocketMQ 有什么优缺点