每天一个 Node 包之 消息队列 `bull`

Posted by Run-dream Blog on October 15, 2021

bull 是什么

定义

官方文档BullMQ 中的定义:

BullMQ 是一个 Node.js 库,基于 Redis 实现了一套快速且鲁班的队列系统,有助于解决很多现代的微服务架构的问题。

这个库为实现以下目标而设计:

  • 有且仅有一次(Exactly once)的队列语义, 也就是尝试有且一次的传递每条消息,但在最坏的场景下至少被传递一次。
  • 易于水平伸缩。可以为并行处理 Jobs添加更多的进程。
  • 一致性。
  • 高性能。通过结合高效的 .lua 脚本和 pipeline,尝试从 Redis 获得尽可能高的吞吐。

特性

消息队列可以优雅地解决许多不同的问题,从平滑处理峰值到在微服务之间创建健壮的通信通道,或将繁重的工作从一台服务器转移到许多较小的工作人员,以及许多其他用例。

BullMQ 实现了以下特性:

  • 由于无轮询设计,CPU使用率最低
  • 基于Redis的分布式作业执行
  • LIFO 和 FIFO 的作业
  • 优先级
  • 延迟作业
  • 基于 cron 规范的计划和可执行作业
  • 失败作业的重试
  • 每个工作进程的并发设置
  • 线程(沙盒)处理函数
  • 从进程崩溃中自动恢复

基本概念

核心概念

  • Queue

    存储待被处理的作业的列表。也即是消息队列本体。

    const queue = new Queue('Cars');
    await queue.add('paint', { colour: 'red' });
    await queue.add('paint', { colour: 'blue' }, { delay: 5000 });
    
  • Worker

    Worker是基于添or加到队列中的作业执行某些作业的实际实例,相当于传统消息队列中的“消息”接收者(Consumer)。

    import { Worker, Job } from 'bullmq';
      
    const worker = new Worker(queueName, async (job: Job) => {
      // Optionally report some progress
      job.updateProgress(42);
      
      // Optionally sending an object as progress
      job.updateProgress({ foo: 'bar' });
      
      // Do something with job
      return 'some value';
    });
    
  • Job

    作业。支持多种类型的调度。

    • FIFO
    • LIFO
    • Delayed
    • Repeatable
    • Prioritized
    const myQueue = new Queue('Paint');
    // Repeat job once every day at 3:15 (am)
    await myQueue.add(
      'submarine',
      { color: 'yellow' },
      {
        repeat: {
          cron: '* 15 3 * * *',
        },
      },
    );
    
  • Flow

    BullMQ支持作业之间的父子关系。基本思想是,在成功处理其所有子作业之前,父作业不会被移动到等待状态,也就是会按照父作业依次调度。

    import { FlowProducer } from 'bullmq';
      
    // A FlowProducer constructor takes an optional "connection"
    // object otherwise it connects to a local redis instance.
    const flowProducer = new FlowProducer();
      
    const flow = await flowProducer.add({
      name: 'renovate-interior',
      queueName: 'renovate',
      children: [
        { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' },
        { name: 'paint', data: { place: 'walls' }, queueName: 'steps' },
        { name: 'fix', data: { place: 'floor' }, queueName: 'steps' },
      ],
    });
    

辅助概念

  • Connections

    Redis 连接,具体来说使用的是 ioredis

    import IORedis from 'ioredis';
    const connection = new IORedis();
    // Reuse the ioredis instance
    const myQueue = new Queue('myqueue', { connection });
    
  • QueueScheduler

    用于管理给定队列的暂停和延迟作业。

  • QueueEvents

    所有类都会触发有用的事件,通知队列中运行的作业的生命周期,如 Job的completed,progress,failed,error 事件。

bull 的用法

Producer:

import { Queue } from 'bullmq'

const myQueue = new Queue('Paint');

// Add a job that will be processed after all others
await myQueue.add('wall', { color: 'pink' });

Consumer:

myQueue.process(async function (job, done) {
  await someFunc(job);
});

bull 的实现

Job 的生命周期

img

源码实现

目录结构

.
├── index.js													# export Queue 和 Job 类
├── lib																# 源码实现
│   ├── backoffs.js										# export backoff 相关的逻辑 包括 normalize 和 calculate
│   ├── commands											# 交给 Redis 执行的 lua 脚本, 这里为了方便调用 文件命名的格式是 `${jobName}-${paramsLength}.lua`
│   │   ├── addJob-6.lua							
│   │   ├── cleanJobsInSet-3.lua			
│   │   ├── extendLock-2.lua
│   │   ├── index.js									# 通过 ioredis 的 defineCommand 方法来预加载脚本
│   │   ├── isFinished-2.lua
│   │   ├── isJobInList-1.lua
│   │   ├── moveStalledJobsToWait-7.lua
│   │   ├── moveToActive-8.lua
│   │   ├── moveToDelayed-3.lua
│   │   ├── moveToFinished-8.lua
│   │   ├── obliterate-2.lua
│   │   ├── pause-5.lua
│   │   ├── promote-4.lua
│   │   ├── releaseLock-1.lua
│   │   ├── removeJob-10.lua
│   │   ├── removeJobs-8.lua
│   │   ├── removeRepeatable-2.lua
│   │   ├── reprocessJob-4.lua
│   │   ├── retryJob-3.lua
│   │   ├── takeLock-1.lua
│   │   ├── updateDelaySet-6.lua
│   │   └── updateProgress-2.lua
│   ├── errors.js											# 定义了错误相关的常量
│   ├── getters.js										# 在 Queue 上添加了 getXXX 相关的方法
│   ├── job.js												# Job 相关的定义
│   ├── process												# process 相关的处理
│   │   ├── child-pool.js							
│   │   ├── master.js
│   │   ├── sandbox.js
│   │   └── utils.js
│   ├── queue.js											# Queue 相关的定义
│   ├── repeatable.js									# 向 Queue 上添加了 重复任务 相关的方法
│   ├── scripts.js										# Job 相关脚本的调用
│   ├── timer-manager.js							# Timer 定时器相关的处理
│   ├── utils.js											# export errorObject tryCatch isRedisReady
│   └── worker.js											# 向 Queue 上添加 Worker 相关的方法
├── package.json
├── test
│   ├── fixtures											#  Fixture是在软件测试过程中,为测试用例创建其所依赖的前置条件的操作或脚本。
│   │   ├── fixture_processor.js
│   │   ├── fixture_processor_bar.js
│   │   ├── fixture_processor_callback.js
│   │   ├── fixture_processor_callback_fail.js
│   │   ├── fixture_processor_crash.js
│   │   ├── fixture_processor_data.js
│   │   ├── fixture_processor_discard.js
│   │   ├── fixture_processor_exit.js
│   │   ├── fixture_processor_fail.js
│   │   ├── fixture_processor_foo.js
│   │   ├── fixture_processor_progress.js
│   │   └── fixture_processor_slow.js
│   ├── mocha.opts
│   ├── test_child-pool.js
│   ├── test_connection.js
│   ├── test_events.js
│   ├── test_getters.js
│   ├── test_job.js
│   ├── test_obliterate.js
│   ├── test_pause.js
│   ├── test_queue.js
│   ├── test_rate_limiter.js
│   ├── test_repeat.js
│   ├── test_sandboxed_process.js
│   ├── test_when_current_jobs_finished.js
│   ├── test_worker.js
│   └── utils.js


依赖

node packages

other

核心部分源码

  • 调度策略和定时器的实现
  • process

参考资料