LangChain Migration Deep Dive
Overview
Comprehensive strategies for migrating to LangChain from legacy LLM implementations or other frameworks.
Prerequisites
- Existing LLM application to migrate
- Understanding of current architecture
- Test coverage for validation
- Staging environment for testing
Migration Scenarios
Scenario 1: Raw OpenAI SDK to LangChain
Before (Raw SDK)
```python
# legacy_openai.py
import openai

client = openai.OpenAI()

def chat(message: str, history: list = None) -> str:
    messages = [{"role": "system", "content": "You are helpful."}]
    if history:
        messages.extend(history)
    messages.append({"role": "user", "content": message})
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=messages,
        temperature=0.7,
    )
    return response.choices[0].message.content
```

After (LangChain)
```python
# langchain_chat.py
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.output_parsers import StrOutputParser
from langchain_core.messages import HumanMessage, AIMessage

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0.7)

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are helpful."),
    MessagesPlaceholder(variable_name="history"),
    ("user", "{message}"),
])

chain = prompt | llm | StrOutputParser()

def chat(message: str, history: list = None) -> str:
    # Convert legacy role/content dicts to LangChain message objects
    lc_history = []
    if history:
        for msg in history:
            if msg["role"] == "user":
                lc_history.append(HumanMessage(content=msg["content"]))
            elif msg["role"] == "assistant":
                lc_history.append(AIMessage(content=msg["content"]))
    return chain.invoke({"message": message, "history": lc_history})
```
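The migrated function keeps the legacy call signature, so existing callers don't need to change. A quick smoke test (the history values here are illustrative):

```python
# Hypothetical conversation history in the legacy dict format
history = [
    {"role": "user", "content": "My name is Ada."},
    {"role": "assistant", "content": "Nice to meet you, Ada!"},
]
print(chat("What is my name?", history))
```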
Scenario 2: LlamaIndex to LangChain
Before (LlamaIndex)
```python
# legacy_llamaindex.py
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.llms.openai import OpenAI

documents = SimpleDirectoryReader("data").load_data()
index = VectorStoreIndex.from_documents(documents)
query_engine = index.as_query_engine(llm=OpenAI(model="gpt-4o-mini"))

def query(question: str) -> str:
    response = query_engine.query(question)
    return str(response)
```

After (LangChain)
```python
# langchain_rag.py
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_community.document_loaders import DirectoryLoader
from langchain_community.vectorstores import FAISS
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.runnables import RunnablePassthrough
from langchain_core.output_parsers import StrOutputParser

# Load documents
loader = DirectoryLoader("data")
documents = loader.load()

# Split documents
splitter = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=200)
splits = splitter.split_documents(documents)

# Create vector store
embeddings = OpenAIEmbeddings()
vectorstore = FAISS.from_documents(splits, embeddings)
retriever = vectorstore.as_retriever()

# Create RAG chain
llm = ChatOpenAI(model="gpt-4o-mini")
prompt = ChatPromptTemplate.from_template("""
Answer based on the context:

Context: {context}

Question: {question}
""")

def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

def query(question: str) -> str:
    return chain.invoke(question)
```
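A side benefit of expressing the pipeline as an LCEL chain: streaming needs no extra plumbing. A minimal sketch reusing the chain above — `.stream()` yields string chunks here because `StrOutputParser` is the final step:

```python
def stream_query(question: str) -> None:
    # .stream() yields output chunks as the model produces them
    for chunk in chain.stream(question):
        print(chunk, end="", flush=True)
    print()
```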
Scenario 3: Custom Agent to LangChain Agent
Before (Custom)
```python
# legacy_agent.py
import json

import openai

client = openai.OpenAI()

def run_agent(query: str, tools: dict) -> str:
    messages = [{"role": "user", "content": query}]
    while True:
        response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            functions=[{"name": k, **v["schema"]} for k, v in tools.items()],
        )
        msg = response.choices[0].message
        if msg.function_call:
            # Keep the assistant's function_call turn in history, then execute the tool
            messages.append(msg)
            tool_name = msg.function_call.name
            tool_args = json.loads(msg.function_call.arguments)
            result = tools[tool_name]["func"](**tool_args)
            messages.append({"role": "function", "name": tool_name, "content": result})
        else:
            return msg.content
```

After (LangChain)
```python
# langchain_agent.py
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder
from langchain_core.tools import tool

# Convert tools to LangChain format
@tool
def search(query: str) -> str:
    """Search for information."""
    return f"Results for: {query}"

@tool
def calculate(expression: str) -> str:
    """Calculate a math expression."""
    return str(eval(expression))  # demo only; avoid eval on untrusted input

tools = [search, calculate]
llm = ChatOpenAI(model="gpt-4o")

prompt = ChatPromptTemplate.from_messages([
    ("system", "You are a helpful assistant with tools."),
    ("human", "{input}"),
    MessagesPlaceholder(variable_name="agent_scratchpad"),
])

agent = create_tool_calling_agent(llm, tools, prompt)
executor = AgentExecutor(agent=agent, tools=tools, verbose=True)

def run_agent(query: str) -> str:
    result = executor.invoke({"input": query})
    return result["output"]
```
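A quick smoke test (the query is illustrative) exercises the tool-calling loop end to end; with `verbose=True` the executor logs each tool invocation:

```python
print(run_agent("What is 23 * 7? Then search for LangChain agents."))
```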
Migration Strategy
Phase 1: Assessment
```python
# migration_assessment.py
from pathlib import Path
from dataclasses import dataclass
from typing import List

@dataclass
class MigrationItem:
    file: str
    line: int
    pattern: str
    complexity: str  # low, medium, high

def assess_codebase(directory: str) -> List[MigrationItem]:
    """Scan codebase for migration items."""
    items = []
    patterns = {
        "openai.ChatCompletion": ("OpenAI SDK v0", "medium"),
        "openai.OpenAI": ("OpenAI SDK v1", "low"),
        "llama_index": ("LlamaIndex", "high"),
        "langchain.chains": ("LangChain legacy chains", "medium"),
        "LLMChain": ("Legacy LLMChain", "low"),
    }
    for path in Path(directory).rglob("*.py"):
        with open(path) as f:
            content = f.read()
        for i, line in enumerate(content.split("\n"), 1):
            for pattern, (name, complexity) in patterns.items():
                if pattern in line:
                    items.append(MigrationItem(
                        file=str(path),
                        line=i,
                        pattern=name,
                        complexity=complexity,
                    ))
    return items

# Generate migration report
items = assess_codebase("src/")
print(f"Found {len(items)} migration items:")
for item in items:
    print(f"  {item.file}:{item.line} - {item.pattern} ({item.complexity})")
```
Phase 2: Parallel Implementation
```python
# Run both systems in parallel for validation
class DualRunner:
    """Run legacy and new implementations side by side."""

    def __init__(self, legacy_fn, new_fn):
        self.legacy_fn = legacy_fn
        self.new_fn = new_fn
        self.discrepancies = []

    async def run(self, *args, **kwargs):
        """Run both and compare."""
        legacy_result = await self.legacy_fn(*args, **kwargs)
        new_result = await self.new_fn(*args, **kwargs)
        if not self._compare(legacy_result, new_result):
            self.discrepancies.append({
                "args": args,
                "kwargs": kwargs,
                "legacy": legacy_result,
                "new": new_result,
            })
        # Return the new implementation's result
        return new_result

    def _compare(self, a, b) -> bool:
        """Compare results for equivalence."""
        # Implement comparison logic
        return True  # Placeholder
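```

LLM outputs rarely match byte for byte, so exact equality is too strict for `_compare`. One possible implementation — a sketch, not a prescription — normalizes whitespace and falls back to a token-overlap ratio; the 0.8 threshold is an assumption to tune against your own traffic:

```python
import re

def compare_outputs(a: str, b: str, threshold: float = 0.8) -> bool:
    """Loose equivalence: normalized equality, else Jaccard token overlap."""
    def norm(s: str) -> str:
        return re.sub(r"\s+", " ", s.strip().lower())
    if norm(a) == norm(b):
        return True
    tokens_a, tokens_b = set(norm(a).split()), set(norm(b).split())
    if not tokens_a or not tokens_b:
        return False
    return len(tokens_a & tokens_b) / len(tokens_a | tokens_b) >= threshold
```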
Phase 3: Gradual Rollout
```python
# Feature flag based rollout
import hashlib
import random

class FeatureFlag:
    """Control rollout percentage."""

    def __init__(self, rollout_percentage: float = 0):
        self.percentage = rollout_percentage

    def is_enabled(self, user_id: str = None) -> bool:
        """Check if the feature is enabled for this user."""
        if user_id:
            # Consistent per-user bucketing; hashlib is stable across processes,
            # unlike the built-in hash(), which is salted per interpreter run
            hash_val = int(hashlib.sha256(user_id.encode()).hexdigest(), 16) % 100
            return hash_val < self.percentage
        return random.random() * 100 < self.percentage

# Usage
langchain_flag = FeatureFlag(rollout_percentage=10)  # 10% rollout

def process_request(user_id: str, message: str):
    if langchain_flag.is_enabled(user_id):
        return langchain_chat(message)
    else:
        return legacy_chat(message)
```
Phase 4: Validation and Cleanup
```python
# Validation script
class MigrationValidator:
    """Validate migration is complete and correct."""

    def __init__(self, test_cases: list):
        self.test_cases = test_cases

    def run_validation(self, new_fn) -> dict:
        """Run all test cases and report."""
        results = {"passed": 0, "failed": 0, "errors": []}
        for case in self.test_cases:
            try:
                result = new_fn(**case["input"])
                if self._validate(result, case["expected"]):
                    results["passed"] += 1
                else:
                    results["failed"] += 1
                    results["errors"].append({
                        "case": case,
                        "actual": result,
                    })
            except Exception as e:
                results["failed"] += 1
                results["errors"].append({
                    "case": case,
                    "error": str(e),
                })
        return results

    def _validate(self, actual, expected) -> bool:
        """Validate result meets expectations."""
        # Implement validation logic
        return True

# Run validation
validator = MigrationValidator([
    {"input": {"message": "Hello"}, "expected": {"type": "greeting"}},
    # ... more test cases
])
results = validator.run_validation(langchain_chat)
print(f"Passed: {results['passed']}, Failed: {results['failed']}")
```
Migration Checklist
- Codebase assessed for migration items
- Test coverage added for current behavior
- LangChain equivalents implemented
- Parallel running validation passed
- Gradual rollout completed
- Legacy code removed
- Documentation updated
Common Issues
| Issue | Solution |
|---|---|
| Different response format | Add output parser adapter |
| Missing streaming support | Implement streaming callbacks |
| Memory format mismatch | Convert message history format |
| Tool schema differences | Update tool definitions |
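For the response-format issue in the table above, a thin adapter can preserve the shape legacy callers expect while everything behind it migrates. A sketch, assuming callers consumed an OpenAI-style choices dict (match this to your actual legacy contract):

```python
def adapt_to_legacy(text: str) -> dict:
    # Hypothetical legacy shape; adjust to the contract your callers rely on
    return {"choices": [{"message": {"role": "assistant", "content": text}}]}
```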
Resources
Next Steps
Use langchain-upgrade-migration for LangChain version upgrades.