langchain-rate-limits
LangChain Rate Limits
Overview
Implement robust rate limiting and retry strategies for LangChain applications to handle API quotas gracefully.
Prerequisites
- LangChain installed with LLM provider
- Understanding of provider rate limits
- tenacity package for advanced retry logic
Instructions
Step 1: Understand Provider Limits
python
# Common rate limits by provider:
RATE_LIMITS = {
    "openai": {
        "gpt-4o": {"rpm": 10000, "tpm": 800000},
        "gpt-4o-mini": {"rpm": 10000, "tpm": 4000000},
    },
    "anthropic": {
        "claude-3-5-sonnet": {"rpm": 4000, "tpm": 400000},
    },
    "google": {
        "gemini-1.5-pro": {"rpm": 360, "tpm": 4000000},
    },
}
rpm = requests per minute, tpm = tokens per minute
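As a quick sketch of how such a table can be used, the hypothetical helper below (not part of LangChain; names are illustrative) derives the minimum spacing between requests from an rpm entry:

```python
# Hypothetical helper: derive minimum request spacing from a
# RATE_LIMITS-style table (values mirror the table above).
RATE_LIMITS = {
    "openai": {"gpt-4o-mini": {"rpm": 10000, "tpm": 4000000}},
    "google": {"gemini-1.5-pro": {"rpm": 360, "tpm": 4000000}},
}

def min_request_interval(provider: str, model: str) -> float:
    """Seconds to wait between requests to stay under the rpm limit."""
    rpm = RATE_LIMITS[provider][model]["rpm"]
    return 60.0 / rpm

interval = min_request_interval("google", "gemini-1.5-pro")  # 60 / 360, about 0.167 s
```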
Step 2: Built-in Retry Configuration
python
from langchain_openai import ChatOpenAI

# LangChain has built-in retry with exponential backoff
llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=3,       # Number of retries
    request_timeout=30,  # Timeout per request, in seconds
)
Step 3: Advanced Retry with Tenacity
python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type,
)
from openai import RateLimitError, APIError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((RateLimitError, APIError)),
)
def call_with_retry(chain, input_data):
    """Call chain with exponential backoff."""
    return chain.invoke(input_data)
Usage
result = call_with_retry(chain, {"input": "Hello"})
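If tenacity is not available, the same idea can be sketched with a plain loop. This is a minimal illustration, not a LangChain API; the flaky function is purely synthetic:

```python
import random
import time

def call_with_backoff(fn, max_attempts=5, base=1.0, cap=60.0):
    """Retry fn() with exponential backoff and jitter (plain-Python sketch)."""
    for attempt in range(max_attempts):
        try:
            return fn()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the error
            delay = min(cap, base * 2 ** attempt)
            time.sleep(delay * random.uniform(0.5, 1.0))  # jitter avoids thundering herd

# Illustration with a fake flaky call that succeeds on the third try
attempts = {"n": 0}

def flaky():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise RuntimeError("rate limited")
    return "ok"

result = call_with_backoff(flaky, base=0.01)
```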
Step 4: Rate Limiter Wrapper
python
import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter for API calls."""

    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.timestamps = deque()
        self.lock = Lock()

    def acquire(self):
        """Block until a request can be made."""
        with self.lock:
            now = time.time()
            # Remove timestamps older than 1 minute
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()
            if len(self.timestamps) >= self.rpm:
                sleep_time = 60 - (now - self.timestamps[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
            self.timestamps.append(time.time())
Usage with LangChain
rate_limiter = RateLimiter(requests_per_minute=100)

def rate_limited_call(chain, input_data):
    rate_limiter.acquire()
    return chain.invoke(input_data)
Step 5: Async Rate Limiting
python
import asyncio
from asyncio import Semaphore

class AsyncRateLimiter:
    """Caps concurrent calls with a semaphore.

    Note: this limits concurrency, not requests per minute.
    """

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)

    async def call(self, chain, input_data):
        async with self.semaphore:
            return await chain.ainvoke(input_data)
Batch processing with rate limiting
async def process_batch(chain, inputs: list, max_concurrent: int = 5):
    limiter = AsyncRateLimiter(max_concurrent)
    tasks = [limiter.call(chain, inp) for inp in inputs]
    return await asyncio.gather(*tasks, return_exceptions=True)
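A semaphore caps how many calls are in flight at once, but not how many start per minute. As a complementary sketch (class and method names are assumptions, not a LangChain API), an interval-based async limiter could look like:

```python
import asyncio
import time

class AsyncIntervalLimiter:
    """Sketch: spaces call starts at least 60 / rpm seconds apart."""

    def __init__(self, requests_per_minute: int = 60):
        self.interval = 60.0 / requests_per_minute
        self._next_start = 0.0
        self._lock = asyncio.Lock()

    async def wait(self):
        async with self._lock:
            now = time.monotonic()
            delay = max(0.0, self._next_start - now)
            # Reserve the next start slot before releasing the lock
            self._next_start = max(now, self._next_start) + self.interval
        if delay:
            await asyncio.sleep(delay)

async def demo():
    limiter = AsyncIntervalLimiter(requests_per_minute=6000)  # 10 ms spacing
    start = time.monotonic()
    for _ in range(3):
        await limiter.wait()
    return time.monotonic() - start

elapsed = asyncio.run(demo())  # roughly 20 ms for three spaced starts
```

Awaiting `limiter.wait()` before each `chain.ainvoke` call would throttle start rate in addition to the semaphore's concurrency cap.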
Output
- Configured retry logic with exponential backoff
- Rate limiter class for request throttling
- Async batch processing with concurrency control
- Graceful handling of rate limit errors
Examples
Handling Rate Limits in Production
python
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig
llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=5,
)
Use batch with max_concurrency
inputs = [{"input": f"Query {i}"} for i in range(100)]
results = chain.batch(
    inputs,
    config=RunnableConfig(max_concurrency=10),  # Limit concurrent calls
)
Fallback on Rate Limit
python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
primary = ChatOpenAI(model="gpt-4o-mini", max_retries=2)
fallback = ChatAnthropic(model="claude-3-5-sonnet-20241022")
Automatically switch to fallback on rate limit
robust_llm = primary.with_fallbacks([fallback])
Error Handling
| Error | Cause | Solution |
|---|---|---|
| RateLimitError | Exceeded quota | Implement backoff, reduce concurrency |
| Timeout | Request too slow | Increase timeout, check network |
| 429 Too Many Requests | API throttled | Wait and retry with backoff |
| Quota Exceeded | Monthly limit hit | Upgrade plan or switch provider |
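The table above can be condensed into a small decision helper. This is an illustrative sketch (the function and its return convention are assumptions, not an official error-handling API):

```python
def handle_status(status: int, attempt: int, max_attempts: int = 5):
    """Return (should_retry, wait_seconds) for a failed request (sketch)."""
    if status == 429 and attempt < max_attempts:
        return True, min(60, 2 ** attempt)   # back off exponentially on throttling
    if status in (500, 502, 503) and attempt < max_attempts:
        return True, 1.0                     # brief pause for transient server errors
    return False, 0.0                        # give up: auth errors, exhausted quota, etc.
```

For example, `handle_status(429, attempt=2)` suggests retrying after a 4-second wait, while an authentication failure is not retried at all.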
Resources
Next Steps
Proceed to langchain-security-basics for security best practices.