装饰器使用指南
功能概述
Langfuse Python SDK 提供了装饰器 API,这是一种简单直观的方式来采集 LLM 应用的可观测性数据。
通过使用 @observe()
装饰器,您可以轻松地跟踪应用中的各种操作。
基本用法
简单函数装饰
快速上手
下面是一个基本的装饰器使用示例:
from langfuse.decorators import observe
@observe()
def process_query(query: str):
return f"处理查询: {query}"
@observe()
def main():
result = process_query("测试查询")
return result
main()
自动记录
装饰器会自动记录:
- 函数调用的上下文关系
- 执行时间和持续时间
- 函数名称和参数信息
- 返回值数据
自定义观测属性
自定义属性
您可以通过 langfuse_context
来自定义观测属性:
from langfuse.decorators import observe, langfuse_context
@observe()
def process_document(doc_id: str):
# 更新当前观测的属性
langfuse_context.update_current_observation(
name="文档处理",
metadata={"doc_id": doc_id},
level="INFO"
)
# 处理逻辑
return "处理完成"
框架集成
框架支持
装饰器 API 提供了与主流 LLM 框架的无缝集成支持,包括 OpenAI、LangChain 和 LlamaIndex。
OpenAI 集成
OpenAI 示例
使用装饰器跟踪 OpenAI API 调用:
from langfuse.decorators import observe
from langfuse.openai import openai
@observe()
def generate_story():
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "你是一个故事生成器"},
{"role": "user", "content": "写一个简短的科幻故事"}
]
)
return response.choices[0].message.content
LangChain 集成
LangChain 示例
使用装饰器跟踪 LangChain 执行过程:
from langfuse.decorators import observe, langfuse_context
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
@observe()
def process_with_langchain(query: str):
# 获取 LangChain 回调处理器
handler = langfuse_context.get_current_langchain_handler()
# 创建并执行 Chain
prompt = ChatPromptTemplate.from_template("回答问题: {query}")
model = ChatOpenAI()
chain = prompt | model
# 使用回调处理器执行
result = chain.invoke(
{"query": query},
config={"callbacks": [handler]}
)
return result
LlamaIndex 集成
LlamaIndex 示例
使用装饰器跟踪 LlamaIndex 检索和生成过程:
from langfuse.decorators import observe, langfuse_context
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.callbacks import CallbackManager
@observe()
def search_documents(query: str):
# 获取 LlamaIndex 回调处理器
handler = langfuse_context.get_current_llama_index_handler()
Settings.callback_manager = CallbackManager([handler])
# 执行搜索
index = VectorStoreIndex.from_documents(documents)
response = index.as_query_engine().query(query)
return response
高级功能
自定义输入输出处理
敏感数据处理
处理敏感数据时,可以禁用自动捕获并手动控制记录的内容:
@observe(capture_input=False, capture_output=False)
def process_sensitive_data(secret_data: str):
# 处理敏感数据
result = "处理结果"
# 手动设置过滤后的输入输出
langfuse_context.update_current_observation(
input="[已过滤的输入]",
output="[已过滤的输出]"
)
return result
并发处理
并发注意事项
在使用线程池时,需要特别注意上下文的传递:
from concurrent.futures import ThreadPoolExecutor
from langfuse.decorators import langfuse_context, observe
@observe()
def process_batch(items):
trace_id = langfuse_context.get_current_trace_id()
observation_id = langfuse_context.get_current_observation_id()
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(
process_item,
item,
langfuse_parent_trace_id=trace_id,
langfuse_parent_observation_id=observation_id
)
for item in items
]
return [f.result() for f in futures]
@observe()
def process_item(item, **kwargs):
return f"处理项目: {item}"
最佳实践
最佳实践建议
-
合理使用观测类型
- 使用
Event
记录关键节点 - 使用
Span
记录耗时操作 - 使用
Generation
记录 AI 模型调用
- 使用
-
异常处理
@observe()
def process_with_error_handling(data):
try:
result = process_data(data)
return result
except Exception as e:
langfuse_context.update_current_observation(
level="ERROR",
status_message=str(e)
)
raise -
版本管理
@observe()
def generate_content(prompt: str, version: str = "v1"):
langfuse_context.update_current_observation(
version=version,
metadata={"prompt_template": "标准模板"}
)
return "生成的内容"
短期环境处理
在 AWS Lambda 等短期环 境中,确保在函数结束前调用 flush()
:
from langfuse.decorators import observe, langfuse_context
@observe()
def lambda_handler(event, context):
result = process_event(event)
langfuse_context.flush() # 确保数据发送完成
return result
故障排除
故障排除指南
-
启用调试模式
import os
os.environ["LANGFUSE_DEBUG"] = "True" -
验证配置
from langfuse.decorators import langfuse_context
assert langfuse_context.auth_check() -
常见问题解决
- 如果观测数据未显示,检查是否正确调用了
flush()
- 如果上下文关系不正确,检查并发处理中的父子关系设置
- 如果数据不完整,检查网络连接和超时设置
- 如果观测数据未显示,检查是否正确调用了