bull 是什么
定义
官方文档BullMQ 中的定义:
BullMQ 是一个 Node.js 库,基于 Redis 实现了一套快速且鲁班的队列系统,有助于解决很多现代的微服务架构的问题。
这个库为实现以下目标而设计:
- 有且仅有一次(
Exactly once
)的队列语义, 也就是尝试有且一次的传递每条消息,但在最坏的场景下至少被传递一次。- 易于水平伸缩。可以为并行处理 Jobs添加更多的进程。
- 一致性。
- 高性能。通过结合高效的 .lua 脚本和 pipeline,尝试从 Redis 获得尽可能高的吞吐。
特性
消息队列可以优雅地解决许多不同的问题,从平滑处理峰值到在微服务之间创建健壮的通信通道,或将繁重的工作从一台服务器转移到许多较小的工作人员,以及许多其他用例。
BullMQ 实现了以下特性:
- 由于无轮询设计,CPU使用率最低
- 基于Redis的分布式作业执行
- LIFO 和 FIFO 的作业
- 优先级
- 延迟作业
- 基于 cron 规范的计划和可执行作业
- 失败作业的重试
- 每个工作进程的并发设置
- 线程(沙盒)处理函数
- 从进程崩溃中自动恢复
基本概念
核心概念
-
Queue
存储待被处理的作业的列表。也即是消息队列本体。
const queue = new Queue('Cars'); await queue.add('paint', { colour: 'red' }); await queue.add('paint', { colour: 'blue' }, { delay: 5000 });
-
Worker
Worker是基于添or加到队列中的作业执行某些作业的实际实例,相当于传统消息队列中的“消息”接收者(Consumer)。
import { Worker, Job } from 'bullmq'; const worker = new Worker(queueName, async (job: Job) => { // Optionally report some progress job.updateProgress(42); // Optionally sending an object as progress job.updateProgress({ foo: 'bar' }); // Do something with job return 'some value'; });
-
Job
作业。支持多种类型的调度。
- FIFO
- LIFO
- Delayed
- Repeatable
- Prioritized
const myQueue = new Queue('Paint'); // Repeat job once every day at 3:15 (am) await myQueue.add( 'submarine', { color: 'yellow' }, { repeat: { cron: '* 15 3 * * *', }, }, );
-
Flow
BullMQ支持作业之间的父子关系。基本思想是,在成功处理其所有子作业之前,父作业不会被移动到等待状态,也就是会按照父作业依次调度。
import { FlowProducer } from 'bullmq'; // A FlowProducer constructor takes an optional "connection" // object otherwise it connects to a local redis instance. const flowProducer = new FlowProducer(); const flow = await flowProducer.add({ name: 'renovate-interior', queueName: 'renovate', children: [ { name: 'paint', data: { place: 'ceiling' }, queueName: 'steps' }, { name: 'paint', data: { place: 'walls' }, queueName: 'steps' }, { name: 'fix', data: { place: 'floor' }, queueName: 'steps' }, ], });
辅助概念
-
Connections
Redis 连接,具体来说使用的是 ioredis
import IORedis from 'ioredis'; const connection = new IORedis(); // Reuse the ioredis instance const myQueue = new Queue('myqueue', { connection });
-
QueueScheduler
用于管理给定队列的暂停和延迟作业。
-
QueueEvents
所有类都会触发有用的事件,通知队列中运行的作业的生命周期,如 Job的
completed
,progress
,failed
,error
事件。
bull 的用法
Producer:
import { Queue } from 'bullmq'
const myQueue = new Queue('Paint');
// Add a job that will be processed after all others
await myQueue.add('wall', { color: 'pink' });
Consumer:
myQueue.process(async function (job, done) {
await someFunc(job);
});
bull 的实现
Job 的生命周期
源码实现
目录结构
.
├── index.js # export Queue 和 Job 类
├── lib # 源码实现
│ ├── backoffs.js # export backoff 相关的逻辑 包括 normalize 和 calculate
│ ├── commands # 交给 Redis 执行的 lua 脚本, 这里为了方便调用 文件命名的格式是 `${jobName}-${paramsLength}.lua`
│ │ ├── addJob-6.lua
│ │ ├── cleanJobsInSet-3.lua
│ │ ├── extendLock-2.lua
│ │ ├── index.js # 通过 ioredis 的 defineCommand 方法来预加载脚本
│ │ ├── isFinished-2.lua
│ │ ├── isJobInList-1.lua
│ │ ├── moveStalledJobsToWait-7.lua
│ │ ├── moveToActive-8.lua
│ │ ├── moveToDelayed-3.lua
│ │ ├── moveToFinished-8.lua
│ │ ├── obliterate-2.lua
│ │ ├── pause-5.lua
│ │ ├── promote-4.lua
│ │ ├── releaseLock-1.lua
│ │ ├── removeJob-10.lua
│ │ ├── removeJobs-8.lua
│ │ ├── removeRepeatable-2.lua
│ │ ├── reprocessJob-4.lua
│ │ ├── retryJob-3.lua
│ │ ├── takeLock-1.lua
│ │ ├── updateDelaySet-6.lua
│ │ └── updateProgress-2.lua
│ ├── errors.js # 定义了错误相关的常量
│ ├── getters.js # 在 Queue 上添加了 getXXX 相关的方法
│ ├── job.js # Job 相关的定义
│ ├── process # process 相关的处理
│ │ ├── child-pool.js
│ │ ├── master.js
│ │ ├── sandbox.js
│ │ └── utils.js
│ ├── queue.js # Queue 相关的定义
│ ├── repeatable.js # 向 Queue 上添加了 重复任务 相关的方法
│ ├── scripts.js # Job 相关脚本的调用
│ ├── timer-manager.js # Timer 定时器相关的处理
│ ├── utils.js # export errorObject tryCatch isRedisReady
│ └── worker.js # 向 Queue 上添加 Worker 相关的方法
├── package.json
├── test
│ ├── fixtures # Fixture是在软件测试过程中,为测试用例创建其所依赖的前置条件的操作或脚本。
│ │ ├── fixture_processor.js
│ │ ├── fixture_processor_bar.js
│ │ ├── fixture_processor_callback.js
│ │ ├── fixture_processor_callback_fail.js
│ │ ├── fixture_processor_crash.js
│ │ ├── fixture_processor_data.js
│ │ ├── fixture_processor_discard.js
│ │ ├── fixture_processor_exit.js
│ │ ├── fixture_processor_fail.js
│ │ ├── fixture_processor_foo.js
│ │ ├── fixture_processor_progress.js
│ │ └── fixture_processor_slow.js
│ ├── mocha.opts
│ ├── test_child-pool.js
│ ├── test_connection.js
│ ├── test_events.js
│ ├── test_getters.js
│ ├── test_job.js
│ ├── test_obliterate.js
│ ├── test_pause.js
│ ├── test_queue.js
│ ├── test_rate_limiter.js
│ ├── test_repeat.js
│ ├── test_sandboxed_process.js
│ ├── test_when_current_jobs_finished.js
│ ├── test_worker.js
│ └── utils.js
依赖
node packages
other
核心部分源码
- 调度策略和定时器的实现
- process