跳到主要内容

最佳实践与示例

概述

本文档提供了一系列实际应用场景的最佳实践和示例代码，帮助您更好地使用 Langfuse Python SDK 采集 LLM 应用的可观测性数据。

常见应用场景

1. 问答系统

场景说明

实现一个基于文档的问答系统，包含文档检索和 LLM 回答生成两个主要步骤。

from langfuse.decorators import observe
from llama_index import VectorStoreIndex, Document
from langchain.chat_models import ChatOpenAI
from langchain.prompts import ChatPromptTemplate

@observe()
def create_index(documents):
    """Build a vector index over the given documents.

    Args:
        documents: Items to index. Each item may be a llama_index ``Document``
            or a plain string; strings are wrapped in a ``Document`` so callers
            can pass raw text directly. Other items are passed through as-is.

    Returns:
        The ``VectorStoreIndex`` built from the documents.
    """
    # VectorStoreIndex.from_documents expects Document objects, so wrap raw text.
    docs = [
        Document(text=doc) if isinstance(doc, str) else doc
        for doc in documents
    ]
    index = VectorStoreIndex.from_documents(docs)
    return index

@observe()
def retrieve_context(index, query, top_k: int = 3):
    """Retrieve the nodes most relevant to ``query`` from ``index``.

    Args:
        index: The vector index to search.
        query: The user question.
        top_k: Number of nodes to retrieve (default 3).

    Returns:
        The result of the retriever's ``retrieve`` call (a list of scored
        nodes, not a string).
    """
    # llama_index sets the result count with ``similarity_top_k``;
    # ``search_kwargs={"k": ...}`` is a LangChain retriever convention,
    # not a llama_index parameter.
    retriever = index.as_retriever(similarity_top_k=top_k)
    return retriever.retrieve(query)

@observe()
def generate_answer(query: str, context: str):
    """Answer ``query`` from ``context`` with a LangChain prompt + ChatOpenAI chain.

    The Langfuse LangChain callback handler for the current observation is
    passed to ``invoke``. Without it, ``@observe()`` only records this
    function's span, and the LLM call itself is not captured.

    Args:
        query: The user question.
        context: Retrieved context text to ground the answer in.

    Returns:
        The chain's ``invoke`` result (the chat model's message).
    """
    # Local import: the example's imports only bring in ``observe``.
    from langfuse.decorators import langfuse_context

    prompt = ChatPromptTemplate.from_template("""
基于以下上下文回答问题。如果上下文中没有相关信息,请说明无法回答。

上下文:{context}

问题:{query}
""")

    chain = prompt | ChatOpenAI()
    langfuse_handler = langfuse_context.get_current_langchain_handler()
    response = chain.invoke(
        {
            "context": context,
            "query": query,
        },
        config={"callbacks": [langfuse_handler]},
    )
    return response

@observe()
def qa_pipeline(query: str, documents: list):
    """Run the QA flow: index the documents, retrieve context, generate an answer.

    Args:
        query: The user question.
        documents: Documents to index.

    Returns:
        Whatever ``generate_answer`` returns for the query and joined context.
    """
    # Building the index on every call keeps the example short; a real
    # service would build it once and reuse it across queries.
    index = create_index(documents)

    nodes = retrieve_context(index, query)
    # generate_answer expects text, but the retriever returns node objects;
    # join their contents so the prompt holds the actual text, not a repr.
    context = "\n\n".join(node.get_content() for node in nodes)

    answer = generate_answer(query, context)
    return answer

2. 对话系统

场景说明

实现一个多轮对话系统，包含状态管理和上下文维护。

from langfuse.decorators import observe, langfuse_context
from dataclasses import dataclass
from typing import List, Dict

@dataclass
class Message:
    """One entry in a chat history.

    Attributes:
        role: Who sent the message, e.g. "user" or "assistant".
        content: The message text.
    """

    role: str
    content: str

@dataclass
class DialogueState:
    """Conversation state held by a dialogue manager.

    Attributes:
        messages: The chat history, in the order messages were added.
        metadata: Free-form bookkeeping values (e.g. a turn counter).
    """

    messages: List[Message]
    metadata: Dict

class DialogueManager:
    """Keep a multi-turn conversation and generate assistant replies.

    ``@observe()`` is applied to the methods, not the class. The decorator
    wraps callables; on a class it would replace ``DialogueManager`` with a
    wrapper function and trace object construction rather than the dialogue.
    """

    def __init__(self):
        self.state = DialogueState(
            messages=[],
            metadata={"turn": 0},
        )

    @observe()
    def add_message(self, role: str, content: str):
        """Append a message to the history and increment the turn counter.

        The new turn number is attached to the current Langfuse observation
        as metadata.
        """
        self.state.messages.append(Message(role=role, content=content))
        self.state.metadata["turn"] += 1

        langfuse_context.update_current_observation(
            metadata={"turn": self.state.metadata["turn"]}
        )

    @observe()
    def generate_response(self) -> str:
        """Send the whole history to the chat model, store the reply, and return it."""
        # Imported locally so ``openai`` is defined; Langfuse's drop-in OpenAI
        # client also records the completion as a generation under this span.
        from langfuse.openai import openai

        messages = [
            {"role": msg.role, "content": msg.content}
            for msg in self.state.messages
        ]

        response = openai.chat.completions.create(
            model="gpt-3.5-turbo",
            messages=messages,
        )

        content = response.choices[0].message.content
        self.add_message("assistant", content)
        return content

@observe()
def chat_demo():
    """Run a two-turn demo conversation and return both assistant replies."""
    dialogue = DialogueManager()
    replies = []

    # An opening question, then a follow-up that builds on the first turn.
    for question in ("你好,我想了解一下人工智能。", "那深度学习是什么?"):
        dialogue.add_message("user", question)
        replies.append(dialogue.generate_response())

    return replies

3. 内容生成系统

场景说明

实现一个多阶段的内容生成系统，包括大纲生成和内容扩写两个阶段。

from langfuse.decorators import observe
from typing import List, Dict
import json

@observe()
def generate_outline(topic: str) -> List[str]:
    """Ask the model for an article outline on ``topic``.

    Uses OpenAI JSON mode, which needs two things:
    - a model that supports ``response_format={"type": "json_object"}``
      (plain ``gpt-4`` does not);
    - the word "JSON" somewhere in the messages.
    The prompt therefore spells out the exact object shape that is parsed below.

    Returns:
        The section titles, as strings, in the order the model returned them.

    Raises:
        ValueError: If the model's JSON object has no ``sections`` list.
    """
    # Imported locally so ``openai`` is defined; the Langfuse drop-in client
    # also traces the completion under this span.
    from langfuse.openai import openai

    prompt = (
        f"为主题「{topic}」生成一个详细的文章大纲,包含主要章节和子章节。"
        '请以 JSON 对象返回,格式为 {"sections": ["章节标题", ...]},'
        "其中每个元素都是字符串(子章节可写进对应的章节标题中)。"
    )

    response = openai.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "你是一个专业的文章策划师"},
            {"role": "user", "content": prompt},
        ],
        response_format={"type": "json_object"},
    )

    outline = json.loads(response.choices[0].message.content)
    sections = outline.get("sections")
    if not isinstance(sections, list):
        raise ValueError(f"Outline response has no 'sections' list: {outline!r}")
    return [str(section) for section in sections]

@observe()
def expand_section(section: str, context: str) -> str:
    """Expand one outline section into full prose.

    Args:
        section: The section title to expand.
        context: Background for the article, included in the prompt.

    Returns:
        The text of the model's reply.
    """
    # Imported locally so ``openai`` is defined (it was never imported in this
    # example); the Langfuse drop-in client also traces the completion.
    from langfuse.openai import openai

    prompt = f"""
基于以下上下文,详细展开这个章节的内容:

主题背景:{context}
当前章节:{section}
"""

    response = openai.chat.completions.create(
        model="gpt-4",
        messages=[
            {"role": "system", "content": "你是一个专业的文章写作专家"},
            {"role": "user", "content": prompt},
        ],
    )

    return response.choices[0].message.content

@observe()
def generate_article(topic: str) -> Dict[str, str]:
    """Generate a full article for ``topic``.

    Requests an outline first, then expands each section in outline order,
    using the topic as the context for every section.

    Returns:
        A mapping of section title to expanded text, in outline order. If a
        title repeats, the later expansion overwrites the earlier one.
    """
    sections = generate_outline(topic)
    return {section: expand_section(section, topic) for section in sections}

框架集成示例

FastAPI 集成

展示如何在 FastAPI 应用中集成 Langfuse SDK:

from fastapi import FastAPI, BackgroundTasks
from langfuse.decorators import observe, langfuse_context
from typing import Dict

# FastAPI application; the endpoint and shutdown hook below are registered on it.
app = FastAPI()

@observe()
async def process_document(doc_id: str) -> Dict:
    """Process one document asynchronously (simulated).

    Returns:
        A status dict with ``status`` and ``doc_id`` keys.
    """
    # Placeholder for real work: report completion right away.
    return {"status": "completed", "doc_id": doc_id}

@app.post("/process")
@observe()
async def process_endpoint(
    doc_id: str,
    background_tasks: BackgroundTasks
):
    """Schedule document processing in the background and return immediately.

    FastAPI runs background tasks after the response is sent. By then this
    endpoint's observation has already finished, so the current trace id is
    passed explicitly via ``langfuse_parent_trace_id``. Without it,
    ``process_document`` would start a separate, unrelated trace.

    Returns:
        A status dict with ``status`` and ``doc_id`` keys.
    """
    background_tasks.add_task(
        process_document,
        doc_id,
        langfuse_parent_trace_id=langfuse_context.get_current_trace_id(),
    )

    langfuse_context.update_current_observation(
        metadata={"doc_id": doc_id}
    )

    return {"status": "processing", "doc_id": doc_id}

@app.on_event("shutdown")
async def shutdown_event():
    """On application shutdown, flush any Langfuse events still queued for sending."""
    langfuse_context.flush()

Celery 集成

展示如何在 Celery 任务中集成 Langfuse SDK:

from celery import Celery
from langfuse.decorators import observe, langfuse_context

# Celery application; the task below is registered on it with @app.task.
app = Celery('tasks')

@observe(name="process_document")
def _process_document_observed(doc_id: str):
    """Traced body of the ``process_document`` task (records doc_id metadata)."""
    # Processing logic (placeholder).
    result = {"status": "success"}

    langfuse_context.update_current_observation(
        metadata={"doc_id": doc_id, **result}
    )

    return result


@app.task
def process_document(doc_id: str):
    """Celery task: process a document, then flush Langfuse data.

    The flush runs after the observed helper has returned. ``@observe()``
    applies the buffered metadata, output and end time only when the
    decorated function returns, so a flush inside that function would run
    before those updates are queued.
    """
    try:
        return _process_document_observed(doc_id)
    finally:
        # In ``finally`` so queued data is sent even if processing raises.
        langfuse_context.flush()

性能优化

性能优化建议

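在处理大量数据时，可以使用线程池并行处理来提升性能。注意：Langfuse 的观测上下文不会自动传递到线程池的工作线程中，因此需要先获取当前 trace ID，再通过 `langfuse_parent_trace_id` 参数显式传给子任务，使它们记录在同一个 trace 中：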

from langfuse.decorators import observe
from concurrent.futures import ThreadPoolExecutor
from typing import List

@observe()
def process_batch(items: List[str], max_workers: int = 3):
    """Process ``items`` in parallel while keeping them in one Langfuse trace.

    The decorator tracks the current trace in context variables, which
    ``ThreadPoolExecutor`` worker threads do not inherit. The trace id is
    therefore read here and passed to every worker call via
    ``langfuse_parent_trace_id``.

    Args:
        items: Items to process.
        max_workers: Size of the thread pool (default 3).

    Returns:
        The per-item results, in the same order as ``items``.
    """
    # Local import: this example's imports only bring in ``observe``.
    from langfuse.decorators import langfuse_context

    trace_id = langfuse_context.get_current_trace_id()

    def process_item(item: str):
        return process_with_context(
            item,
            langfuse_parent_trace_id=trace_id,
        )

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(process_item, items))

    return results

@observe()
def process_with_context(item: str, **kwargs):
    """Process a single item (placeholder logic).

    ``**kwargs`` accepts Langfuse linking keywords such as
    ``langfuse_parent_trace_id``; the body itself does not use them.
    """
    return f"processed: {item}"

错误处理

错误处理最佳实践

展示如何正确处理和记录错误:

from langfuse.decorators import observe, langfuse_context
from typing import Optional

class ProcessingError(Exception):
    """Raised when one processing attempt fails; callers may retry."""

@observe()
def process_with_error_handling(
    data: dict,
    retry_count: int = 3
) -> Optional[dict]:
    """Call ``process_data`` with retries, recording each outcome in Langfuse.

    Args:
        data: Input passed to ``process_data``.
        retry_count: Maximum number of attempts (default 3).

    Returns:
        The result of the first successful attempt, or ``None`` when
        ``retry_count`` is less than 1 (no attempt is made).

    Raises:
        ProcessingError: Re-raised from the final attempt once all attempts fail.
    """
    for attempt in range(1, retry_count + 1):
        try:
            result = process_data(data)
        except ProcessingError as e:
            if attempt == retry_count:
                # Out of attempts: mark the observation as failed, then propagate.
                langfuse_context.update_current_observation(
                    level="ERROR",
                    status_message="重试次数已用完",
                    metadata={
                        "attempt": attempt,
                        "error": str(e)
                    }
                )
                raise
            # Retryable failure: record a warning and try again.
            langfuse_context.update_current_observation(
                level="WARNING",
                metadata={
                    "attempt": attempt,
                    "error": str(e)
                }
            )
            continue

        # Valid Langfuse levels are DEBUG / DEFAULT / WARNING / ERROR; there is
        # no "SUCCESS" level, so a successful run is recorded as DEFAULT.
        langfuse_context.update_current_observation(
            level="DEFAULT",
            metadata={"attempt": attempt}
        )
        return result

    return None

测试集成

测试建议

展示如何在测试中集成 Langfuse SDK:

import pytest
from langfuse.decorators import observe, langfuse_context
from unittest.mock import patch

@observe()
def function_under_test(x: int) -> int:
    """Double ``x`` and record the input as observation metadata."""
    langfuse_context.update_current_observation(metadata={"x": x})
    return x * 2


def test_observed_function():
    """Check the result and that the function records observation metadata.

    ``patch.object`` replaces the method on the shared ``langfuse_context``
    object itself. Patching the string path
    ``'langfuse.decorators.langfuse_context'`` only rebinds that module
    attribute, not the name this module already imported, so that mock would
    never be called.
    """
    with patch.object(langfuse_context, "update_current_observation") as mock_update:
        result = function_under_test(5)

    assert result == 10
    mock_update.assert_called_once_with(metadata={"x": 5})

@pytest.fixture
def setup_langfuse():
    """Configure Langfuse for a test, then flush once the test finishes.

    ``enabled=False`` stops the SDK from sending events to a real Langfuse
    server with the dummy keys below; decorated code still executes.
    """
    langfuse_context.configure(
        secret_key="test-key",
        public_key="test-key",
        debug=True,
        enabled=False,
    )
    yield
    # Teardown: runs after the test, whether it passed or failed.
    langfuse_context.flush()
注意事项
  1. 合理使用采样率控制数据量
  2. 在关键节点添加详细的元数据
  3. 正确处理异常情况
  4. 及时清理资源：在短生命周期的进程（如脚本、Serverless 函数、Celery 任务）结束前调用 `flush()`，确保缓冲中的观测数据已发送完成
  5. 定期检查数据质量