RAG 知识库系统
从设计到实现全解析
基于 NestJS + SQLite + 硅基流动 Embedding 构建的低成本语义知识库系统。支持私有文档上传、混合检索(向量 + FTS5)、RRF 融合排序,让 AI 能够基于你的私有文档精准回答问题。
什么是 RAG?
RAG(Retrieval-Augmented Generation)是一种让 AI 大模型能够访问外部知识的技术。它先从知识库中检索与问题相关的片段,再将这些片段作为上下文注入 AI 对话,从而让模型回答超出训练数据范围的私有或实时信息。
AI 只能回答训练数据中包含的内容,对于你公司的内部文档、产品手册、最新政策一无所知,容易出现"幻觉"。
AI 先从你的文档库中检索相关段落,再结合这些"参考资料"回答问题,准确率大幅提升,且有据可查。
RAG 工作流程
用户上传 .txt / .md / .docx 文档,系统自动解析文本 → 按段落分块(600字/块)→ 调用 Embedding API 生成 1024 维向量 → 写入 SQLite(向量存储 + FTS5 全文索引)
用户在 AI 对话框中通过 KbPicker 选择一个或多个知识库,发送消息时前端会携带 kb_ids 参数
后端同时执行向量语义检索(余弦相似度)和 FTS5 关键词检索,用 RRF 算法融合两路排名,取 Top-5 最相关分块
将检索到的文档片段拼接成"参考资料"注入 System Prompt,AI 基于这些资料生成有据可查的回答,并通过 SSE 推送来源文件名到前端气泡
开发路径与学习地图
本章节带你理清整个 RAG 知识库系统的来龙去脉——为什么这样设计、技术进化的历程,以及建议的学习顺序,帮助你快速建立全局认知。
AI 大模型训练数据是有截止日期的,无法回答公司内部文档、产品手册、实时政策等私有信息。直接问大模型容易「幻觉」,出现虚假回答。我们需要一种方法让 AI 能够读懂我们自己的文档。
将私有文档切割成小块→向量化存入数据库→用户提问时实时检索相关内容→注入 AI 对话。这样 AI 回答时就有了“记忋”,不会凭空捏造。而且全部基于现有 SQLite 设施,零额外成本。
🛠 三阶段开发历程
实现文件接受接口,支持 TXT / Markdown / DOCX 三种格式。文件上传后将内容存入 knowledge_chunks 表,为后续向量化打基础。
调用硅基流动 BAAI/bge-m3 生成 1024 维向量,存入 SQLite。检索时将用户问题同样向量化,在所有分块中计算余弦相似度找到 Top-K。
引入 SQLite FTS5 全文检索,将关键词匹配与语义匹配结果通过 RRF 算法融合排序,将检索内容注入 ChatService,并通过 SSE 实时推送来源标注。
📚 推荐学习顺序
先搞清楚「什么是 RAG」、「为什么需要向量检索」、「FTS5 与向量如何互补」,再开始看代码。
看清楚 5 张数据库表的关系,了解 knowledge_chunks 如何同时支持 FTS5 和向量检索。
POST /knowledge/:id/upload → FileParserService → chunkText() → 存入 chunks 表,一步一步跟踪。
重点看 getEmbedding()、cosineSimilarity()、rrfMerge() 三个函数,这是整个系统的智力核心。
看 ChatService 如何注入 RAG 上下文,以及前端 KnowledgePage 和 KbPicker 如何发起知识库对话。
🗺 代码阅读地图
系统整体架构
RAG 系统由前端两个入口页面、后端 KnowledgeModule + ChatModule 两个核心模块、以及扩展了向量/FTS5 能力的 SQLite 数据库组成。
完整系统架构图
/api/knowledge/*
检索 · 预览
分块 600字+80重叠
BAAI/bge-m3
② 调用 retrieve()
③ 注入 System Prompt
④ 推送 rag_sources
⑤ 流式输出
⑥ 持久化来源
.retrieve()
1024维向量存储
unicode61 中文检索
KnowledgePage 管理知识库,KbPicker 在对话中选择知识库,ChatMessages 展示 RAG 来源标签。
KnowledgePage KbPicker 来源标签KnowledgeService 负责 CRUD、索引、检索、预览。ChunkerService 分块,EmbeddingService 向量化。
KnowledgeService ChunkerService EmbeddingService知识库/文档/分块 3 张核心表,kb_chunks_fts FTS5 虚表同步写入,messages 表扩展 rag_sources 列。
kb_chunks FTS5 虚表 向量 JSON文档索引完整链路
POST /api/knowledge/bases/:kbId/documents/upload
│
▼ ① 写入 kb_documents(status=pending)立即返回
▼ ② 异步 processDocument() 开始后台处理
│
├─ status → 'indexing'
├─ chunker.parseFile(buffer, filename) → 纯文本
├─ UPDATE parsed_content → 供预览,限 500KB
├─ chunker.chunkText(text) → chunks[] 600字/块,80字重叠
├─ embedding.embedTexts(chunks) → vectors[][] 调用硅基流动 API
│
└─ SQLite 事务 {
INSERT kb_chunks × N (含 embedding JSON)
INSERT kb_chunks_fts × N (FTS5 同步)
UPDATE kb_documents status='ready'
UPDATE knowledge_bases chunk_count+=N
}
│
▼ 前端轮询 GET …/documents → status='ready'
RAG 对话检索链路
用户发送消息(已选知识库 kb_ids)
│ POST /api/chat/completions { message, kb_ids: [1,2] }
▼
ChatController
│
├─ KnowledgeService.retrieve(message, kb_ids, 5)
│ ├─ EmbeddingService.embedQuery(message) → queryVec
│ ├─ 向量检索:余弦相似度 Top-10
│ ├─ FTS5 检索:BM25 Top-10
│ └─ RRF 融合 → Top-5 chunks
│
├─ 注入 RAG System Prompt("## 参考资料\n【1】...")
│
├─ SSE: event: rag_sources (推送来源文件名,前端立即显示标签)
│
├─ AiService.streamChat() → SSE: event: chunk × N
│
└─ onDone: addMessage(content, { rag_sources: JSON })
→ messages.rag_sources 持久化
数据库设计
RAG 系统在现有 SQLite 数据库上新增 3 张表,并为 messages 表添加 rag_sources 列,做到对原有数据结构最小侵入。
核心分块表 kb_chunks
CREATE TABLE IF NOT EXISTS kb_chunks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
kb_id INTEGER NOT NULL,
doc_id INTEGER NOT NULL,
chunk_index INTEGER NOT NULL, -- 块在文档内的顺序(0-based)
content TEXT NOT NULL, -- 块文本(600字以内)
embedding TEXT, -- JSON 数组 "[0.12, -0.34, ...]"(1024维)
token_count INTEGER DEFAULT 0, -- 近似字数
FOREIGN KEY (kb_id) REFERENCES knowledge_bases(id) ON DELETE CASCADE,
FOREIGN KEY (doc_id) REFERENCES kb_documents(id) ON DELETE CASCADE
);
-- FTS5 全文检索虚表(与 kb_chunks 同步写入)
CREATE VIRTUAL TABLE IF NOT EXISTS kb_chunks_fts USING fts5(
content,
chunk_id UNINDEXED,
kb_id UNINDEXED,
tokenize='unicode61 remove_diacritics 1' -- 支持中文
);
TEXT 类型存储 JSON 字符串(如 [0.12, -0.34, ...]),读取后在 Node.js 内存中 JSON.parse 后计算余弦相似度,无需专用向量数据库,1万块以内性能完全满足需求。
存储规模估算
| 知识库规模 | 分块数 | Embedding 存储 | SQLite 文件大小 |
|---|---|---|---|
| 10 个文档 | ~500 块 | ~5 MB | ~6 MB |
| 100 个文档 | ~5,000 块 | ~50 MB | ~60 MB |
| 500 个文档 | ~25,000 块 | ~250 MB | ~300 MB |
后端模块结构
知识库功能全部封装在独立的 KnowledgeModule 中,通过 exports: [KnowledgeService] 暴露检索接口供 ChatModule 调用。
server/src/modules/knowledge/
├── knowledge.module.ts # 模块注册,导入 ChatModule 依赖
├── knowledge.controller.ts # REST 路由(全部在 /api/knowledge 下)
├── knowledge.service.ts # 业务逻辑:CRUD、索引、检索、预览
├── chunker.service.ts # 文本解析 + 分块算法
└── embedding.service.ts # 调用硅基流动 Embedding API
@Module({
imports: [DatabaseModule, ConfigModule, HttpModule],
controllers: [KnowledgeController],
providers: [KnowledgeService, ChunkerService, EmbeddingService],
exports: [KnowledgeService], // 导出供 ChatModule 调用 retrieve()
})
export class KnowledgeModule {}
处理所有 /api/knowledge 路由,包含知识库 CRUD、文档上传、分块查看、预览接口。
核心业务逻辑:processDocument() 异步索引流程 + retrieve() 混合检索。
文件解析(TXT/MD/DOCX)+ 段落优先分块算法,600字/块,80字重叠。
封装硅基流动 BAAI/bge-m3 API,支持批量(32条/批)和单条查询 Embedding。
文本解析与分块算法
分块(Chunking)是 RAG 系统的基础。块太大检索精度低,块太小丢失上下文。本系统采用段落优先 + 固定窗口的混合策略,兼顾语义完整性与检索精度。
分块算法逻辑
将原始文本按 \n\n 分割为段落列表,过滤掉长度 < 20 字的噪声碎片。
单个段落长度 > 600 字时,以 600 字为步长、80 字为重叠进行强制切割,确保每块不超过限制。
段落 ≤ 600 字时,尝试将下一段追加到当前块。合并后 ≤ 600 字则继续追加;否则提交当前块,新块以上一块末尾 80 字开头(滑动窗口重叠)。
chunkText(text: string): string[] {
const paragraphs = text
.replace(/\r\n/g, '\n').replace(/\r/g, '\n')
.split('\n\n')
.map(p => p.trim())
.filter(p => p.length >= this.MIN_CHUNK_LENGTH); // 过滤 <20 字碎片
const chunks: string[] = [];
let current = '';
for (const para of paragraphs) {
if (para.length > this.CHUNK_SIZE) {
// 超长段落:强制固定窗口切割
if (current) { chunks.push(current); current = ''; }
let i = 0;
while (i < para.length) {
const slice = para.slice(i, i + this.CHUNK_SIZE).trim();
if (slice.length >= this.MIN_CHUNK_LENGTH) chunks.push(slice);
i += this.CHUNK_SIZE - this.OVERLAP;
}
} else {
const candidate = current ? `${current}\n\n${para}` : para;
if (candidate.length <= this.CHUNK_SIZE) {
current = candidate; // 继续追加
} else {
if (current) chunks.push(current);
// 新块以上一块末尾 80 字开头(滑动窗口重叠)
const overlap = current.slice(-this.OVERLAP);
current = overlap ? `${overlap}\n\n${para}` : para;
}
}
}
if (current.trim().length >= this.MIN_CHUNK_LENGTH) chunks.push(current.trim());
return chunks;
}
向量化服务(EmbeddingService)
向量化是将文本转换为高维数值向量的过程,语义相近的文本在向量空间中距离更近。本系统使用硅基流动提供的 BAAI/bge-m3 模型,专为中英文混合场景优化。
// 批量 Embedding(32条/批,避免超出 API 限制)
async embedTexts(texts: string[]): Promise<number[][]> {
const BATCH = 32;
const results: number[][] = [];
for (let i = 0; i < texts.length; i += BATCH) {
const batch = texts.slice(i, i + BATCH);
const res = await axios.post(
'https://api.siliconflow.cn/v1/embeddings',
{ model: 'BAAI/bge-m3', input: batch, encoding_format: 'float' },
{ headers: { Authorization: `Bearer ${this.apiKey}` } },
);
results.push(...res.data.data.map((d: any) => d.embedding));
}
return results;
}
// 纯 Node.js 余弦相似度,无第三方库
function cosineSimilarity(a: number[], b: number[]): number {
let dot = 0, normA = 0, normB = 0;
for (let i = 0; i < a.length; i++) {
dot += a[i] * b[i];
normA += a[i] * a[i];
normB += b[i] * b[i];
}
return dot / (Math.sqrt(normA) * Math.sqrt(normB));
}
// 性能:1万个 1024维向量 ≈ 20-50ms(纯CPU Node.js)
.env 中配置 SILICONFLOW_API_KEY=sk-xxx,否则上传文档时会报 Embedding API 错误。
混合检索策略(RRF)
单一检索方式各有缺陷:向量检索理解语义但精确词匹配弱;关键词检索精确但不懂同义词。RRF(Reciprocal Rank Fusion)融合两者,无需调参,效果超越任一单一方式。
| 检索方式 | 优势 | 劣势 | 本系统用途 |
|---|---|---|---|
| 向量语义检索 | 理解语义,"汽车"能匹配"车辆" | 精确词匹配弱,专有名词易漏 | 余弦相似度 Top-10 候选 |
| FTS5 关键词检索 | 精确匹配,BM25 相关性排序 | 无法理解同义词、跨语言 | BM25 Top-10 候选 |
| RRF 融合排序 | 互补两者优势,无需调参 | 需额外计算合并 | 最终 Top-5 输出 |
RRF 算法原理
score(d) = Σ 1 / (k + rank(d, list_i)) 其中 k=60(经验值)。文档在每个排名列表中的排名越靠前,贡献的分数越高;同时出现在两个列表中的文档分数会叠加,天然实现双路增强。
function rrfMerge(vectorList: any[], ftsList: any[], topK: number): any[] {
const K = 60;
const scoreMap = new Map<number, { item: any; score: number }>();
// 向量检索排名贡献
vectorList.forEach((item, rank) => {
const s = 1 / (K + rank + 1);
const cur = scoreMap.get(item.id);
scoreMap.set(item.id, cur ? { item, score: cur.score + s } : { item, score: s });
});
// FTS5 排名贡献(同一文档分数叠加)
ftsList.forEach((item, rank) => {
const s = 1 / (K + rank + 1);
const cur = scoreMap.get(item.id);
scoreMap.set(item.id, cur ? { item, score: cur.score + s } : { item, score: s });
});
return [...scoreMap.values()]
.sort((a, b) => b.score - a.score)
.slice(0, topK)
.map(({ item, score }) => ({ ...item, score }));
}
检索链路示意
│
├── 向量检索 → embed("如何申请退款?") → 余弦相似度 → Top-10 候选
│
├── FTS5 检索 → MATCH "如何 申请 退款" → BM25 排序 → Top-10 候选
│
└── RRF 融合 → 取并集 → 重排序 → Top-5 最终结果
与对话系统集成
RAG 集成遵循最小改动原则:仅修改 ChatController.chatCompletions,在 AI 流式输出前注入检索上下文,对 AiService 零改动。
@Post('completions')
async chatCompletions(@Body() body, @Req() req, @Res() res) {
const { message, kb_ids } = body;
// ① RAG 检索(仅当用户选择了知识库时才触发)
let ragSources: Array<{ filename: string }> = [];
if (Array.isArray(kb_ids) && kb_ids.length > 0 && message) {
const chunks = await this.knowledgeService.retrieve(message, kb_ids, 5);
if (chunks.length > 0) {
// ② 去重来源文件名
ragSources = [...new Map(
chunks.filter(c => c.filename).map(c => [c.filename, { filename: c.filename }])
).values()];
// ③ 注入 RAG System Prompt(AI 输出前插入参考资料)
const ragContext = chunks.map((c, i) => `【${i+1}】${c.content}`).join('\n\n');
historyMessages.unshift({
role: 'system',
content: `## 参考资料\n请基于以下资料回答用户问题。\n\n${ragContext}`,
});
}
}
// ④ 设置 SSE 响应头
res.setHeader('Content-Type', 'text/event-stream');
res.flushHeaders();
// ⑤ AI 输出前推送来源事件(前端气泡底部立即显示标签)
if (ragSources.length > 0) writeEvent('rag_sources', { sources: ragSources });
// ⑥ 调用 AI 流式输出,onDone 时持久化 rag_sources
await this.aiService.streamChat({ ...,
onDone: (fullText) => {
this.chatService.addMessage(conversationId, 'assistant', fullText, {
rag_sources: ragSources.length > 0 ? JSON.stringify(ragSources) : undefined,
});
},
});
}
RAG 来源标注与持久化
AI 回答底部的"知识库来源"标签,需要在页面刷新后仍然显示。由于 Zustand Store 是内存态,来源信息必须持久化到数据库。
"sources": [
{ "filename": "产品手册.docx" },
{ "filename": "退款政策.md" }
]
}
{!isUser && !isStreaming && message.rag_sources?.length > 0 && (
<div className="flex items-center flex-wrap gap-1.5 mt-2">
<span className="text-[11px] text-purple-500 font-medium">
<BookOpen size={11} /> 知识库来源:
</span>
{message.rag_sources.map((src, i) => (
<span key={i}
className="inline-flex items-center gap-1 px-2 py-0.5 rounded-full
bg-purple-50 border border-purple-200 text-[11px] text-purple-700"
>
<FileText size={10} />
<span className="truncate max-w-[180px]">{src.filename}</span>
</span>
))}
</div>
)}
文档预览功能
预览依赖 kb_documents.parsed_content 字段。对于没有该字段的历史文档,系统会从 kb_chunks 拼接重建,并智能去除块间的重叠部分。
| 文件类型 | 渲染方式 | 说明 |
|---|---|---|
.md | ReactMarkdown + remark-gfm | GFM 富文本渲染(表格/代码块/链接) |
.txt | <pre className="whitespace-pre-wrap font-mono"> | 等宽保留格式 |
.docx | <pre className="whitespace-pre-wrap"> | mammoth 提取的纯文本 |
getDocumentPreview(docId: number, kbId: number) {
const doc = this.db.get(
'SELECT id, filename, file_type, parsed_content FROM kb_documents WHERE id=? AND kb_id=?',
[docId, kbId],
);
if (!doc) return null;
// 老文档 parsed_content 为 NULL → 从 kb_chunks 拼接重建
if (!doc.parsed_content) {
const chunks = this.db.all(
'SELECT content FROM kb_chunks WHERE doc_id=? ORDER BY chunk_index ASC',
[docId],
);
if (chunks.length > 0) {
let assembled = chunks[0].content;
for (let i = 1; i < chunks.length; i++) {
const curr = chunks[i].content;
const suffix = assembled.slice(-80);
// 检测相邻块重叠,去重拼接
if (curr.startsWith(suffix) && suffix.trim().length > 0) {
assembled += '\n\n' + curr.slice(suffix.length);
} else {
assembled += '\n\n' + curr;
}
}
doc.parsed_content = assembled;
}
}
return doc;
}
前端实现
前端由两个核心模块组成:知识库管理页(KnowledgePage)和嵌入在对话输入框中的知识库选择器(KbPicker)。状态通过 Zustand Store 管理。
KnowledgePage 页面结构
KnowledgePage
├── 左侧面板:知识库列表
│ ├── 知识库卡片(名称/文档数/分块数)
│ ├── 公共/私有图标(Globe/Lock)
│ └── 删除按钮 → 自定义确认弹窗
│
├── 右侧面板:文档列表
│ ├── 文档卡片(文件名/大小/状态徽章/分块数)
│ ├── 状态轮询(pending → indexing → ready)
│ ├── 分块详情 → 右侧抽屉(chunk_index/字数/内容)
│ ├── 预览按钮 → 全屏预览 Modal(md渲染/txt等宽)
│ └── 删除按钮 → 自定义确认弹窗
│
├── 新建知识库弹窗(名称/描述/公共权限开关)
└── 文件上传(支持 txt/md/docx,拖拽或点击)
KbPicker 知识库选择器
KbPicker 嵌入在 ChatInput 工具栏中,点击知识库图标弹出面板,选择后发消息时自动携带 kb_ids。
Zustand Store 扩展
// Message 类型新增 rag_sources 字段
export interface Message {
id: string;
role: 'user' | 'assistant';
content: string;
rag_sources?: Array<{ filename: string }>; // ← 新增
}
// SSE rag_sources 事件触发的 Action
setMessageSources: (convId, msgId, sources) =>
set(s => ({
messages: {
...s.messages,
[convId]: s.messages[convId].map(m =>
m.id === msgId ? { ...m, rag_sources: sources } : m
),
},
})),
完整 API 参考
知识库管理
| 方法 | 路径 | 说明 | 权限 |
|---|---|---|---|
| GET | /api/knowledge/bases | 获取可见知识库列表(公共 + 自己的) | 登录用户 |
| POST | /api/knowledge/bases | 创建知识库 | 登录用户 |
| PATCH | /api/knowledge/bases/:kbId | 更新知识库名称/描述 | 所有者/管理员 |
| DELETE | /api/knowledge/bases/:kbId | 删除知识库(级联删除文档和分块) | 所有者/管理员 |
文档管理
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/knowledge/bases/:kbId/documents | 获取知识库文档列表 |
| POST | /api/knowledge/bases/:kbId/documents/upload | 上传文档(multipart/form-data) |
| DELETE | /api/knowledge/bases/:kbId/documents/:docId | 删除文档(级联删除分块) |
分块与预览
| 方法 | 路径 | 说明 |
|---|---|---|
| GET | /api/knowledge/bases/:kbId/documents/:docId/chunks | 获取文档分块列表(不含向量数据) |
| GET | /api/knowledge/bases/:kbId/documents/:docId/preview | 获取文档预览内容 |
| GET | /api/knowledge/status/:kbId | 获取知识库索引进度统计 |
对话集成(新增参数)
{
"message": "如何申请退款?",
"kb_ids": [1, 2], // 要检索的知识库 ID 列表(为空则不触发 RAG)
"conversation_id": "...",
// ...其他原有参数不变
}
性能与规模参考
本系统面向中小型知识库(< 1万块)设计,在该规模下无需任何专用向量数据库,纯 SQLite + Node.js 内存计算即可满足需求。
检索延迟参考
| 知识库规模 | 向量检索 | FTS5 检索 | 总延迟(含 Embedding API) |
|---|---|---|---|
| 500 块 | < 5ms | < 2ms | ~300ms(API RTT 为主) |
| 5,000 块 | < 20ms | < 5ms | ~350ms |
| 20,000 块 | ~80ms | < 10ms | ~430ms |
| 100,000 块 | ~400ms | < 20ms | ~700ms(建议分库优化) |
索引成本(BAAI/bge-m3)
| 知识库规模 | 分块数 | 索引成本(约) | 检索成本/次 |
|---|---|---|---|
| 10 个文档 | 500 块 | ¥0.05 | < ¥0.001 |
| 100 个文档 | 5,000 块 | ¥0.50 | < ¥0.001 |
| 1,000 个文档 | 50,000 块 | ¥5.00 | < ¥0.001 |
扩展路径
Node.js 内存计算余弦相似度 + SQLite FTS5,延迟完全可接受,零额外基础设施成本。
按 kb_id 分片 + 分批检索,仅查询相关知识库的向量,延迟可控在 500ms 以内。
迁移到 LanceDB(嵌入式向量数据库,ANN 近似检索)或 sqlite-vec 扩展(纯 SQL 向量检索),无需更换整体架构。