跳到主要内容

使用 Milvus 和 SiliconFlow 构建 RAG

Open In Colab GitHub Repository

SiliconFlow 致力于构建可扩展、标准化和高性能的 AI 基础设施平台。 SiliconCloud 是 SiliconFlow 的旗舰产品之一,被描述为模型即服务(MaaS)平台。它提供了一个全面的环境来部署各种 AI 模型,包括大型语言模型(LLM)和嵌入模型。SiliconCloud 聚合了众多开源模型,使用户能够轻松访问和利用这些资源,而无需进行大量的基础设施设置。

在本教程中,我们将向您展示如何使用 Milvus 和 SiliconFlow 构建 RAG(检索增强生成)管道。

准备工作

依赖项和环境

$ pip install --upgrade pymilvus openai requests tqdm

如果您使用的是 Google Colab,为了启用刚安装的依赖项,您可能需要重启运行时(点击屏幕顶部的"Runtime"菜单,然后从下拉菜单中选择"Restart session")。

SiliconFlow 启用了 OpenAI 风格的 API。您可以登录其官方网站并准备 api key SILICON_FLOW_API_KEY 作为环境变量。

import os

os.environ["SILICON_FLOW_API_KEY"] = "***********"

准备数据

我们使用 Milvus Documentation 2.4.x 的 FAQ 页面作为我们 RAG 中的私有知识,这是一个简单 RAG 管道的良好数据源。

下载 zip 文件并将文档提取到文件夹 milvus_docs

$ wget https://github.com/milvus-io/milvus-docs/releases/download/v2.4.6-preview/milvus_docs_2.4.x_en.zip
$ unzip -q milvus_docs_2.4.x_en.zip -d milvus_docs

我们从文件夹 milvus_docs/en/faq 加载所有 markdown 文件。对于每个文档,我们只是简单地使用 "# " 来分离文件中的内容,这可以大致分离 markdown 文件每个主要部分的内容。

from glob import glob

text_lines = []

for file_path in glob("milvus_docs/en/faq/*.md", recursive=True):
with open(file_path, "r") as file:
file_text = file.read()

text_lines += file_text.split("# ")

准备嵌入模型

我们初始化一个客户端来准备嵌入模型。SiliconFlow 启用了 OpenAI 风格的 API,您可以使用相同的 API 进行少量调整来调用嵌入模型和 LLM。

from openai import OpenAI

siliconflow_client = OpenAI(
api_key=os.environ["SILICON_FLOW_API_KEY"], base_url="https://api.siliconflow.cn/v1"
)

定义一个函数,使用客户端生成文本嵌入。我们使用 BAAI/bge-large-en-v1.5 模型作为示例。

def emb_text(text):
return (
siliconflow_client.embeddings.create(input=text, model="BAAI/bge-large-en-v1.5")
.data[0]
.embedding
)

生成一个测试嵌入并打印其维度和前几个元素。

test_embedding = emb_text("This is a test")
embedding_dim = len(test_embedding)
print(embedding_dim)
print(test_embedding[:10])

输出:1024 1024 维的嵌入向量,显示前 10 个元素的数值。

将数据加载到 Milvus

创建 Collection

from pymilvus import MilvusClient

milvus_client = MilvusClient(uri="./milvus_demo.db")

collection_name = "my_rag_collection"

关于 MilvusClient 的参数:

  • uri 设置为本地文件,例如 ./milvus.db,是最方便的方法,因为它会自动利用 Milvus Lite 将所有数据存储在此文件中。
  • 如果您有大规模数据,可以在 docker 或 kubernetes 上设置性能更高的 Milvus 服务器。在此设置中,请使用服务器 uri,例如 http://localhost:19530,作为您的 uri
  • 如果您想使用 Zilliz Cloud,Milvus 的完全托管云服务,请调整 uritoken,它们对应于 Zilliz Cloud 中的 Public Endpoint 和 Api key

检查 collection 是否已存在,如果存在则删除它。

if milvus_client.has_collection(collection_name):
milvus_client.drop_collection(collection_name)

使用指定参数创建新的 collection。

如果我们不指定任何字段信息,Milvus 将自动创建一个默认的 id 字段作为主键,以及一个 vector 字段来存储向量数据。保留的 JSON 字段用于存储非模式定义的字段及其值。

milvus_client.create_collection(
collection_name=collection_name,
dimension=embedding_dim,
metric_type="IP", # 内积距离
consistency_level="Strong", # 支持的值为 (`"Strong"`, `"Session"`, `"Bounded"`, `"Eventually"`)。详见 https://milvus.io/docs/consistency.md#Consistency-Level
)

插入数据

遍历文本行,创建嵌入,然后将数据插入到 Milvus 中。

这里有一个新字段 text,它是 collection 模式中的非定义字段。它将自动添加到保留的 JSON 动态字段中,在高级别上可以被视为普通字段。

from tqdm import tqdm

data = []

for i, line in enumerate(tqdm(text_lines, desc="Creating embeddings")):
data.append({"id": i, "vector": emb_text(line), "text": line})

milvus_client.insert(collection_name=collection_name, data=data)

Creating embeddings: 100%|██████████| 72/72 [完成]

输出:成功插入了 72 条记录,返回了所有记录的 ID 列表和操作成本信息。

构建 RAG

为查询检索数据

让我们指定一个关于 Milvus 的常见问题。

question = "How is data stored in milvus?"

在 collection 中搜索问题并检索语义上最相关的前 3 个匹配项。

search_res = milvus_client.search(
collection_name=collection_name,
data=[emb_text(question)], # 将问题转换为嵌入向量
limit=3, # 返回前 3 个结果
search_params={"metric_type": "IP", "params": {}}, # 内积距离
output_fields=["text"], # 返回文本字段
)

让我们看看查询的搜索结果

retrieved_lines_with_distances = [
(res["entity"]["text"], res["distance"]) for res in search_res[0]
]
retrieved_lines_with_distances

输出:返回了 3 个最相关的搜索结果,每个结果包含文本内容和相似度分数。

使用 LLM 生成 RAG 响应

将检索到的文档转换为字符串格式。

context = "\n".join(
[line_with_distance[0] for line_with_distance in retrieved_lines_with_distances]
)

为语言模型定义系统和用户提示符。此提示符是根据从 Milvus 检索到的文档组装的。

SYSTEM_PROMPT = """
Human: You are an AI assistant. You are able to find answers to the questions from the contextual passage snippets provided.
"""

USER_PROMPT = f"""
Use the following pieces of information enclosed in <context> tags to provide an answer to the question enclosed in <question> tags.
<context>
{context}
</context>
<question>
{question}
</question>
"""

使用 SiliconFlow 提供的 LLM 基于提示符生成响应。

response = siliconflow_client.chat.completions.create(
model="deepseek-ai/DeepSeek-V2.5",
messages=[
{"role": "system", "content": SYSTEM_PROMPT},
{"role": "user", "content": USER_PROMPT},
],
)

print(response.choices[0].message.content)

Based on the provided context, Milvus stores data in two main ways:

  1. Inserted Data: This includes vector data, scalar data, and collection-specific schema. This data is stored in persistent storage as incremental logs. Milvus supports multiple object storage backends for this purpose, including:

    • MinIO
    • AWS S3
    • Google Cloud Storage (GCS)
    • Azure Blob Storage
    • Alibaba Cloud OSS
    • Tencent Cloud Object Storage (COS)
  2. Metadata: This is generated within Milvus itself. Each Milvus module has its own metadata, which is stored in etcd.

When data is inserted, it's first loaded to a message queue, and then Milvus' data node writes this data from the message queue to persistent storage as incremental logs. The actual flushing to disk can be controlled - if flush() is called, all data in the message queue is immediately written to persistent storage.

太好了!我们已经使用 Milvus 和 SiliconFlow 成功构建了一个 RAG 管道。

快速部署

要了解如何使用此教程启动在线演示,请参阅示例应用程序