从手写 Pipeline 到声明式 AI 应用 —— 理解 LLM / PromptTemplate / OutputParser / Tool / Retriever / Chain / Runnable
| 痛点 | 手写做法 | 问题 |
|---|---|---|
| Prompt 拼接 | f"你是{role},请回答{question}" |
变量多了像乱麻,模板和逻辑混在一起 |
| 输出解析 | json.loads(response) + try/except |
LLM 输出不稳定,经常解析失败 |
| Tool Calling | 手写 JSON Schema + if/elif dispatch | 样板代码多,函数多了维护困难 |
| 数据库切换 | ChromaDB → FAISS 改所有检索代码 | 供应商绑定,换数据库=重构 |
| 流程编排 | 手写 pipeline 函数按顺序调用 | 线性写死,加一步改全局 |
# 命令式:描述"怎么做"
def rag_pipeline(query):
docs = retriever.retrieve(query)
context = format_docs(docs)
messages = build_prompt(query, context)
response = llm.call(messages)
return extract_answer(response)
# 痛点:
# - 步骤线性写死
# - 加并行分支要手写多线程
# - 切模型/数据库改所有代码
# 声明式:描述"数据怎么流"
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| prompt
| llm
| parser
)
answer = rag_chain.invoke(query)
# 优势:
# - 数据流一目了然
# - 加并行分支只需加一行
# - 换模型/数据库只改一个构造函数
| 管道连接。
| 场景 | 推荐工具 | 示例 |
|---|---|---|
| 单次 LLM 调用 | 原生 openai | 简单问答、翻译 |
| 复杂 Pipeline | LangChain LCEL | RAG、多步骤处理 |
| 多 Agent / 状态机 | LangGraph | Agent 循环、人工审核、条件路由 |
LangChain 的所有组件(LLM、Prompt、Parser、Retriever...)都实现了同一个接口:Runnable。
| 无差别连接:
数据原样通过,不做任何处理。在 RAG Chain 中把 question 原样传给 Prompt:
{"context": retriever | format_docs,
"question": RunnablePassthrough()}
把任意 Python 函数包装成 Runnable,接入管道:
chain = prompt | llm | parser \
| RunnableLambda(count_words)
并发执行多个分支,结果合并为一个 dict:
RunnableParallel(
summary=chain_summary,
translation=chain_en,
)
根据条件路由到不同分支(if/elif/else):
RunnableBranch(
(is_technical, tech_chain),
(is_hr, hr_chain),
default_chain,
)
每个环节的输出自动成为下一环节的输入。| 操作符自动完成类型转换和数据传递。
from openai import OpenAI
client = OpenAI(api_key="...")
response = client.chat.completions.create(
model="deepseek-chat",
messages=[
{"role": "user",
"content": "什么是RAG?"}
],
temperature=0.3,
)
answer = response.choices[0].message.content
// 每次都要重复 model/messages/temperature
// 切模型要改所有调用点
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
llm = ChatOpenAI(
model="deepseek-chat",
temperature=0.3,
max_retries=3, # 自动重试
)
# 配置一次,到处复用
result = llm.invoke([
HumanMessage(content="什么是RAG?")
])
answer = result.content
// 切模型只改构造函数一处
// 可和 PromptTemplate | 管道组合
role = "Python 专家"
question = "装饰器是什么"
system = f"你是{role}。要求:简洁。"
user = f"请解释:{question}"
messages = [
{"role": "system", "content": system},
{"role": "user", "content": user},
]
// 模板和逻辑混在一起
// 改 Prompt 要翻代码
// 变量多了不可读
from langchain_core.prompts \
import ChatPromptTemplate
prompt = ChatPromptTemplate.from_messages([
("system", "你是{role}。要求:{req}"),
("user", "请解释:{question}"),
])
messages = prompt.invoke({
"role": "Python 专家",
"req": "用比喻解释",
"question": "装饰器",
})
// 模板和变量分离
// 变量缺失会报 KeyError(安全)
// 同一模板可复用无数次
from langchain_core.prompts import FewShotChatMessagePromptTemplate
examples = [
{"input": "今天心情真好", "output": "POSITIVE"},
{"input": "等了三小时", "output": "NEGATIVE"},
{"input": "还行吧", "output": "NEUTRAL"},
]
# FewShotChatMessagePromptTemplate 自动管理示例格式
# 最终 Prompt = 示例对话 + 当前问题
AIMessage → 纯字符串
chain | StrOutputParser()
自动解析 JSON + 去 markdown 包裹
chain | JsonOutputParser()
类型安全 + 自动验证 + retry
chain | PydanticOutputParser(pydantic_object=Person)
// 1. 手写 JSON Schema(容易出错)
tools = [{
"type": "function",
"function": {
"name": "get_weather",
"parameters": {
"properties": {
"city": {"type": "string"}
},
"required": ["city"],
}
}
}]
// 2. 解析 tool_call
tc = response.choices[0].message.tool_calls[0]
args = json.loads(tc.function.arguments)
// 3. 手工 dispatch
if tc.function.name == "get_weather":
result = get_weather(args["city"])
elif tc.function.name == "get_stock":
result = get_stock(...)
// 10 个函数 = 10 个 elif
from langchain_core.tools import tool
// @tool 自动从函数签名+docstring生成Schema
@tool
def get_weather(city: str) -> str:
"""查询城市当前天气"""
return weather_db.get(city, "未知")
@tool
def get_stock(symbol: str) -> float:
"""查询股票价格"""
return stock_db.get(symbol, 0.0)
// 一行绑定
llm_with_tools = llm.bind_tools([
get_weather, get_stock
])
// LLM 自动判断何时调哪个工具
// 不需要写 dispatch if/elif
collection.query() → 返回深层 dict。
FAISS 的 API 是 index.search() → 返回完全不同的格式。results = collection.query(
query_texts=["公司政策"],
n_results=3,
)
for i in range(len(results["ids"][0])):
doc_id = results["ids"][0][i]
text = results["documents"][0][i]
dist = results["distances"][0][i]
meta = results["metadatas"][0][i]
// 深层 dict 取值,容易出错
// 换成 FAISS?API 完全不同
// 统一接口,不管底层是什么数据库
retriever = vectorstore.as_retriever(
search_kwargs={"k": 3}
)
docs = retriever.invoke("公司政策")
// → list[Document]
for doc in docs:
print(doc.page_content) # 统一
print(doc.metadata["source"]) # 统一
// 换数据库只改构造函数,检索代码不动
// Chroma.from_documents() → 自动embed+建索引
retriever.invoke(query) → list[Document]对比命令式 vs 声明式 —— 同样的 RAG 逻辑:
def rag_pipeline(query):
# Step 1
docs = retriever.retrieve(query)
# Step 2
context = format_docs(docs)
# Step 3
messages = build_prompt(query, context)
# Step 4
response = llm.call(messages)
# Step 5
return extract_answer(response)
// 做A → 存变量 → 做B → 存变量 → 做C
// 关注"怎么做"
rag_chain = (
{
"context": retriever | format_docs,
"question": RunnablePassthrough(),
}
| prompt
| llm
| StrOutputParser()
)
answer = rag_chain.invoke(query)
// A | B | C | D
// 关注"数据怎么流"
| 模式 | 语法 | 场景 |
|---|---|---|
| 线性管道 | A | B | C |
顺序执行,上一步输出→下一步输入 |
| 并行分支 | RunnableParallel(a=A, b=B) |
翻译+摘要同时进行 |
| 透传数据 | RunnablePassthrough() |
RAG 中 question 原样传给 Prompt |
| 自定义处理 | RunnableLambda(my_func) |
统计字数、格式化输出 |
| 条件路由 | RunnableBranch((cond, chain), default) |
不同类型问题走不同专家 Chain |
| Dict 组装 | {"key1": chain_a, "key2": chain_b} |
多路输入拼成 Prompt 变量 |
from langchain_core.runnables import RunnableParallel
# 两个 LLM 调用并发执行,不是串行!
parallel_chain = RunnableParallel(
summary=chain_summary, # 分支1:生成摘要
translation=chain_translation, # 分支2:翻译英文
)
result = parallel_chain.invoke({"text": "..."})
# result = {"summary": "...", "translation": "..."}
# 耗时 ≈ max(摘要耗时, 翻译耗时),而非相加
| 组件 | 手写版(~300行) | LangChain 版(~120行) | 减少 |
|---|---|---|---|
| 分块器 | RecursiveChunker 类 (~50行) |
RecursiveCharacterTextSplitter() |
1行 |
| 向量化+存储 | chromadb.Client() + create_collection() + add() (~15行) |
Chroma.from_documents() |
1行 |
| Prompt 构建 | f-string + 手动 messages (~20行) | ChatPromptTemplate 声明式 (~6行) |
~70% |
| 检索调用 | collection.query() → 深层 dict 取值 (~15行) |
retriever.invoke() → list[Document] |
1行 |
| Pipeline 编排 | 手写函数调用链 (~30行) | LCEL 管道 (~5行) | ~83% |
rag_chain = (
{
"context": retriever | format_docs, // 检索 + 格式化 → 注入 {context}
"question": RunnablePassthrough(), // 用户问题 → 注入 {question}
}
| prompt // 填充模板变量
| llm // 生成 AIMessage
| StrOutputParser() // 提取纯文本
)
answer = rag_chain.invoke("公司年假多少天?")
class RecursiveChunker: ← 自己实现分块算法
def _split_recursive(...)
def _split_by_size(...)
class ChunkedRetriever: ← 自己管理 ChromaDB
def _index_documents(...)
def retrieve(...): ← 深层 dict 取值
results["ids"][0][i]
def generate_answer(query, docs): ← 手拼 f-string
context = f"[文档{i}] ..."
// 分块:内置
splitter = RecursiveCharacterTextSplitter(
chunk_size=500, chunk_overlap=50)
// 向量化+存储:一行
vectorstore = Chroma.from_documents(
docs, embedding, collection_name="rag")
// 检索:统一接口
retriever = vectorstore.as_retriever(k=3)
// Pipeline:声明式
chain = (
{"context": retriever|format,
"question": Passthrough()}
| prompt | llm | parser)
| 顺序 | Demo | 核心概念 | 文件 |
|---|---|---|---|
| 1 | LLM 抽象 | ChatOpenAI 统一接口、流式输出 | demo_01_llm.py |
| 2 | PromptTemplate | 模板变量分离、Few-shot | demo_02_prompt_template.py |
| 3 | OutputParser | Str/Json/Pydantic 解析、自动纠错 | demo_03_output_parser.py |
| 4 | Tool | @tool 装饰器、多工具链式调用 | demo_04_tool.py |
| 5 | Retriever | 统一检索接口、元数据过滤 | demo_05_retriever.py |
| 6 | Chain | LCEL | 管道、声明式数据流 | demo_06_chain.py |
| 7 | Runnable | Passthrough/Parallel/Lambda 组合 | demo_07_runnable.py |
| 8 | RAG 重构 | 手写版 vs LangChain 版 完整对比 | demo_08_rag_refactor.py |