• Babel 插件通关秘籍
  • Git 原理详解及实用指南
  • Nest 通关秘籍
  • React 通关秘籍
  • TypeScript 全面进阶指南
  • TypeScript 类型体操通关秘籍
  • 现代CSS
  • Babel 插件通关秘籍
  • Git 原理详解及实用指南
  • Nest 通关秘籍
  • React 通关秘籍
  • TypeScript 全面进阶指南
  • TypeScript 类型体操通关秘籍
  • 现代CSS
  • Nest 通关秘籍

    • 1.开篇词
    • 2.给你 5 个学习 Nest 的理由,你会心动么
    • 3.Nest 基础概念扫盲
    • 4.快速掌握 Nest CLI
    • 5.五种HTTP数据传输方式
    • 6.IoC 解决了什么痛点问题?
    • 7.如何调试 Nest 项目
    • 8.使用多种 Provider,灵活注入对象
    • 9.全局模块和生命周期
    • 10.AOP 架构有什么好处?
    • 11.一网打尽 Nest 全部装饰器
    • 12.Nest 如何自定义装饰器
    • 13.Metadata 和 Reflector
    • 14.ExecutionContext:切换不同上下文
    • 15.Module 和 Provider 的循环依赖怎么处理?
    • 16.如何创建动态模块
    • 17.Nest 和 Express 的关系,如何切到 fastify
    • 18.Nest 的 Middleware
    • 19.RxJS 和 Interceptor
    • 20.内置 Pipe 和自定义 Pipe
    • 21.如何使用 ValidationPipe 验证 post 请求参数
    • 22.如何自定义 Exception Filter
    • 23.图解串一串 Nest 核心概念
    • 24.接口如何实现多版本共存
    • 25.Express 如何使用 multer 实现文件上传
    • 26.Nest 如何使用 multer 实现文件上传
    • 27.图书管理系统:需求分析和原型图
    • 28.图书管理系统:用户模块后端开发
    • 29.图书管理系统:图书模块后端开发
    • 30.图书管理系统:用户模块前端开发
    • 31.图书管理系统:图书模块前端开发--图书搜索
    • 32.图书管理系统:图书模块前端开发--图书增删改
    • 33.图书管理系统:项目总结
    • 34.大文件分片上传
    • 35.最完美的 OSS 上传方案
    • 36.Nest 里如何打印日志?
    • 37.为什么 Node 里要用 Winston 打印日志?
    • 38.Nest 集成日志框架 Winston
    • 39.通过 Desktop 学 Docker 也太简单了
    • 40.你的第一个 Dockerfile
    • 41.Nest 项目如何编写 Dockerfile
    • 42.提升 Dockerfile 水平的 5 个技巧
    • 43.Docker 是怎么实现的?
    • 44.为什么 Node 应用要用 PM2 来跑?
    • 45.快速入门 MySQL
    • 46.SQL 查询语句的所有语法和函数
    • 47.一对一、join 查询、级联方式
    • 48.一对多、多对多关系的表设计
    • 49.子查询和 EXISTS
    • 50.SQL 综合练习
    • 51.MySQL 的事务和隔离级别
    • 52.MySQL 的视图、存储过程和函数
    • 53.使用 Node 操作 MySQL 的两种方式
    • 54.快速掌握 TypeORM
    • 55.TypeORM 一对一的映射和关联 CRUD
    • 56.TypeORM 一对多的映射和关联 CRUD
    • 57.TypeORM 多对多的映射和关联 CRUD
    • 58.在 Nest 里集成 TypeORM
    • 59.TypeORM 如何保存任意层级的关系?
    • 60.为什么生产环境要用 TypeORM 的 migration 迁移功能?
    • 61.Nest 项目里如何使用 TypeORM 迁移
    • 62.如何动态读取不同环境的配置?
    • 63.快速入门 Redis
    • 64.在 Nest 里操作 Redis
    • 65.为什么不用 cache-manager 操作 Redis?
    • 66.两种登录状态保存方式:JWT、Session
    • 67.Nest 里实现 Session 和 JWT
    • 68.MySQL + TypeORM + JWT 实现登录注册
    • 69.基于 ACL 实现权限控制
    • 70.基于 RBAC 实现权限控制
    • 71.基于 access_token 和 refresh_token 实现登录状态无感刷新
    • 72.单 token 无限续期,实现登录状态无感刷新
    • 73.使用 passport 做身份认证
    • 74.passport 实现 GitHub 三方账号登录
    • 75.passport 实现 Google 三方账号登录
    • 76.为什么要使用 Docker Compose ?
    • 77.Docker 容器通信的最简单方式:桥接网络
    • 78.Docker 支持重启策略,是否还需要 PM2
    • 79.快速掌握 Nginx 的 2 大核心用法
    • 80.基于 Nginx 实现灰度系统
    • 81.基于 Redis 实现分布式 session
    • 82.Redis + 高德地图,实现附近的充电宝
    • 83.用 Swagger 自动生成 api 文档
    • 84.如何灵活创建 DTO
    • 85.class-validator 的内置装饰器,如何自定义装饰器
    • 86.序列化 Entity,你不需要 VO 对象
    • 87.手写序列化 Entity 的拦截器
    • 88.使用 compodoc 生成文档
    • 89.Node 如何发邮件?
    • 90.实现基于邮箱验证码的登录
    • 91.定时任务 + Redis 实现阅读量计数
    • 92.Nest 的 3 种定时任务
    • 93.Nest 里如何实现事件通信?
    • 94.HttpModule + pinyin 实现天气预报查询服务
    • 95.如何记录请求日志
    • 96.短链服务?自己写一个
    • 97.Nest 实现 Server Sent Event 数据推送
    • 98.用 minio 自己搭一个 OSS 服务
    • 99.前端如何直传文件到 Minio
    • 100.基于 sharp 实现 gif 压缩工具
    • 101.大文件如何实现流式下载?
    • 102.Puppeteer 实现爬虫,爬取 BOSS 直聘全部前端岗位
    • 103.实现扫二维码登录
    • 104.Nest 的 REPL 模式
    • 105.实现 Excel 导入导出
    • 106.如何用代码动态生成 PPT
    • 107.如何拿到服务器 CPU、内存、磁盘状态
    • 108.Nest 如何实现国际化?
    • 109.会议室预订系统:需求分析和原型图
    • 110.会议室预订系统:技术方案和数据库设计
    • 111.会议室预订系统:用户管理模块-用户注册
    • 112.会议室预订系统:用户管理模块-配置抽离、登录认证鉴权
    • 113.会议室预订系统:用户管理模块-interceptor、修改信息接口
    • 114.会议室预订系统:用户管理模块-用户列表和分页查询
    • 115.会议室预订系统:用户管理模块-swagger 接口文档
    • 116.会议室预订系统:用户管理模块-用户端登录注册页面
    • 117.会议室预订系统:用户管理模块-用户端信息修改页面
    • 118.会议室预订系统:用户管理模块-头像上传
    • 119.会议室预订系统:用户管理模块-管理端用户列表页面
    • 120.会议室预订系统:用户管理模块-管理端信息修改页面
    • 121.会议室预订系统:会议室管理模块-后端开发
    • 122.会议室预订系统:会议室管理模块-管理端前端开发
    • 123.会议室预订系统:会议室管理模块-用户端前端开发
    • 124.会议室预订系统:预定管理模块-后端开发
    • 125.会议室预订系统:预定管理模块-管理端前端开发
    • 126.会议室预订系统:预定管理模块-用户端前端开发
    • 127.会议室预订系统:统计管理模块-后端开发
    • 128.会议室预订系统:统计管理模块-前端开发
    • 129.会议室预订系统:后端项目部署到阿里云
    • 130.会议室预订系统:前端项目部署到阿里云
    • 131.会议室预定系统:用 migration 初始化表和数据
    • 132.会议室预定系统:文件上传 OSS
    • 133.会议室预定系统:Google 账号登录后端开发
    • 134.会议室预定系统:Google 账号登录前端开发
    • 135.会议室预定系统:后端代码优化
    • 136.会议室预定系统:集成日志框架 winston
    • 137.会议室预定系统:前端代码优化
    • 138.会议室预定系统:全部功能测试
    • 139.会议室预定系统:项目总结
    • 140.Nest 如何创建微服务?
    • 141.Nest 的 Monorepo 和 Library
    • 142.用 Etcd 实现微服务配置中心和注册中心
    • 143.Nest 集成 Etcd 做注册中心、配置中心
    • 144.用 Nacos 实现微服务配置中心和注册中心
    • 145.基于 gRPC 实现跨语言的微服务通信
    • 146.快速入门 ORM 框架 Prisma
    • 147.Prisma 的全部命令
    • 148.Prisma 的全部 schema 语法
    • 149.Primsa Client 单表 CRUD 的全部 api
    • 150.Prisma Client 多表 CRUD 的全部 api
    • 151.在 Nest 里集成 Prisma
    • 152.为什么前端监控系统要用 RabbitMQ?
    • 153.基于 Redis 实现关注关系
    • 154.基于 Redis 实现各种排行榜(周榜、月榜、年榜)
    • 155.考试系统:需求分析
    • 156.考试系统:技术方案和数据库设计
    • 157.考试系统:微服务、Lib 拆分
    • 158.考试系统;用户注册
    • 159.考试系统:用户登录、修改密码
    • 160.考试系统:考试微服务
    • 161.考试系统:登录、注册页面
    • 162.考试系统:修改密码、试卷列表页面
    • 163.考试系统:新增试卷、回收站
    • 164.考试系统:试卷编辑器
    • 165.考试系统:试卷回显、预览、保存
    • 166.考试系统:答卷微服务
    • 167.考试系统:答题页面
    • 168.考试系统:自动判卷
    • 169.考试系统:分析微服务、排行榜页面
    • 170.考试系统:整体测试
    • 171.考试系统:项目总结
    • 172.用 Node.js 手写 WebSocket 协议
    • 173.Nest 开发 WebSocket 服务
    • 174.基于 Socket.io 的 room 实现群聊
    • 175.聊天室:需求分析和原型图
    • 176.聊天室:技术选型和数据库设计
    • 177.聊天室:用户注册
    • 178.聊天室:用户登录
    • 179.聊天室:修改密码、修改信息
    • 180.聊天室:好友列表、发送好友申请
    • 181.聊天室:创建聊天室、加入群聊
    • 182.聊天室:登录、注册页面开发
    • 183.聊天室:修改密码、信息页面开发
    • 184.聊天室:头像上传
    • 185.聊天室:好友∕群聊列表页面
    • 186.聊天室:添加好友弹窗、通知页面
    • 187.聊天室:聊天功能后端开发
    • 188.聊天室:聊天功能前端开发
    • 189.聊天室:一对一聊天
    • 190.聊天室:创建群聊、进入群聊
    • 191.聊天室:发送表情、图片、文件
    • 192.聊天室:收藏
    • 193.聊天室:全部功能测试
    • 194.聊天室:项目总结
    • 195.MongoDB 快速入门
    • 196.使用 mongoose 操作 MongoDB 数据库
    • 197.GraphQL 快速入门
    • 198.Nest 开发 GraphQL 服务:实现 CRUD
    • 199.GraphQL + Primsa + React 实现 TodoList
    • 200.如何调试 Nest 源码?

前端监控系统是采集用户端的异常、性能、业务埋点等数据上报,在服务端做存储,并支持可视化分析的平台。

用户量可能很大,采集的数据可能比较多,这时候服务端的并发压力会比较大,要是直接存入数据库,那数据库服务很可能会崩掉。

那就用现在的数据库,如何保证面对大量并发请求的时候,服务不崩呢?

答案就是消息队列,比如常用的 RabbitMQ:

第一个 web 服务接收请求,把消息存入 RabbitMQ,然后另一个 web 服务从 MQ 中取出消息存入数据库。

有同学说,这不是一样么?

不一样,MQ 的并发量比数据库高很多。之前 web 服务要等数据库存储完成才能响应,而现在只存入 MQ 就可以响应了。那可以支持的并发量就更多。

而数据库的并发比较低,我们可以通过 MQ 把消费的上限调低,就能保证数据库服务不崩。

比如 10w 的消息进来,每次只从中取出 1000 来消费:

并发量被控制住了,自然就崩不了了,从 MQ 中取出慢慢处理就好了。

这就是 MQ 的流量削峰的功能。

而且完全可以加几个 web 服务来同时消费 MQ 中的消息:

知道了 RabbitMQ 能干啥,那我们就来用一下试试吧!

我们通过 docker 来跑 RabbitMQ。

搜索 rabbitmq 的镜像,选择 3.11-management 的版本:

这个版本是有 web 管理界面的。

点击 run:

映射容器内的 5672、15672 这俩端口到本地的端口。

15672 是管理界面的,5672 是 mq 服务的端口。

等 rabbitmq 跑起来之后:

就可以在浏览器访问 http://localhost:15672 了:

这就是它的 web 管理界面。

输入 guest、guest 进入管理页面:

可以看到 connection、channel、exchange、queue 的分别的管理页面。

这些都是什么呢?

写个 demo 就理解了:

创建个项目:

mkdir rabbitmq-test

cd rabbitmq-test

npm init -y

安装用到的包:

npm install amqplib

创建 src/producer.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue("aaa");
await channel.sendToQueue("aaa", Buffer.from("hello"));

安装 amqplib 的包,这个是 rabbitmq 的 node 客户端(amqp 是 rabbitmq 的协议)。

上面的代码连接了 rabbitmq 服务,创建了一个名字为 aaa 的队列,并向队列中发送了一个消息。

然后 node 跑一下:

node ./src/producer.js

(这里要用 es module 语法并且支持顶层 await 需要在 packege.json 里设置 type 为 module)

之后就可以在管理界面看到这个队列了:

然后我们再写一个消费端 src/consumer.js:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("aaa");
channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

assertQueue 是如果没有就创建队列,有的话就直接返回。

这里取到那个队列,就可以从中消费消息了:

node src/consumer.js

这样,我们就完成了第一次 RabbitMQ 的通信,两个服务之间也是这样通信的。

是不是还挺简单的?

rabbitmq 使用确实挺简单。

那怎么控制并发数呢?

我们改一下 src/producer.js:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertQueue("aaa", { durable: false });

let i = 1;
setInterval(async () => {
    const msg = "hello" + i;
    console.log("发送消息:", msg);
    await channel.sendToQueue("aaa", Buffer.from(msg));
    i++;
}, 500);

生产者每 0.5s 发送一次消息。

消费者每 1s 处理一条消息:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("aaa");
channel.prefetch(3);

const currentTask = [];
channel.consume(
    queue,
    (msg) => {
        currentTask.push(msg);
        console.log("收到消息:", msg.content.toString());
    },
    { noAck: false }
);

setInterval(() => {
    const curMsg = currentTask.pop();
    channel.ack(curMsg);
}, 1000);

每条消费者收到的消息要确认之后才会在 MQ 里删除。可以收到消息自动确认,也可以手动确认。

这里我把 noAck 设置为 false 了,也就是不自动确认。

把收到的消息放入一个数组中,每 1s 确认一次。

然后我设置了 prefetch 为 3,也就是每次最多取回 3 条消息来处理。

跑一下试试:

消息生产端:

node ./src/producer.js

消息消费端:

node ./src/consumer.js

可以看到生产者是每 0.5s 往队列里放一条消息。

消费者一开始取出 3 条,然后每处理完一条取一条,保证最多并发处理 3 条。

这就是流量削峰的功能。

不同服务之间的速度差异可以通过 MQ 来缓冲。

大概了解了 rabbitmq 之后,我们来看看它的整体架构图:

Producer 和 Consumer 分别是生产者和消费者。

Connection 是连接,但我们不会每用一次 rabbitmq 就创建一个单独的 Connection,而是在一个 Connection 里做一下划分,叫做 Channel,每个 Channel 做自己的事情。

而 Queue 就是两端存取消息的地方了。

整个接收消息和转发消息的服务就叫做 Broker。

至于 Exchange,我们前面的例子没有用到,这个是把消息放到不同的队列里用的,叫做交换机。

我们前面生产者和消费者都是直接指定了从哪个队列存取消息,那如果是一对多的场景呢?

总不能一个个的调用 sendQueue 发消息吧?

这时候就要找一个 Exchange(交换机) 来帮我们完成把消息按照规则放入不同的 Queue 的工作了。

Exchange 主要有 4 种:

  • fanout:把消息放到这个交换机的所有 Queue
  • direct:把消息放到交换机的指定 key 的队列
  • topic:把消息放到交换机的指定 key 的队列,支持模糊匹配
  • headers:把消息放到交换机的满足某些 header 的队列

一个个来试下:

首先是 direct,生产者端 src/direct.js:

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange", "direct");

channel.publish("direct-test-exchange", "aaa", Buffer.from("hello1"));
channel.publish("direct-test-exchange", "bbb", Buffer.from("hello2"));
channel.publish("direct-test-exchange", "ccc", Buffer.from("hello3"));

不再是直接 sendToQueue 了,而是创建一个 exchange,然后调用 publish 往这个 exchange 发消息。

其中第二个参数是 routing key,也就是消息路由到哪个队列。

然后创建两个消费者:

src/direct-consumer1.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange", "aaa");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

src/direct-consumer2.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange", "bbb");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

分别创建 queue1 和 queue2 两个队列,绑定到前面创建的 direct-test-exchange 这个交换机上,指定了路由 key 分别是 aaa 和 bbb。

然后把生产者和两个消费者跑起来。

node src/direct.js
node src/direct-consumer1.js
node src/direct-consumer2.js

就可以看到队列 queue1 和 queue2 分别接收到了对应的消息:

这就是通过 direct 交换机发送消息的过程。

在管理页面上也可以看到这个交换机的信息:

包括 exchange 下的两个 queue 以及各自的 routing key。

再来试下 topic 类型的 Exchange。

src/topic.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

channel.publish("direct-test-exchange2", "aaa.1", Buffer.from("hello1"));
channel.publish("direct-test-exchange2", "aaa.2", Buffer.from("hello2"));
channel.publish("direct-test-exchange2", "bbb.1", Buffer.from("hello3"));

生产者端创建叫 direct-test-exchange2 的 topic 类型的 Exchange,然后发三条消息。

创建两个消费端:

src/topic-consumer1.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange2", "aaa.*");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

src/topic-consumer2.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange2", "topic");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange2", "bbb.*");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

两个消费者端分别创建 queue1 和 queue2 两个队列,绑定到 direct-test-exchange2 的交换机下。

指定路由 key 分别为 aaa._ 和 bbb._,这里的 * 是模糊匹配的意思。

消费者端也 assertExchange 了,如果不存在就创建,保证 exchange 一定存在。

然后跑一下:

node src/topic.js
node src/topic-consumer1.js
node src/topic-consumer2.js

可以看到,两个消费者分别收到了不同 routing key 对应的消息。

当然,在管理界面这里也是可以发消息的:

消费者端同样可以收到:

这就是 topic 类型的交换机,可以根据模糊匹配 routing key 来发消息到不同队列。

再来试下 fanout 类型的 exchange:

生产者:

src/fanout.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

channel.publish("direct-test-exchange3", "", Buffer.from("hello1"));
channel.publish("direct-test-exchange3", "", Buffer.from("hello2"));
channel.publish("direct-test-exchange3", "", Buffer.from("hello3"));

消费者:

src/fanout-consumer1.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange3", "aaa");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

src/fanout-consumer2.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange3", "fanout");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange3", "bbb");

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

fanout 是广播消息到 Exchange 下的所有队列,不需要指定 routing key,计算指定了也会忽略。

跑起来可以看到,两个消费者都收到了消息:

node src/fanout.js
node src/fanout-consumer1.js
node src/fanout-consumer2.js

这就是 fanout 类型交换机的特点,广播消息到所有绑定到它的 queue。

最后再来看下 headers 类型的 Exchange,这个不是根据 routing key 来匹配了,而是根据 headers:

生产者端:

src/headers.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

channel.publish("direct-test-exchange4", "", Buffer.from("hello1"), {
    headers: {
        name: "guang",
    },
});
channel.publish("direct-test-exchange4", "", Buffer.from("hello2"), {
    headers: {
        name: "guang",
    },
});
channel.publish("direct-test-exchange4", "", Buffer.from("hello3"), {
    headers: {
        name: "dong",
    },
});

消费者端:

src/headers-consumer1.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

const { queue } = await channel.assertQueue("queue1");
await channel.bindQueue(queue, "direct-test-exchange4", "", {
    name: "guang",
});

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

src/headers-consumer2.js

import * as amqp from "amqplib";

const connect = await amqp.connect(`amqp://localhost:5672`);
const channel = await connect.createChannel();

await channel.assertExchange("direct-test-exchange4", "headers");

const { queue } = await channel.assertQueue("queue2");
await channel.bindQueue(queue, "direct-test-exchange4", "", {
    name: "dong",
});

channel.consume(
    queue,
    (msg) => {
        console.log(msg.content.toString());
    },
    { noAck: true }
);

跑起来是这样的:

node src/headers.js
node src/headers-consumer1.js
node src/headers-consumer2.js

很容易理解,只是从匹配 routing key 变成了匹配 header。

这就是 Exchange,当你需要一对多发消息的时候,就可以选择这些类型的交换机。

回过头来,我们来总结下 rabbitmq 解决了什么问题:

  • 流量削峰:可以把很大的流量放到 mq 种按照一定的流量上限来慢慢消费,这样虽然慢一点,但不至于崩溃。
  • 应用解耦:应用之间不再直接依赖,就算某个应用挂掉了,也可以再恢复后继续从 mq 中消费消息。并不会一个应用挂掉了,它关联的应用也挂掉。

比如前端监控系统的后端服务,就很适合使用 mq 来做流量削峰。

案例代码在小册仓库

总结

前端监控系统会收到很多来自用户端的请求,如果直接存入数据库很容易把数据库服务搞挂掉,所以一般会加一个 RabbitMQ 来缓冲。

它是生产者往 queue 里放入消息,消费者从里面读消息,之后确认消息收到的流程。

当一对多的时候,还要加一个 Exchange 交换机来根据不同的规则转发消息:

  • direct 交换机:根据 routing key 转发消息到队列
  • topic 交换机:根据 routing key 转发消息到队列,支持模糊匹配
  • headers 交换机:根据 headers 转发消息到队列
  • fanout 交换机:广播消息到交换机下的所有队列

而且消费者可以设置一个消费的并发上限,这样就可以保证服务不会因并发过高而崩溃。

这就是流量削峰的功能。

RabbitMQ 在后端系统中经常能见到,是很常用的中间件。

上次更新: 6/21/25, 9:42 AM
贡献者: YNight
Prev
151.在 Nest 里集成 Prisma
Next
153.基于 Redis 实现关注关系