BFF 的落地实现
1. 前置知识
RPC(Remote Procedure Call)
是远程过程调用的缩写,是一种通信协议,允许程序在不同的计算机上相互调用远程过程,就像调用本地过程一样sofa-rpc-node
:基于nodejs
的一个RPC
框架,支持多种协议Protocol Buffers
(简称protobuf
):Google
开发的一种数据序列化格式,可以将结构化数据序列化成二进制格式,并能够垮语言使用
2. 技术选型
- Koa
- Redis
- MySQL
- Zookeeper
- RabbitMQ
3. 环境安装并启动
- macOS
11.6.4
- 本地
localhost
运行
# 安装
brew install mysql
brew install zookeeper
brew install rabbitmq
# 启动 zookeeper
zkServer start
# 启动 rabbitmq
brew services start rabbitmq
4. 安装 mysql 并创建数据库和表
4.1 创建数据库
CREATE DATABASE bff;
4.2 创建表
-- t_user 表
CREATE TABLE IF NOT EXISTS `t_user`(
`id` INT UNSIGNED AUTO_INCREMENT,
`username` VARCHAR(20) NOT NULL,
`avatar` VARCHAR(100) NOT NULL,
`password` VARCHAR(100) NOT NULL,
`phone` VARCHAR(30) NOT NULL,
PRIMARY KEY ( `id` )
)
-- t_post 表
CREATE TABLE IF NOT EXISTS `t_post`(
`id` INT UNSIGNED AUTO_INCREMENT,
`user_id` INT,
`title` VARCHAR(100) NOT NULL,
PRIMARY KEY ( `id` )
)
4.3 创建模拟数据
INSERT INTO t_user (id, username, avatar, password, phone) VALUES (1, 'lucus', 'avatar1.png', '111111', '13100000001');
INSERT INTO t_user (id, username, avatar, password, phone) VALUES (2, 'mars', 'avatar2.png', '222222', '13100000002');
INSERT INTO t_post (id, user_id, title) VALUES (1, 1, '文章1');
INSERT INTO t_post (id, user_id, title) VALUES (2, 1, '文章2');
INSERT INTO t_post (id, user_id, title) VALUES (3, 1, '文章3');
5. 创建 user 微服务
5.1 安装
mkdir user && npm init -y
npm install mysql2 sofa-rpc-node --save
user/package.json
{
"name": "user",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "nodemon index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"mysql2": "^3.0.0",
"sofa-rpc-node": "^2.8.0"
}
}
5.2 启动 RPC 服务并发布 user 服务
touch user/index.js
# 启动服务
npm run dev
user/index.js
const {
server: { RpcServer }, // 可以使用它创建RPC服务器
registry: { ZookeeperRegistry }, // 可以通过它来创建注册中心
} = require("sofa-rpc-node");
const mysql = require("mysql2/promise");
const logger = console;
// 创建一个注册中心,用于注册微服务
const registry = new ZookeeperRegistry({
// 记录日志用哪个工具
logger,
// zookeeper地址
address: "127.0.0.1:2181",
});
// 创建RPC服务器的实例
// 客户端连接RPC服务器的可以通过zookeeper,也可以直接直连rpcServer
const server = new RpcServer({
logger,
registry,
port: 10000,
});
(async function () {
//连接mysql数据库
const connection = await mysql.createConnection({
host: "localhost",
user: "root",
password: "",
database: "bff",
});
// 添加服务接口
server.addService(
// 格式约定为域名反转+领域模型的名称
{ interfaceName: "com.chemputer.user" }, // 格式约定为域名反转+领域模型的名称
{
async getUserInfo(userId) {
const sql = `SELECT id,username,avatar,password,phone from t_user WHERE id=${userId} limit 1`;
const [rows] = await connection.execute(sql);
return rows[0];
},
}
);
// 启动RPC服务
await server.start();
// 把启动好的RPC服务注册到zookeeper上
await server.publish();
console.log("用户微服务已经启动");
})();
5.3 客户端
touch user/client.js
user/client.js
const {
client: { RpcClient }, // 可以使用它创建RPC服务
registry: { ZookeeperRegistry }, // 可以通过它来创建注册中心
} = require("sofa-rpc-node");
const logger = console;
// 创建一个注册中心,用于注册微服务
const registry = new ZookeeperRegistry({
// 记录日志用哪个工具
logger,
// zookeeper的地址
address: "127.0.0.1:2181",
});
(async function () {
// 创建RPC客户端
const client = new RpcClient({
logger,
registry,
});
// 创建一个RPC服务器的消费者
const consumer = client.createConsumer({
interfaceName: "com.chemputer.user",
});
// 等待服务就绪
await consumer.ready();
const result = await consumer.invoke("getUserInfo", [1]);
console.log(result, "result....");
process.exit(0);
})();
6. 创建 post 微服务(同创建 user 微服务)
6.1 安装
mkdir post && npm init -y
npm install mysql2 sofa-rpc-node --save
post/package.json
{
"name": "post",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "nodemon index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"mysql2": "^3.1.0",
"sofa-rpc-node": "^2.8.0"
}
}
6.2 启动 RPC 服务并发布 post 服务
touch post/index.js
# 启动服务
npm run dev
post/index.js
const {
server: { RpcServer }, // 可以使用它创建RPC服务器
registry: { ZookeeperRegistry }, // 可以通过它来创建注册中心
} = require("sofa-rpc-node");
const mysql = require("mysql2/promise");
const logger = console;
// 创建一个注册中心,用于注册微服务
const registry = new ZookeeperRegistry({
// 记录日志用哪个工具
logger,
// zookeeper地址
address: "127.0.0.1:2181",
});
// 创建RPC服务器的实例
// 客户端连接RPC服务器的可以通过zookeeper,也可以直接直连rpcServer
const server = new RpcServer({
logger,
registry,
port: 10001,
});
(async function () {
//连接mysql数据库
const connection = await mysql.createConnection({
host: "localhost",
user: "root",
password: "",
database: "bff",
});
// 添加服务接口
server.addService(
// 格式约定为域名反转+领域模型的名称
{ interfaceName: "com.chemputer.post" }, // 格式约定为域名反转+领域模型的名称
{
async getPostCount(userId) {
const sql = `SELECT count(*) as postCount from t_post WHERE user_id=${userId}`;
const [rows] = await connection.execute(sql);
return rows[0].postCount;
},
}
);
// 启动RPC服务
await server.start();
// 把启动好的RPC服务注册到zookeeper上
await server.publish();
console.log("文章微服务已经启动");
})();
6.3 客户端
touch post/client.js
post/client.js
const {
client: { RpcClient }, // 可以使用它创建RPC服务
registry: { ZookeeperRegistry }, // 可以通过它来创建注册中心
} = require("sofa-rpc-node");
const logger = console;
// 创建一个注册中心,用于注册微服务
const registry = new ZookeeperRegistry({
// 记录日志用哪个工具
logger,
// zookeeper的地址
address: "127.0.0.1:2181",
});
(async function () {
// 创建RPC客户端
const client = new RpcClient({
logger,
registry,
});
// 创建一个RPC服务器的消费者
const consumer = client.createConsumer({
interfaceName: "com.chemputer.post",
});
// 等待服务就绪
await consumer.ready();
const result = await consumer.invoke("getPostCount", [1]);
console.log(result, "post result....");
process.exit(0);
})();
7. 创建 BFF
7.1 安装
mkdir bff && npm init -y
npm i koa koa-router koa-logger sofa-rpc-node lru-cache ioredis amqplib fs-extra
bff/package.json
{
"name": "bff",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "nodemon index.js",
"start": "pm2 start index.js --name bff"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"amqplib": "^0.10.3",
"fs-extra": "^11.1.0",
"ioredis": "^5.2.4",
"koa": "^2.14.1",
"koa-logger": "^3.2.1",
"koa-router": "^12.0.0",
"lru-cache": "^7.14.1",
"sofa-rpc-node": "^2.8.0"
}
}
7.2 创建 BFF 层 RPC 中间件
touch bff/middleware/rpc.js
bff/middleware/rpc.js
const {
client: { RpcClient }, // 可以使用它创建RPC服务器
registry: { ZookeeperRegistry }, // 可以通过它来创建注册中心
} = require("sofa-rpc-node");
const logger = console;
const rpcMiddleware = (options = {}) => {
return async function (ctx, next) {
// 创建 zookeeperRegistry 类的实例,用于管理服务发现和注册
const registry = new ZookeeperRegistry({
// 记录日志用哪个工具
logger,
// zookeeper 的地址
address: "127.0.0.1:2181",
});
// 创建 RpcClient 类的实例,用于发送 rpc 请求
const client = new RpcClient({
logger,
registry,
});
const interfaceNames = options.interfaceNames || [];
const rpcConsumers = {};
for (let i = 0; i < interfaceNames.length; i++) {
const interfaceName = interfaceNames[i];
// 使用 RpcClient 的 createConsumer 方法创建 rpc 消费者
const consumer = client.createConsumer({ interfaceName });
// 等待 rpc 消费者准备完毕
await consumer.ready();
rpcConsumers[interfaceName.split(".").pop()] = consumer;
}
ctx.rpcConsumers = rpcConsumers;
await next();
};
};
module.exports = rpcMiddleware;
7.3 启动 BFF 服务
touch bff/index.js
# 启动服务
npm run dev
bff/index.js
const Koa = require("koa");
const router = require("koa-router")();
const logger = require("koa-logger");
const rpcMiddleware = require("./middleware/rpc");
const app = new Koa();
app.use(logger());
app.use(
rpcMiddleware({
// 配置 rpc 中间件的参数,表示要调用的 rpc 接口名称
interfaceNames: ["com.chemputer.user", "com.chemputer.post"],
})
);
router.get("/", async (ctx) => {
const userId = ctx.query.userId;
const {
rpcConsumers: { user, post },
} = ctx;
const [userInfo, postCount] = await Promise.all([
user.invoke("getUserInfo", [userId]),
post.invoke("getPostCount", [userId]),
]);
ctx.body = {
userInfo,
postCount,
};
});
app.use(router.routes()).use(router.allowedMethods());
app.listen(3000, () => {
console.log(" 🚀 bff server is running at 3000");
});
浏览器访问
http://localhost:3000/?userId=1
8. 数据处理
对数据进行个性化的 裁剪
脱敏
适配
bff/index.js
...
router.get("/", async (ctx) => {
const userId = ctx.query.userId;
const {
rpcConsumers: { user, post },
} = ctx;
const [userInfo, postCount] = await Promise.all([
user.invoke("getUserInfo", [userId]),
post.invoke("getPostCount", [userId]),
]);
// 数据的裁剪,把不需要的信息和字符裁剪掉,不返回给客户端
delete userInfo.password;
// 数据脱敏
userInfo.phone = userInfo.phone.replace(/(\d{3})\d{4}(\d{4})/, "$1****$2");
// 数据适配
userInfo.avatar = "http://www.chemputer.top/" + userInfo.avatar;
ctx.body = {
userInfo,
postCount,
};
});
...
9. 缓存
- BFF 作为前端应用和后端系统之间的抽象层,承担了大量的请求转发和数据转换工作。使用多级缓存可以帮助 BFF 减少对后端系统的访问,从而提高应用的响应速度
- 当 BFF 收到一个请求时,首先会检查内存缓存中是否存在对应的数据,如果有就直接返回数据。如果内存缓存中没有数据,就会检查 Redis 缓存,如果 Redis 缓存中有数据就返回数据,并将数据写入内存缓存。如果本地缓存中也没有数据,就会向后端系统发起请求,并将数据写入 Redis 缓存和内存缓存
9.1 多级缓存
- 多级缓存(
multi-level cache
)是指系统中使用了多个缓存层来存储数据的技术。这些缓存层的优先级通常是依次递减的,即最快的缓存层位于最顶层,最慢的缓存层位于最底层
9.2 LRU
- LRU(
Least Recently Used
)是一种常用的高速缓存淘汰算法,它的原理是将最近使用过的数据或页面保留在缓存中,而最少使用的数据或页面将被淘汰。这样做的目的是为了最大化缓存的命中率,即使用缓存尽可能多地满足用户的请求
9.3 Redis
Redis
是一种开源的内存数据存储系统,可以作为数据库、缓存和消息中间件使用Redis
运行在内存中,因为它的读写速度非常快ioredis
是一个基于Node.js
的Redis
客户端,提供了对Redis
命令的高度封装和支持
9.4 BFF 使用缓存
touch bff/middleware/cache.js
bff/middleware/cache.js
const Redis = require("ioredis");
const LRUCache = require("lru-cache");
class CacheStore {
constructor() {
this.stores = [];
}
add(store) {
this.stores.push(store);
return this;
}
async set(key, value) {
for (const store of this.stores) {
await store.set(key, value);
}
}
async get(key) {
for (const store of this.stores) {
const value = await store.get(key);
if (value !== undefined) return value;
}
}
}
class MemoryStore {
constructor() {
this.cache = new LRUCache({
max: 100,
//一般来说越上层的过期时间越短
ttl: 1000, //设置过期时间
});
}
async set(key, value) {
this.cache.set(key, value);
}
async get(key) {
return this.cache.get(key);
}
}
class RedisStore {
constructor(options = { host: "localhost", port: "6379" }) {
this.client = new Redis(options);
}
async set(key, value) {
await this.client.set(key, JSON.stringify(value));
//await this.client.pexpire(key, 60);
}
async get(key) {
const value = await this.client.get(key);
return value ? JSON.parse(value) : undefined;
}
}
//创建一个缓存实例
const cacheStore = new CacheStore();
//添一些缓存层
cacheStore.add(new MemoryStore());
cacheStore.add(new RedisStore());
const cacheMiddleware = () => {
return async function (ctx, next) {
ctx.cache = cacheStore;
await next();
};
};
module.exports = cacheMiddleware;
10. 消息队列处理日志
- 消息队列(
Message Queue
)用于在分布式系统中传递数据。它的特点是可以将消息发送者和接收者解耦,使得消息生产者和消息消费者可以独立的开发和部署
10.1 引入原因
- 在 BFF 中使用消息队列(
message queue
)有几个原因大并发
:消息队列可以帮助应对大并发的请求,BFF 可以将请求写入消息队列,然后后端服务可以从消息队列中读取请求并处理解耦
:消息队列可以帮助解耦 BFF 和后端服务,BFF 不需要关心后端服务的具体实现,只需要将请求写入消息队列,后端服务负责从消息队列中读取请求并处理异步
:消息队列可以帮助实现异步调用,BFF 可以将请求写入消息队列,然后立即返回响应给前端应用,后端服务在后台处理请求流量削峰
:消息队列可以帮助流量削峰,BFF 可以将请求写入消息队列,然后后端服务可以在合适的时候处理请求,从而缓解瞬时高峰流量带来的压力
10.2 RabbitMQ
RabbitMQ
是一个消息代理,它可以用来在消息生产者和消息消费者之间传递消息RabbitMQ
的工作流程如下:- 消息生产者将消息发送到
RabbitMQ
服务器 RabbitMQ
服务器将消息保存到队列中- 消息消费者从队列中读取消息
- 当消息消费者处理完消息后
RabbitMQ
服务器将消息删除
- 消息生产者将消息发送到
10.3 BFF 使用 MQ
- 创建 mq 中间件
touch bff/middleware/mq.js
bff/middleware/mq.js
const amqplib = require("amqplib");
const mqMiddleware = (options = {}) => {
return async function (ctx, next) {
//连接 MQ服务器
const mqClient = await amqplib.connect(options.url || "amqp://localhost");
//创建一个通道
const logger = await mqClient.createChannel();
//创建一个名称为logger的队列,如果已经存在,不会重复创建
await logger.assertQueue("logger");
ctx.channels = {
logger,
};
await next();
};
};
module.exports = mqMiddleware;
- 接入到 bff 层
bff/index.js
router.get('/', async (ctx) => {
...
ctx.channels.logger.sendToQueue(
"logger",
Buffer.from(
JSON.stringify({
method: ctx.method,
path: ctx.path,
userId,
})
)
);
...
})
- 启动写入日志服务
mkdir write-logger && npm init -y
npm i amqplib fs-extra
write-logger/package.json
{
"name": "write-logger",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"dev": "nodemon index.js"
},
"keywords": [],
"author": "",
"license": "ISC",
"dependencies": {
"amqplib": "^0.10.3",
"fs-extra": "^11.1.0"
}
}
write-logger/index.js
const amqplib = require("amqplib");
const fs = require("fs-extra");
(async function () {
//连接 MQ服务器
const mqClient = await amqplib.connect("amqp://localhost");
//创建一个通道
const logger = await mqClient.createChannel();
//创建一个名称为logger的队列,如果已经存在,不会重复创建
await logger.assertQueue("logger");
logger.consume("logger", async (event) => {
await fs.appendFile("./logger.txt", event.content.toString() + "\n");
});
})();