将 Milvus 与 DSPy 集成
什么是 DSPy
DSPy 由斯坦福 NLP 小组引入,是一个开创性的编程框架,旨在优化语言模型中的提示和权重,在大语言模型 (LLM) 集成到管道的多个阶段的场景中特别有价值。与依赖手动制作和调整的传统提示工程技术不同,DSPy 采用基于学习的方法。通过吸收问答示例,DSPy 动态生成针对特定任务优化的提示。这种创新方法能够无缝重新组装整个管道,消除了持续手动调整提示的需要。DSPy 的 Pythonic 语法提供了各种可组合和声明性模块,简化了对 LLM 的指导。
使用 DSPy 的好处
- 编程方法:DSPy 通过将管道抽象为文本转换图而不仅仅是提示 LLM,为 LM 管道开发提供了系统的编程方法。其声明性模块支持结构化设计和优化,取代了传统提示模板的试错方法。
- 性能改进:DSPy 展示了相对于现有方法的显著性能提升。通过案例研究,它超越了标准提示和专家创建的演示,展现了其多功能性和有效性,即使在编译到更小的 LM 模型时也是如此。
- 模块化抽象:DSPy 有效地抽象了 LM 管道开发的复杂方面,如分解、微调和模型选择。使用 DSPy,一个简洁的程序可以无缝转换为各种模型的指令,如 GPT-4、Llama2-13b 或 T5-base,简化开发并提高性能。
模块
有许多组件有助于构建 LLM 管道。在这里,我们将描述一些关键组件,以提供对 DSPy 如何运作的高级理解。
Signature:DSPy 中的 Signature 作为声明性规范,概述了模块的输入/输出行为,指导语言模型执行任务。 Module:DSPy 模块作为利用语言模型 (LM) 的程序的基本组件。它们抽象了各种提示技术,如思维链或 ReAct,并且适应处理任何 DSPy Signature。具有可学习参数和处理输入并产生输出的能力,这些模块可以组合形成更大的程序,从 PyTorch 中的 NN 模块汲取灵感,但针对 LM 应用进行了定制。 Optimizer:DSPy 中的优化器微调 DSPy 程序的参数,如提示和 LLM 权重,以最大化指定的指标(如准确性),提高程序效率。
为什么在 DSPy 中使用 Milvus
DSPy 是一个强大的编程框架,可以增强 RAG 应用程序。这种应用程序需要检索有用信息来提高答案质量,这需要向量数据库。Milvus 是一个知名的开源向量数据库,可以提高性能和可扩展性。通过 MilvusRM(DSPy 中的检索器模块),集成 Milvus 变得无缝。现在,开发者可以使用 DSPy 轻松定义和优化 RAG 程序,利用 Milvus 强大的向量搜索能力。这种协作使 RAG 应用程序更高效和可扩展,结合了 DSPy 的编程能力和 Milvus 的搜索功能。
示例
现在,让我们通过一个快速示例来演示如何在 DSPy 中利用 Milvus 来优化 RAG 应用程序。
先决条件
在构建 RAG 应用程序之前,请安装 DSPy 和 PyMilvus。
$ pip install "dspy-ai[milvus]"
$ pip install -U pymilvus
如果您使用 Google Colab,为了启用刚安装的依赖项,您可能需要重启运行时(点击屏幕顶部的"Runtime"菜单,并从下拉菜单中选择"Restart session")。
加载数据集
在此示例中,我们使用 HotPotQA(复杂问答对的集合)作为我们的训练数据集。我们可以通过 HotPotQA 类加载它们。
from dspy.datasets import HotPotQA
# 加载数据集
dataset = HotPotQA(
train_seed=1, train_size=20, eval_seed=2023, dev_size=50, test_size=0
)
# 告诉 DSPy 'question' Field 是输入。任何其他 Field 都是标签和/或元数据。
trainset = [x.with_inputs("question") for x in dataset.train]
devset = [x.with_inputs("question") for x in dataset.dev]
将数据摄取到 Milvus 向量数据库
将上下文信息摄取到 Milvus Collection 中以进行向量检索。此 Collection 应该有一个 embedding
Field 和一个 text
Field。在这种情况下,我们使用 OpenAI 的 text-embedding-3-small
模型作为默认查询嵌入函数。
import requests
import os
os.environ["OPENAI_API_KEY"] = "<YOUR_OPENAI_API_KEY>"
MILVUS_URI = "example.db"
MILVUS_TOKEN = ""
from pymilvus import MilvusClient, DataType, Collection
from dspy.retrieve.milvus_rm import openai_embedding_function
client = MilvusClient(uri=MILVUS_URI, token=MILVUS_TOKEN)
if "dspy_example" not in client.list_collections():
client.create_collection(
collection_name="dspy_example",
overwrite=True,
dimension=1536,
primary_field_name="id",
vector_field_name="embedding",
id_type="int",
metric_type="IP",
max_length=65535,
enable_dynamic=True,
)
text = requests.get(
"https://raw.githubusercontent.com/wxywb/dspy_dataset_sample/master/sample_data.txt"
).text
for idx, passage in enumerate(text.split("\n")):
if len(passage) == 0:
continue
client.insert(
collection_name="dspy_example",
data=[
{
"id": idx,
"embedding": openai_embedding_function(passage)[0],
"text": passage,
}
],
)
定义 MilvusRM
现在,您需要定义 MilvusRM。
from dspy.retrieve.milvus_rm import MilvusRM
import dspy
retriever_model = MilvusRM(
collection_name="dspy_example",
uri=MILVUS_URI,
token=MILVUS_TOKEN, # 如果 Milvus 连接不需要 token,请忽略此项
embedding_function=openai_embedding_function,
)
turbo = dspy.OpenAI(model="gpt-3.5-turbo")
dspy.settings.configure(lm=turbo)
构建签名
现在我们已经加载了数据,让我们开始为管道的子任务定义签名。我们可以识别简单的输入 question
和输出 answer
,但由于我们正在构建 RAG 管道,我们将从 Milvus 检索上下文信息。所以让我们将签名定义为 context, question --> answer
。
class GenerateAnswer(dspy.Signature):
"""用简短的事实性答案回答问题。"""
context = dspy.InputField(desc="may contain relevant facts")
question = dspy.InputField()
answer = dspy.OutputField(desc="often between 1 and 5 words")
我们为 context
和 answer
Field 包含简短描述,以定义关于模型将接收什么和应该生成什么的更清晰指导原则。
构建管道
现在,让我们定义 RAG 管道。
class RAG(dspy.Module):
def __init__(self, rm):
super().__init__()
self.retrieve = rm
# 此签名指示对 COT 模块施加的任务
self.generate_answer = dspy.ChainOfThought(GenerateAnswer)
def forward(self, question):
# 使用 milvus_rm 为问题检索上下文
context = self.retrieve(question).passages
# COT 模块接受 "context, query" 并输出 "answer"
prediction = self.generate_answer(context=context, question=question)
return dspy.Prediction(
context=[item.long_text for item in context], answer=prediction.answer
)
执行管道并获得结果
现在,我们已经构建了这个 RAG 管道。让我们试试它并获得结果。
rag = RAG(retriever_model)
print(rag("who write At My Window").answer)
Townes Van Zandt
我们可以评估数据集上的定量结果。
from dspy.evaluate.evaluate import Evaluate
from dspy.datasets import HotPotQA
evaluate_on_hotpotqa = Evaluate(
devset=devset, num_threads=1, display_progress=False, display_table=5
)
metric = dspy.evaluate.answer_exact_match
score = evaluate_on_hotpotqa(rag, metric=metric)
print("rag:", score)
优化管道
定义此程序后,下一步是编译。此过程更新每个模块内的参数以提高性能。编译过程依赖于三个关键因素:
- 训练集:我们将使用来自训练数据集的 20 个问答示例进行此演示。
- 验证指标:我们将建立一个简单的
validate_context_and_answer
指标。该指标验证预测答案的准确性,并确保检索到的上下文包含答案。 - 特定优化器(Teleprompter):DSPy 的编译器包含多个专为有效优化您的程序而设计的 teleprompter。
from dspy.teleprompt import BootstrapFewShot
# 验证逻辑:检查预测答案是否正确。# 同时检查检索到的上下文确实包含该答案。
def validate_context_and_answer(example, pred, trace=None):
answer_EM = dspy.evaluate.answer_exact_match(example, pred)
answer_PM = dspy.evaluate.answer_passage_match(example, pred)
return answer_EM and answer_PM
# 设置基本 teleprompter,它将编译我们的 RAG 程序。
teleprompter = BootstrapFewShot(metric=validate_context_and_answer)
# 编译!
compiled_rag = teleprompter.compile(rag, trainset=trainset)
# 现在 compiled_rag 已优化并准备回答您的新问题!
# 现在,让我们评估编译后的 RAG 程序。
score = evaluate_on_hotpotqa(compiled_rag, metric=metric)
print(score)
print("compile_rag:", score)
Ragas 得分从之前的 50.0 增加到 52.0,表明答案质量有所提升。
总结
DSPy 通过其可编程接口标志着语言模型交互的飞跃,它促进了模型提示和权重的算法和自动化优化。通过利用 DSPy 实现 RAG,适应不同的语言模型或数据集变得轻而易举,大大减少了繁琐手动干预的需要。