langchain-rate-limits


LangChain Rate Limits


Overview


Implement robust rate limiting and retry strategies for LangChain applications to handle API quotas gracefully.

Prerequisites


  • LangChain installed with LLM provider
  • Understanding of provider rate limits
  • tenacity package for advanced retry logic

Instructions


Step 1: Understand Provider Limits



Common rate limits by provider:


python
RATE_LIMITS = {
    "openai": {
        "gpt-4o": {"rpm": 10000, "tpm": 800000},
        "gpt-4o-mini": {"rpm": 10000, "tpm": 4000000},
    },
    "anthropic": {
        "claude-3-5-sonnet": {"rpm": 4000, "tpm": 400000},
    },
    "google": {
        "gemini-1.5-pro": {"rpm": 360, "tpm": 4000000},
    },
}

rpm = requests per minute, tpm = tokens per minute


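As a sketch, a table like the one above can drive a quick pre-flight check of a planned workload. The helper name and the subset of limits below are illustrative, not part of LangChain or any provider SDK:

```python
# Subset of the rate-limit table above (values are illustrative).
RATE_LIMITS = {
    "openai": {
        "gpt-4o-mini": {"rpm": 10000, "tpm": 4000000},
    },
    "google": {
        "gemini-1.5-pro": {"rpm": 360, "tpm": 4000000},
    },
}

def fits_within_limits(provider: str, model: str, rpm_needed: int, tpm_needed: int) -> bool:
    """Return True if a planned workload stays under the model's published limits."""
    limits = RATE_LIMITS[provider][model]
    return rpm_needed <= limits["rpm"] and tpm_needed <= limits["tpm"]
```

Running such a check before launching a batch job makes quota overruns a planning error instead of a runtime surprise.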

Step 2: Built-in Retry Configuration


LangChain has built-in retry with exponential backoff:

python
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=3,       # Number of retries
    request_timeout=30,  # Timeout per request
)

Step 3: Advanced Retry with Tenacity


python
from tenacity import (
    retry,
    stop_after_attempt,
    wait_exponential,
    retry_if_exception_type
)
from openai import RateLimitError, APIError

@retry(
    stop=stop_after_attempt(5),
    wait=wait_exponential(multiplier=1, min=4, max=60),
    retry=retry_if_exception_type((RateLimitError, APIError))
)
def call_with_retry(chain, input_data):
    """Call chain with exponential backoff."""
    return chain.invoke(input_data)

Usage


result = call_with_retry(chain, {"input": "Hello"})
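If tenacity is unavailable, the same pattern can be sketched in plain Python. This is a simplified sketch (tenacity's exact wait formula and jitter handling differ); the function names are illustrative:

```python
import time

def backoff_delays(attempts: int = 5, base: float = 4.0, cap: float = 60.0):
    """Yield exponentially growing delays, clamped at `cap` seconds."""
    for attempt in range(attempts):
        yield min(cap, base * (2 ** attempt))

def call_with_backoff(fn, *args, retriable=(Exception,), **kwargs):
    """Retry `fn` on retriable errors, sleeping between attempts."""
    last_exc = None
    for delay in backoff_delays():
        try:
            return fn(*args, **kwargs)
        except retriable as exc:
            last_exc = exc
            time.sleep(delay)
    raise last_exc
```

With the defaults, the wait sequence is 4, 8, 16, 32, 60 seconds, mirroring the `min=4, max=60` bounds in the tenacity example.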

Step 4: Rate Limiter Wrapper


python
import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter for API calls."""

    def __init__(self, requests_per_minute: int = 60):
        self.rpm = requests_per_minute
        self.timestamps = deque()
        self.lock = Lock()

    def acquire(self):
        """Block until a request can be made."""
        with self.lock:
            now = time.time()
            # Drop timestamps older than 1 minute
            while self.timestamps and now - self.timestamps[0] > 60:
                self.timestamps.popleft()

            if len(self.timestamps) >= self.rpm:
                # Sleep until the oldest request leaves the window
                sleep_time = 60 - (now - self.timestamps[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
                self.timestamps.popleft()

            self.timestamps.append(time.time())

Usage with LangChain


rate_limiter = RateLimiter(requests_per_minute=100)

def rate_limited_call(chain, input_data):
    rate_limiter.acquire()
    return chain.invoke(input_data)
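A true token-bucket limiter refills a budget continuously instead of tracking individual timestamps, which allows short bursts up to a capacity. A minimal non-blocking sketch (the injectable `clock` parameter exists only for testability and is not a LangChain API):

```python
import time

class TokenBucket:
    """Continuously refilling token-bucket limiter (non-blocking sketch)."""

    def __init__(self, rate_per_sec: float, capacity: float, clock=time.monotonic):
        self.rate = rate_per_sec  # tokens added per second
        self.capacity = capacity  # maximum burst size
        self.tokens = capacity
        self.clock = clock
        self.last = self.clock()

    def try_acquire(self) -> bool:
        """Take one token if available; return False instead of blocking."""
        now = self.clock()
        # Refill proportionally to elapsed time, capped at capacity
        self.tokens = min(self.capacity, self.tokens + (now - self.last) * self.rate)
        self.last = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False
```

Callers that receive `False` can queue the request or sleep briefly, which keeps the limiter itself lock-free with respect to blocking.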

Step 5: Async Rate Limiting


python
import asyncio
from asyncio import Semaphore

class AsyncRateLimiter:
    """Limit concurrent chain calls with a semaphore."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = Semaphore(max_concurrent)

    async def call(self, chain, input_data):
        async with self.semaphore:
            return await chain.ainvoke(input_data)

Batch processing with rate limiting


async def process_batch(chain, inputs: list, max_concurrent: int = 5):
    limiter = AsyncRateLimiter(max_concurrent)
    tasks = [limiter.call(chain, inp) for inp in inputs]
    return await asyncio.gather(*tasks, return_exceptions=True)
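The semaphore pattern can be exercised end to end with a stand-in chain. The `FakeChain` below is a hypothetical stub, not a LangChain class; it only mimics the `ainvoke` method the limiter calls:

```python
import asyncio

class AsyncRateLimiter:
    """Cap in-flight calls with a semaphore."""

    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)

    async def call(self, chain, input_data):
        async with self.semaphore:
            return await chain.ainvoke(input_data)

class FakeChain:
    """Hypothetical stand-in for a LangChain runnable."""

    async def ainvoke(self, input_data):
        await asyncio.sleep(0)  # simulate an API round trip
        return f"echo:{input_data}"

async def process_batch(chain, inputs: list, max_concurrent: int = 5):
    limiter = AsyncRateLimiter(max_concurrent)
    tasks = [limiter.call(chain, inp) for inp in inputs]
    return await asyncio.gather(*tasks, return_exceptions=True)

results = asyncio.run(process_batch(FakeChain(), list(range(5)), max_concurrent=2))
```

`return_exceptions=True` means one failed call surfaces as an exception object in the results list instead of cancelling the whole batch.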

Output


  • Configured retry logic with exponential backoff
  • Rate limiter class for request throttling
  • Async batch processing with concurrency control
  • Graceful handling of rate limit errors

Examples


Handling Rate Limits in Production


python
from langchain_openai import ChatOpenAI
from langchain_core.runnables import RunnableConfig

llm = ChatOpenAI(
    model="gpt-4o-mini",
    max_retries=5,
)

Use batch with max_concurrency


inputs = [{"input": f"Query {i}"} for i in range(100)]

results = chain.batch(
    inputs,
    config=RunnableConfig(max_concurrency=10),  # Limit concurrent calls
)

Fallback on Rate Limit


python
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic

primary = ChatOpenAI(model="gpt-4o-mini", max_retries=2)
fallback = ChatAnthropic(model="claude-3-5-sonnet-20241022")

Automatically switch to fallback on rate limit


robust_llm = primary.with_fallbacks([fallback])
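Under the hood, a fallback chain is just "try each model in order until one succeeds". A provider-agnostic sketch of that idea (the function name and `retriable` parameter are illustrative, not the `with_fallbacks` API):

```python
def invoke_with_fallbacks(models, input_data, retriable=(Exception,)):
    """Try each callable in order; return the first success (simplified sketch)."""
    last_exc = None
    for model in models:
        try:
            return model(input_data)
        except retriable as exc:
            last_exc = exc  # remember the failure and move to the next model
    raise last_exc
```

In practice, `retriable` should be narrowed to rate-limit and transient errors so that genuine bugs (e.g. malformed input) fail fast instead of cascading through every provider.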

Error Handling


Error                 | Cause             | Solution
----------------------|-------------------|--------------------------------------
RateLimitError        | Exceeded quota    | Implement backoff, reduce concurrency
Timeout               | Request too slow  | Increase timeout, check network
429 Too Many Requests | API throttled     | Wait and retry with backoff
Quota Exceeded        | Monthly limit hit | Upgrade plan or switch provider

Resources


Next Steps


Proceed to langchain-security-basics for security best practices.