Best Practices and Examples

Overview

This document collects best practices and example code for common real-world scenarios, to help you get the most out of the Langfuse JS/TS SDK. The examples cover:

  • A document-based question answering system
  • A multi-turn dialogue system
  • A content generation system
  • API integration patterns
  • Test integration approaches

API Integration Examples

Express Integration

Express integration notes

Shows how to integrate the Langfuse SDK into an Express application, implementing:

  • Request tracing
  • Input validation
  • Error handling
  • Graceful shutdown

import express from "express";
import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";

const app = express();
app.use(express.json());

const langfuse = new Langfuse();
const openai = new OpenAI();

app.post("/chat", async (req, res) => {
  const trace = langfuse.trace({
    name: "chat-api",
    userId: req.headers["x-user-id"] as string,
  });

  try {
    // Validate the input
    const validationSpan = trace.span({ name: "input-validation" });
    if (!req.body.message) {
      validationSpan.end({ level: "ERROR" });
      return res.status(400).json({ error: "Message is required" });
    }
    validationSpan.end();

    // Generate the reply
    const chatSpan = trace.span({ name: "chat-generation" });
    const completion = await observeOpenAI(openai, {
      parent: chatSpan,
      generationName: "chat-completion",
    }).chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ role: "user", content: req.body.message }],
    });

    chatSpan.end();

    res.json({
      response: completion.choices[0].message.content,
    });
  } catch (error) {
    trace.event({
      name: "error",
      level: "ERROR",
      statusMessage: (error as Error).message,
    });
    res.status(500).json({ error: "Internal server error" });
  }
});

// Make sure all pending data is sent before the process exits
process.on("SIGTERM", async () => {
  await langfuse.shutdownAsync();
  process.exit(0);
});

Next.js API Route

Next.js integration notes

Shows how to integrate the Langfuse SDK in a Next.js API Route, handling:

  • Request method validation
  • Asynchronous data processing
  • Error reporting
  • Guaranteed delivery of tracing data

import { NextApiRequest, NextApiResponse } from "next";
import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";
import { waitUntil } from "@vercel/functions";

const langfuse = new Langfuse();
const openai = new OpenAI();

export default async function handler(
  req: NextApiRequest,
  res: NextApiResponse
) {
  if (req.method !== "POST") {
    return res.status(405).json({ error: "Method not allowed" });
  }

  const trace = langfuse.trace({
    name: "nextjs-chat-api",
    userId: req.headers["x-user-id"] as string,
  });

  try {
    const response = await observeOpenAI(openai, {
      parent: trace,
      generationName: "chat-completion",
    }).chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ role: "user", content: req.body.message }],
    });

    // Make sure the tracing data is sent without blocking the response
    waitUntil(langfuse.flushAsync());

    res.json({
      response: response.choices[0].message.content,
    });
  } catch (error) {
    trace.event({
      name: "api-error",
      level: "ERROR",
      statusMessage: (error as Error).message,
    });

    // Make sure the error data is sent as well
    waitUntil(langfuse.flushAsync());

    res.status(500).json({ error: "Internal server error" });
  }
}

Question Answering System

Scenario

Implements a document-based question answering system, including:

  • Document index creation
  • Relevant context retrieval
  • Answer generation
  • Performance tracing

import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";
import { Document, VectorStoreIndex } from "llamaindex";

const langfuse = new Langfuse();
const openai = new OpenAI();

async function documentQA(question: string, documents: Document[]) {
  const trace = langfuse.trace({
    name: "document-qa",
    metadata: { questionType: "document-based" },
  });

  // 1. Build the index
  const indexSpan = trace.span({ name: "create-index" });
  const index = await VectorStoreIndex.fromDocuments(documents);
  indexSpan.end();

  // 2. Retrieve relevant context
  const retrievalSpan = trace.span({ name: "retrieve-context" });
  const queryEngine = index.asQueryEngine();
  const searchResult = await queryEngine.query({ query: question });
  retrievalSpan.end({
    metadata: {
      resultCount: searchResult.sourceNodes?.length,
      topScore: searchResult.sourceNodes?.[0]?.score,
    },
  });

  // 3. Generate the answer
  const promptSpan = trace.span({ name: "generate-answer" });
  const response = await observeOpenAI(openai, {
    parent: promptSpan,
    generationName: "qa-completion",
  }).chat.completions.create({
    model: "gpt-4",
    messages: [
      {
        role: "system",
        content:
          "You are a professional Q&A assistant. Answer questions based on the provided context. If the context does not contain the relevant information, say so explicitly.",
      },
      {
        role: "user",
        content: `Question: ${question}\n\nContext: ${searchResult.response}`,
      },
    ],
  });
  promptSpan.end();

  return response.choices[0].message.content;
}
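
A minimal usage sketch of documentQA; the sample documents and question are made up for illustration, and plain text is wrapped with the llamaindex Document constructor:

async function main() {
  const docs = [
    new Document({ text: "Langfuse is an open-source platform for LLM observability." }),
    new Document({ text: "Traces contain spans, events, and generations." }),
  ];

  // Ask a question against the indexed documents
  const answer = await documentQA("What is Langfuse?", docs);
  console.log(answer);
}

main();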

Dialogue System

Scenario

Implements a multi-turn dialogue system with:

  • Session state management
  • Conversation context maintenance
  • User input tracking
  • Response generation logging

import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";

interface Message {
  role: "system" | "user" | "assistant";
  content: string;
}

class DialogueManager {
  private langfuse: Langfuse;
  private openai: OpenAI;
  private messages: Message[];
  private trace: any;

  constructor() {
    this.langfuse = new Langfuse();
    this.openai = new OpenAI();
    this.messages = [
      {
        role: "system",
        content: "You are a friendly AI assistant that holds natural conversations.",
      },
    ];
    this.trace = this.langfuse.trace({
      name: "dialogue-session",
      metadata: { sessionStart: new Date().toISOString() },
    });
  }

  async addUserMessage(content: string): Promise<string> {
    // Record the user input
    const userSpan = this.trace.span({
      name: "user-input",
      input: content,
    });

    // Append it to the conversation history
    this.messages.push({ role: "user", content });
    userSpan.end();

    // Generate the reply
    const responseSpan = this.trace.span({ name: "generate-response" });

    const completion = await observeOpenAI(this.openai, {
      parent: responseSpan,
      generationName: "chat-completion",
    }).chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: this.messages,
    });

    const response = completion.choices[0].message.content ?? "";
    this.messages.push({ role: "assistant", content: response });

    responseSpan.end({
      metadata: {
        messageCount: this.messages.length,
        turnCount: (this.messages.length - 1) / 2,
      },
    });

    return response;
  }

  async endSession() {
    this.trace.update({
      metadata: {
        sessionEnd: new Date().toISOString(),
        totalTurns: (this.messages.length - 1) / 2,
      },
    });
    await this.langfuse.shutdownAsync();
  }
}
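
A minimal usage sketch of the DialogueManager class above; the user message is made up for illustration:

async function main() {
  const dialogue = new DialogueManager();

  // Send one turn and print the assistant reply
  const reply = await dialogue.addUserMessage("Hi, what can you help me with?");
  console.log(reply);

  // End the session and flush all pending tracing data
  await dialogue.endSession();
}

main();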

Content Generation System

Scenario

Implements a multi-stage content generation system, including:

  • Outline generation
  • Section-by-section expansion
  • Content refinement
  • Quality assessment

import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";

interface Section {
  title: string;
  content: string;
}

async function generateArticle(topic: string): Promise<Section[]> {
  const langfuse = new Langfuse();
  const openai = new OpenAI();

  const trace = langfuse.trace({
    name: "article-generation",
    metadata: { topic },
  });

  // 1. Generate the outline
  const outlineSpan = trace.span({ name: "generate-outline" });
  const outlineResponse = await observeOpenAI(openai, {
    parent: outlineSpan,
    generationName: "outline-generation",
  }).chat.completions.create({
    // A model that supports the JSON response format is required here
    model: "gpt-4o",
    messages: [
      {
        role: "system",
        content:
          'You are a professional article planner. Create a detailed outline for the given topic with 3-5 main sections. Respond with a JSON object containing a "sections" array of section titles.',
      },
      {
        role: "user",
        content: `Topic: ${topic}`,
      },
    ],
    response_format: { type: "json_object" },
  });

  const outline = JSON.parse(
    outlineResponse.choices[0].message.content ?? "{}"
  ).sections;
  outlineSpan.end({
    output: outline,
    metadata: { sectionCount: outline.length },
  });

  // 2. Expand each section
  const sections: Section[] = [];
  for (const section of outline) {
    const sectionSpan = trace.span({
      name: "expand-section",
      metadata: { sectionTitle: section },
    });

    const contentResponse = await observeOpenAI(openai, {
      parent: sectionSpan,
      generationName: "section-generation",
    }).chat.completions.create({
      model: "gpt-4",
      messages: [
        {
          role: "system",
          content: "You are a professional writer. Expand this section in detail.",
        },
        {
          role: "user",
          content: `Topic: ${topic}\nSection: ${section}`,
        },
      ],
    });

    const content = contentResponse.choices[0].message.content ?? "";
    sections.push({
      title: section,
      content: content,
    });

    sectionSpan.end({
      metadata: {
        charCount: content.length,
        wordCount: content.split(/\s+/).length,
      },
    });
  }

  trace.update({
    metadata: {
      totalSections: sections.length,
      averageWordCount:
        sections.reduce(
          (acc, sec) => acc + sec.content.split(/\s+/).length,
          0
        ) / sections.length,
    },
  });

  return sections;
}
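
A minimal usage sketch of generateArticle; the topic is made up for illustration:

async function main() {
  const sections = await generateArticle("An introduction to LLM observability");

  // Print each generated section
  for (const section of sections) {
    console.log(`${section.title}\n${section.content}\n`);
  }
}

main();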

Test Integration

Test integration notes

Shows how to integrate the Langfuse SDK in a test environment:

  • Configuring the test environment
  • Mocking OpenAI calls
  • Verifying tracing logic
  • Cleaning up test data

import { Langfuse, observeOpenAI } from "langfuse";
import { describe, test, expect, beforeEach, afterEach, jest } from "@jest/globals";

describe("Chat Service", () => {
  let langfuse: Langfuse;

  beforeEach(() => {
    langfuse = new Langfuse({
      secretKey: "test-key",
      publicKey: "test-key",
      baseUrl: "http://localhost:3000",
    });
  });

  afterEach(async () => {
    await langfuse.shutdownAsync();
  });

  test("should handle chat completion", async () => {
    const trace = langfuse.trace({ name: "test-chat" });

    // Mock the OpenAI client so no real API call is made
    const mockOpenAI = {
      chat: {
        completions: {
          create: jest.fn().mockResolvedValue({
            choices: [
              {
                message: { content: "Test response" },
              },
            ],
          }),
        },
      },
    };

    const response = await observeOpenAI(mockOpenAI as any, {
      parent: trace,
      generationName: "test-completion",
    }).chat.completions.create({
      model: "gpt-3.5-turbo",
      messages: [{ role: "user", content: "test" }],
    });

    expect(response.choices[0].message.content).toBe("Test response");
    expect(mockOpenAI.chat.completions.create).toHaveBeenCalled();
  });
});

Practical Recommendations

  1. Code organization

    • Split code by functional module
    • Use clear, descriptive names
    • Keep the code concise
    • Add comments where they help

  2. Error handling

    • Catch all exceptions
    • Record error details
    • Return meaningful error feedback
    • Degrade gracefully

  3. Data management (see the sketch after this list)

    • Flush tracing data promptly
    • Keep payload sizes under control
    • Handle sensitive information carefully
    • Clean up stale data

  4. Performance optimization (also covered by the sketch below)

    • Use asynchronous operations
    • Run independent work in parallel
    • Keep resource usage under control
    • Monitor performance metrics
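
As a reference for points 3 and 4, here is a minimal sketch; the maskEmail helper and the names used are illustrative assumptions, not part of the Langfuse API. It stores a masked copy of user input at the trace level, runs independent generations in parallel, and flushes tracing data promptly:

import { Langfuse, observeOpenAI } from "langfuse";
import OpenAI from "openai";

const langfuse = new Langfuse();
const openai = new OpenAI();

// Hypothetical helper: redact e-mail addresses before they are stored in a trace
function maskEmail(text: string): string {
  return text.replace(/[\w.+-]+@[\w-]+\.[\w.]+/g, "[redacted-email]");
}

async function summarizeInParallel(questions: string[]) {
  const trace = langfuse.trace({
    name: "parallel-summaries",
    // Store a masked copy of the user input at the trace level
    input: questions.map(maskEmail),
  });

  // Independent requests run in parallel instead of one after another
  const results = await Promise.all(
    questions.map(async (question) => {
      const span = trace.span({ name: "summary" });
      const completion = await observeOpenAI(openai, {
        parent: span,
        generationName: "summary-completion",
      }).chat.completions.create({
        model: "gpt-3.5-turbo",
        messages: [{ role: "user", content: question }],
      });
      span.end();
      return completion.choices[0].message.content;
    })
  );

  // Flush promptly so no tracing data is lost if the process exits early
  await langfuse.flushAsync();

  return results;
}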