跳到主要内容

装饰器使用指南

功能概述

Langfuse Python SDK 提供了装饰器 API,这是一种简单直观的方式来采集 LLM 应用的可观测性数据。

通过使用 @observe() 装饰器,您可以轻松地跟踪应用中的各种操作。

基本用法

简单函数装饰

快速上手

下面是一个基本的装饰器使用示例:

from langfuse.decorators import observe

@observe()
def process_query(query: str):
return f"处理查询: {query}"

@observe()
def main():
result = process_query("测试查询")
return result

main()
自动记录

装饰器会自动记录:

  • 函数调用的上下文关系
  • 执行时间和持续时间
  • 函数名称和参数信息
  • 返回值数据

自定义观测属性

自定义属性

您可以通过 langfuse_context 来自定义观测属性:

from langfuse.decorators import observe, langfuse_context

@observe()
def process_document(doc_id: str):
# 更新当前观测的属性
langfuse_context.update_current_observation(
name="文档处理",
metadata={"doc_id": doc_id},
level="INFO"
)
# 处理逻辑
return "处理完成"

框架集成

框架支持

装饰器 API 提供了与主流 LLM 框架的无缝集成支持,包括 OpenAI、LangChain 和 LlamaIndex。

OpenAI 集成

OpenAI 示例

使用装饰器跟踪 OpenAI API 调用:

from langfuse.decorators import observe
from langfuse.openai import openai

@observe()
def generate_story():
response = openai.chat.completions.create(
model="gpt-3.5-turbo",
messages=[
{"role": "system", "content": "你是一个故事生成器"},
{"role": "user", "content": "写一个简短的科幻故事"}
]
)
return response.choices[0].message.content

LangChain 集成

LangChain 示例

使用装饰器跟踪 LangChain 执行过程:

from langfuse.decorators import observe, langfuse_context
from langchain.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI

@observe()
def process_with_langchain(query: str):
# 获取 LangChain 回调处理器
handler = langfuse_context.get_current_langchain_handler()

# 创建并执行 Chain
prompt = ChatPromptTemplate.from_template("回答问题: {query}")
model = ChatOpenAI()
chain = prompt | model

# 使用回调处理器执行
result = chain.invoke(
{"query": query},
config={"callbacks": [handler]}
)
return result

LlamaIndex 集成

LlamaIndex 示例

使用装饰器跟踪 LlamaIndex 检索和生成过程:

from langfuse.decorators import observe, langfuse_context
from llama_index.core import Settings, VectorStoreIndex
from llama_index.core.callbacks import CallbackManager

@observe()
def search_documents(query: str):
# 获取 LlamaIndex 回调处理器
handler = langfuse_context.get_current_llama_index_handler()
Settings.callback_manager = CallbackManager([handler])

# 执行搜索
index = VectorStoreIndex.from_documents(documents)
response = index.as_query_engine().query(query)
return response

高级功能

自定义输入输出处理

敏感数据处理

处理敏感数据时,可以禁用自动捕获并手动控制记录的内容:

@observe(capture_input=False, capture_output=False)
def process_sensitive_data(secret_data: str):
# 处理敏感数据
result = "处理结果"

# 手动设置过滤后的输入输出
langfuse_context.update_current_observation(
input="[已过滤的输入]",
output="[已过滤的输出]"
)

return result

并发处理

并发注意事项

在使用线程池时,需要特别注意上下文的传递:

from concurrent.futures import ThreadPoolExecutor
from langfuse.decorators import langfuse_context, observe

@observe()
def process_batch(items):
trace_id = langfuse_context.get_current_trace_id()
observation_id = langfuse_context.get_current_observation_id()

with ThreadPoolExecutor() as executor:
futures = [
executor.submit(
process_item,
item,
langfuse_parent_trace_id=trace_id,
langfuse_parent_observation_id=observation_id
)
for item in items
]
return [f.result() for f in futures]

@observe()
def process_item(item, **kwargs):
return f"处理项目: {item}"

最佳实践

最佳实践建议
  1. 合理使用观测类型

    • 使用 Event 记录关键节点
    • 使用 Span 记录耗时操作
    • 使用 Generation 记录 AI 模型调用
  2. 异常处理

    @observe()
    def process_with_error_handling(data):
    try:
    result = process_data(data)
    return result
    except Exception as e:
    langfuse_context.update_current_observation(
    level="ERROR",
    status_message=str(e)
    )
    raise
  3. 版本管理

    @observe()
    def generate_content(prompt: str, version: str = "v1"):
    langfuse_context.update_current_observation(
    version=version,
    metadata={"prompt_template": "标准模板"}
    )
    return "生成的内容"
短期环境处理

在 AWS Lambda 等短期环境中,确保在函数结束前调用 flush()

from langfuse.decorators import observe, langfuse_context

@observe()
def lambda_handler(event, context):
result = process_event(event)
langfuse_context.flush() # 确保数据发送完成
return result

故障排除

故障排除指南
  1. 启用调试模式

    import os
    os.environ["LANGFUSE_DEBUG"] = "True"
  2. 验证配置

    from langfuse.decorators import langfuse_context
    assert langfuse_context.auth_check()
  3. 常见问题解决

    • 如果观测数据未显示,检查是否正确调用了 flush()
    • 如果上下文关系不正确,检查并发处理中的父子关系设置
    • 如果数据不完整,检查网络连接和超时设置