options-analytics-agent-langgraph
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseOptions Analytics Agent with LangGraph
基于LangGraph的期权分析Agent
Skill by ara.so — Data Skills collection.
A sophisticated LangGraph-based agent that automates financial options analysis with real-time data from Polygon.io, smart caching via ChromaDB, persistent memory, and professional-grade analysis. Built for creating intelligent trading assistants with RAG capabilities and microservice architecture.
由ara.so提供的技能 — 数据技能合集。
这是一个基于LangGraph的高级Agent,可利用Polygon.io的实时数据、ChromaDB智能缓存、持久化内存和专业级分析能力,实现金融期权分析自动化。专为具备RAG能力和微服务架构的智能交易助手打造。
What It Does
功能介绍
This project provides a complete AI agent system for:
- Real-time options data retrieval from Polygon.io with intelligent caching
- RAG-powered knowledge base using ChromaDB for semantic search
- Persistent conversation memory across sessions via SQLite
- Professional options analysis with Greeks, sentiment, and anomaly detection
- Multi-format exports (CSV, charts, reports)
- LangGraph orchestration for multi-agent workflows
- FastAPI microservice deployment
本项目提供一套完整的AI Agent系统,支持:
- 从Polygon.io实时获取期权数据,并具备智能缓存机制
- 基于ChromaDB的RAG驱动知识库,支持语义搜索
- 通过SQLite实现跨会话持久化对话记忆
- 包含希腊字母、情绪分析和异常检测的专业期权分析
- 多格式导出(CSV、图表、报告)
- 基于LangGraph的多Agent工作流编排
- FastAPI微服务部署
Installation
安装步骤
Prerequisites
前置条件
bash
undefinedbash
undefinedPython 3.10+
Python 3.10+
python --version
python --version
Create virtual environment
创建虚拟环境
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
undefinedpython -m venv venv
source venv/bin/activate # Windows系统使用:venv\Scripts\activate
undefinedInstall Dependencies
安装依赖
bash
pip install -r requirements.txtKey dependencies:
langchain>=0.3.0
langgraph>=0.2.45
langchain-openai>=0.2.6
langchain-chroma>=0.1.4
chromadb>=0.5.20
fastapi>=0.115.5
uvicorn>=0.32.1
pandas>=2.2.3
matplotlib>=3.9.2
tavily-python>=0.5.0bash
pip install -r requirements.txt核心依赖:
langchain>=0.3.0
langgraph>=0.2.45
langchain-openai>=0.2.6
langchain-chroma>=0.1.4
chromadb>=0.5.20
fastapi>=0.115.5
uvicorn>=0.32.1
pandas>=2.2.3
matplotlib>=3.9.2
tavily-python>=0.5.0Environment Configuration
环境配置
Create file in project root:
.envbash
undefined在项目根目录创建文件:
.envbash
undefinedRequired
必填项
OPENAI_API_KEY=your_openai_api_key
POLYGON_API_KEY=your_polygon_io_api_key
OPENAI_API_KEY=your_openai_api_key
POLYGON_API_KEY=your_polygon_io_api_key
Optional
可选项
TAVILY_API_KEY=your_tavily_api_key # For web search
LANGCHAIN_API_KEY=your_langchain_api_key # For tracing
LANGCHAIN_TRACING_V2=true
undefinedTAVILY_API_KEY=your_tavily_api_key # 用于网页搜索
LANGCHAIN_API_KEY=your_langchain_api_key # 用于追踪
LANGCHAIN_TRACING_V2=true
undefinedVerify Installation
验证安装
python
undefinedpython
undefinedTest import
测试导入
from agent_main import create_agent_workflow
from config.settings import validate_api_keys
from agent_main import create_agent_workflow
from config.settings import validate_api_keys
Validate API keys
验证API密钥
validate_api_keys()
print("✓ Installation successful")
undefinedvalidate_api_keys()
print("✓ 安装成功")
undefinedProject Structure
项目结构
project/
├── agent_main.py # Main agent entry point
├── config/settings.py # Configuration management
├── tools/
│ ├── search/ # Options search tools
│ ├── export/ # Data export tools
│ └── analysis/ # Analysis tools
├── rag/ # RAG knowledge base
├── monitoring/ # Performance tracking
└── microservice/ # FastAPI deploymentproject/
├── agent_main.py # Agent主入口
├── config/settings.py # 配置管理
├── tools/
│ ├── search/ # 期权搜索工具
│ ├── export/ # 数据导出工具
│ └── analysis/ # 分析工具
├── rag/ # RAG知识库
├── monitoring/ # 性能追踪
└── microservice/ # FastAPI部署Core Usage
核心使用方法
Basic Agent Interaction
基础Agent交互
python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessagepython
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessageCreate agent
创建Agent
workflow = create_agent_workflow()
app = workflow.compile()
workflow = create_agent_workflow()
app = workflow.compile()
Simple query
简单查询
config = {"configurable": {"thread_id": "session_1"}}
query = "Search for AAPL options expiring this week"
result = app.invoke(
{"messages": [HumanMessage(content=query)]},
config=config
)
print(result["messages"][-1].content)
undefinedconfig = {"configurable": {"thread_id": "session_1"}}
query = "Search for AAPL options expiring this week"
result = app.invoke(
{"messages": [HumanMessage(content=query)]},
config=config
)
print(result["messages"][-1].content)
undefinedInteractive Chat Loop
交互式聊天循环
python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def chat():
workflow = create_agent_workflow()
app = workflow.compile()
session_id = "user_session_1"
print("Options Analytics Agent (type 'exit' to quit)")
while True:
user_input = input("\nYou: ").strip()
if user_input.lower() in ['exit', 'quit']:
break
config = {"configurable": {"thread_id": session_id}}
result = app.invoke(
{"messages": [HumanMessage(content=user_input)]},
config=config
)
response = result["messages"][-1].content
print(f"\nAgent: {response}")
if __name__ == "__main__":
chat()python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def chat():
workflow = create_agent_workflow()
app = workflow.compile()
session_id = "user_session_1"
print("期权分析Agent(输入'exit'退出)")
while True:
user_input = input("\n你: ").strip()
if user_input.lower() in ['exit', 'quit']:
break
config = {"configurable": {"thread_id": session_id}}
result = app.invoke(
{"messages": [HumanMessage(content=user_input)]},
config=config
)
response = result["messages"][-1].content
print(f"\nAgent: {response}")
if __name__ == "__main__":
chat()Key Tools & Commands
核心工具与命令
1. Options Search Tool
1. 期权搜索工具
Search for options data with automatic caching:
python
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()自动缓存的期权数据搜索:
python
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()Search with automatic caching
使用自动缓存搜索
result = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # Use cache if available
)
result = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # 优先使用缓存
)
Force fresh API call
强制调用API获取最新数据
result = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=True
)
undefinedresult = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=True
)
undefined2. Batch Search Tool
2. 批量搜索工具
Search multiple tickers efficiently:
python
from tools.search.batch_search import BatchOptionsSearchTool
tool = BatchOptionsSearchTool()
result = tool._run(
tickers=["AAPL", "MSFT", "GOOGL"],
expiration_date="2024-12-31",
option_type="call"
)高效搜索多个标的:
python
from tools.search.batch_search import BatchOptionsSearchTool
tool = BatchOptionsSearchTool()
result = tool._run(
tickers=["AAPL", "MSFT", "GOOGL"],
expiration_date="2024-12-31",
option_type="call"
)3. RAG Knowledge Base
3. RAG知识库
Query cached options data semantically:
python
from rag.rag_tools import RAGQueryTool
rag_tool = RAGQueryTool()语义查询缓存的期权数据:
python
from rag.rag_tools import RAGQueryTool
rag_tool = RAGQueryTool()Semantic search
语义搜索
results = rag_tool._run(
query="high volume AAPL calls near the money",
top_k=5
)
results = rag_tool._run(
query="high volume AAPL calls near the money",
top_k=5
)
Date-based retrieval
按日期范围检索
from rag.rag_collection_tools import DateRangeCollectionTool
date_tool = DateRangeCollectionTool()
data = date_tool._run(
ticker="AAPL",
start_date="2024-12-01",
end_date="2024-12-31"
)
undefinedfrom rag.rag_collection_tools import DateRangeCollectionTool
date_tool = DateRangeCollectionTool()
data = date_tool._run(
ticker="AAPL",
start_date="2024-12-01",
end_date="2024-12-31"
)
undefined4. Options Analysis
4. 期权分析
Professional-grade analysis:
python
from tools.analysis.analysis_tools import AnalyzeOptionsTool
analysis_tool = AnalyzeOptionsTool()
result = analysis_tool._run(
ticker="TSLA",
expiration_date="2024-12-20",
analysis_type="sentiment" # or "greeks", "anomaly"
)
print(result)专业级分析:
python
from tools.analysis.analysis_tools import AnalyzeOptionsTool
analysis_tool = AnalyzeOptionsTool()
result = analysis_tool._run(
ticker="TSLA",
expiration_date="2024-12-20",
analysis_type="sentiment" # 可选值:"greeks"、"anomaly"
)
print(result)5. Data Export
5. 数据导出
Export to CSV or charts:
python
from tools.export.csv_export import CSVExportTool
from tools.export.visualization import ChartVisualizationTool导出为CSV或生成图表:
python
from tools.export.csv_export import CSVExportTool
from tools.export.visualization import ChartVisualizationToolCSV export
CSV导出
csv_tool = CSVExportTool()
csv_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
output_filename="aapl_calls.csv"
)
csv_tool = CSVExportTool()
csv_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
output_filename="aapl_calls.csv"
)
Chart generation
生成图表
chart_tool = ChartVisualizationTool()
chart_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
chart_type="volume_oi"
)
undefinedchart_tool = ChartVisualizationTool()
chart_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
chart_type="volume_oi"
)
undefinedConfiguration
配置说明
Settings Management
设置管理
python
undefinedpython
undefinedconfig/settings.py
config/settings.py
from config.settings import (
OPENAI_API_KEY,
POLYGON_API_KEY,
MODEL_NAME,
CHROMA_PERSIST_DIR,
validate_api_keys
)
from config.settings import (
OPENAI_API_KEY,
POLYGON_API_KEY,
MODEL_NAME,
CHROMA_PERSIST_DIR,
validate_api_keys
)
Validate all keys
验证所有密钥
validate_api_keys()
validate_api_keys()
Access configuration
访问配置
print(f"Model: {MODEL_NAME}")
print(f"ChromaDB: {CHROMA_PERSIST_DIR}")
undefinedprint(f"模型: {MODEL_NAME}")
print(f"ChromaDB存储路径: {CHROMA_PERSIST_DIR}")
undefinedCustom Agent Configuration
自定义Agent配置
python
from agent_main import create_agent_workflow
from langgraph.checkpoint.memory import MemorySaverpython
from agent_main import create_agent_workflow
from langgraph.checkpoint.memory import MemorySaverCreate with custom checkpointer
使用自定义检查点存储器
memory = MemorySaver()
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=memory)
memory = MemorySaver()
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=memory)
Or use SQLite checkpointer
或使用SQLite检查点存储器
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = workflow.compile(checkpointer=checkpointer)
undefinedfrom langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
app = workflow.compile(checkpointer=checkpointer)
undefinedAdvanced Patterns
高级模式
Custom Tool Integration
自定义工具集成
python
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
@tool
def custom_options_analyzer(ticker: str, metric: str) -> str:
"""Analyze specific option metric.
Args:
ticker: Stock symbol
metric: Metric to analyze (volatility, skew, etc.)
"""
# Your custom logic
return f"Analysis for {ticker}: {metric}"python
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
@tool
def custom_options_analyzer(ticker: str, metric: str) -> str:
"""分析特定期权指标。
参数:
ticker: 股票代码
metric: 要分析的指标(波动率、偏度等)
"""
# 自定义逻辑
return f"{ticker}的{metric}分析结果"Add to agent
添加到Agent
from config.settings import get_llm
llm = get_llm()
tools = [custom_options_analyzer]
agent = create_react_agent(llm, tools)
undefinedfrom config.settings import get_llm
llm = get_llm()
tools = [custom_options_analyzer]
agent = create_react_agent(llm, tools)
undefinedRAG Knowledge Base Setup
RAG知识库设置
python
from rag.rag_knowledge_base import RAGKnowledgeBasepython
from rag.rag_knowledge_base import RAGKnowledgeBaseInitialize
初始化
kb = RAGKnowledgeBase(
persist_directory="./data/chroma_db",
collection_name="options_data"
)
kb = RAGKnowledgeBase(
persist_directory="./data/chroma_db",
collection_name="options_data"
)
Add documents
添加文档
kb.add_documents([
{
"ticker": "AAPL",
"expiration": "2024-12-20",
"strike": 180.0,
"type": "call",
"volume": 5000,
"open_interest": 10000
}
])
kb.add_documents([
{
"ticker": "AAPL",
"expiration": "2024-12-20",
"strike": 180.0,
"type": "call",
"volume": 5000,
"open_interest": 10000
}
])
Query
查询
results = kb.query(
query_text="high volume Apple calls",
n_results=5
)
undefinedresults = kb.query(
query_text="high volume Apple calls",
n_results=5
)
undefinedPersistent Memory Across Sessions
跨会话持久化记忆
python
from langgraph.checkpoint.sqlite import SqliteSaverpython
from langgraph.checkpoint.sqlite import SqliteSaverCreate persistent checkpointer
创建持久化检查点存储器
checkpointer = SqliteSaver.from_conn_string("./data/conversation_memory.db")
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=checkpointer)
checkpointer = SqliteSaver.from_conn_string("./data/conversation_memory.db")
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=checkpointer)
Session 1
会话1
config1 = {"configurable": {"thread_id": "user_123"}}
app.invoke({"messages": [HumanMessage("Search AAPL options")]}, config1)
config1 = {"configurable": {"thread_id": "user_123"}}
app.invoke({"messages": [HumanMessage("Search AAPL options")]}, config1)
Session 2 (remembers previous context)
会话2(保留之前的上下文)
app.invoke({"messages": [HumanMessage("Show me the calls")]}, config1)
undefinedapp.invoke({"messages": [HumanMessage("Show me the calls")]}, config1)
undefinedStreaming Responses
流式响应
python
from langchain_core.messages import HumanMessage
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "session_1"}}
query = HumanMessage(content="Analyze TSLA options")python
from langchain_core.messages import HumanMessage
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "session_1"}}
query = HumanMessage(content="Analyze TSLA options")Stream tokens
流式输出令牌
for chunk in app.stream({"messages": [query]}, config):
if "messages" in chunk:
print(chunk["messages"][-1].content, end="", flush=True)
undefinedfor chunk in app.stream({"messages": [query]}, config):
if "messages" in chunk:
print(chunk["messages"][-1].content, end="", flush=True)
undefinedMicroservice Deployment
微服务部署
FastAPI Server
FastAPI服务器
python
undefinedpython
undefinedmicroservice/app.py
microservice/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
app = FastAPI(title="Options Analytics API")
workflow = create_agent_workflow()
agent_app = workflow.compile()
class QueryRequest(BaseModel):
query: str
session_id: str = "default"
@app.post("/query")
async def query_agent(request: QueryRequest):
try:
config = {"configurable": {"thread_id": request.session_id}}
result = agent_app.invoke(
{"messages": [HumanMessage(content=request.query)]},
config=config
)
return {
"response": result["messages"][-1].content,
"session_id": request.session_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
app = FastAPI(title="期权分析API")
workflow = create_agent_workflow()
agent_app = workflow.compile()
class QueryRequest(BaseModel):
query: str
session_id: str = "default"
@app.post("/query")
async def query_agent(request: QueryRequest):
try:
config = {"configurable": {"thread_id": request.session_id}}
result = agent_app.invoke(
{"messages": [HumanMessage(content=request.query)]},
config=config
)
return {
"response": result["messages"][-1].content,
"session_id": request.session_id
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
Run: uvicorn microservice.app:app --reload
启动命令: uvicorn microservice.app:app --reload
undefinedundefinedDocker Deployment
Docker部署
dockerfile
undefineddockerfile
undefinedmicroservice/Dockerfile
microservice/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "microservice.app:app", "--host", "0.0.0.0", "--port", "8000"]
```yamlFROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "microservice.app:app", "--host", "0.0.0.0", "--port", "8000"]
```yamldocker-compose.yml
docker-compose.yml
version: '3.8'
services:
options-agent:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- POLYGON_API_KEY=${POLYGON_API_KEY}
volumes:
- ./data:/app/data
- ./outputs:/app/outputs
**Start service:**
```bash
docker-compose up -dversion: '3.8'
services:
options-agent:
build: .
ports:
- "8000:8000"
environment:
- OPENAI_API_KEY=${OPENAI_API_KEY}
- POLYGON_API_KEY=${POLYGON_API_KEY}
volumes:
- ./data:/app/data
- ./outputs:/app/outputs
**启动服务:**
```bash
docker-compose up -dCommon Workflows
常见工作流
Complete Options Analysis Pipeline
完整期权分析流水线
python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def analyze_options_workflow(ticker: str, expiration: str):
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": f"analysis_{ticker}"}}
# Step 1: Search options
query1 = f"Search {ticker} options expiring {expiration}"
result1 = app.invoke({"messages": [HumanMessage(query1)]}, config)
# Step 2: Analyze sentiment
query2 = f"Analyze sentiment for {ticker} options"
result2 = app.invoke({"messages": [HumanMessage(query2)]}, config)
# Step 3: Export to CSV
query3 = f"Export {ticker} options to CSV"
result3 = app.invoke({"messages": [HumanMessage(query3)]}, config)
return {
"search": result1["messages"][-1].content,
"analysis": result2["messages"][-1].content,
"export": result3["messages"][-1].content
}python
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def analyze_options_workflow(ticker: str, expiration: str):
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": f"analysis_{ticker}"}}
# 步骤1:搜索期权
query1 = f"Search {ticker} options expiring {expiration}"
result1 = app.invoke({"messages": [HumanMessage(query1)]}, config)
# 步骤2:情绪分析
query2 = f"Analyze sentiment for {ticker} options"
result2 = app.invoke({"messages": [HumanMessage(query2)]}, config)
# 步骤3:导出为CSV
query3 = f"Export {ticker} options to CSV"
result3 = app.invoke({"messages": [HumanMessage(query3)]}, config)
return {
"search": result1["messages"][-1].content,
"analysis": result2["messages"][-1].content,
"export": result3["messages"][-1].content
}Run pipeline
运行流水线
results = analyze_options_workflow("NVDA", "2024-12-31")
undefinedresults = analyze_options_workflow("NVDA", "2024-12-31")
undefinedBatch Processing Multiple Tickers
多标的批量处理
python
from tools.search.batch_search import BatchOptionsSearchTool
from tools.analysis.analysis_tools import AnalyzeOptionsTool
def batch_analysis(tickers: list, expiration: str):
search_tool = BatchOptionsSearchTool()
analysis_tool = AnalyzeOptionsTool()
results = {}
# Batch search
search_result = search_tool._run(
tickers=tickers,
expiration_date=expiration,
option_type="call"
)
# Individual analysis
for ticker in tickers:
analysis = analysis_tool._run(
ticker=ticker,
expiration_date=expiration,
analysis_type="sentiment"
)
results[ticker] = analysis
return resultspython
from tools.search.batch_search import BatchOptionsSearchTool
from tools.analysis.analysis_tools import AnalyzeOptionsTool
def batch_analysis(tickers: list, expiration: str):
search_tool = BatchOptionsSearchTool()
analysis_tool = AnalyzeOptionsTool()
results = {}
# 批量搜索
search_result = search_tool._run(
tickers=tickers,
expiration_date=expiration,
option_type="call"
)
# 逐个分析
for ticker in tickers:
analysis = analysis_tool._run(
ticker=ticker,
expiration_date=expiration,
analysis_type="sentiment"
)
results[ticker] = analysis
return resultsProcess watchlist
处理关注列表
watchlist = ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]
results = batch_analysis(watchlist, "2024-12-31")
undefinedwatchlist = ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]
results = batch_analysis(watchlist, "2024-12-31")
undefinedReal-time Monitoring with Callbacks
带回调的实时监控
python
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.messages import HumanMessage
class OptionsMonitorCallback(BaseCallbackHandler):
def on_tool_start(self, serialized, input_str, **kwargs):
print(f"🔧 Tool: {serialized['name']}")
def on_tool_end(self, output, **kwargs):
print(f"✅ Result: {output[:100]}...")python
from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.messages import HumanMessage
class OptionsMonitorCallback(BaseCallbackHandler):
def on_tool_start(self, serialized, input_str, **kwargs):
print(f"🔧 工具: {serialized['name']}")
def on_tool_end(self, output, **kwargs):
print(f"✅ 结果: {output[:100]}...")Use callback
使用回调
workflow = create_agent_workflow()
app = workflow.compile()
config = {
"configurable": {"thread_id": "monitor_session"},
"callbacks": [OptionsMonitorCallback()]
}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)
undefinedworkflow = create_agent_workflow()
app = workflow.compile()
config = {
"configurable": {"thread_id": "monitor_session"},
"callbacks": [OptionsMonitorCallback()]
}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)
undefinedTroubleshooting
故障排除
API Key Issues
API密钥问题
python
undefinedpython
undefinedValidate API keys
验证API密钥
from config.settings import validate_api_keys
try:
validate_api_keys()
print("✓ All API keys valid")
except ValueError as e:
print(f"✗ Missing: {e}")
# Set missing keys in .env file
undefinedfrom config.settings import validate_api_keys
try:
validate_api_keys()
print("✓ 所有API密钥有效")
except ValueError as e:
print(f"✗ 缺失密钥: {e}")
# 在.env文件中设置缺失的密钥
undefinedChromaDB Connection Errors
ChromaDB连接错误
python
undefinedpython
undefinedReset ChromaDB
重置ChromaDB
import shutil
import os
chroma_path = "./data/chroma_db"
if os.path.exists(chroma_path):
shutil.rmtree(chroma_path)
print("ChromaDB reset")
import shutil
import os
chroma_path = "./data/chroma_db"
if os.path.exists(chroma_path):
shutil.rmtree(chroma_path)
print("ChromaDB已重置")
Reinitialize
重新初始化
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()
undefinedfrom rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()
undefinedClear Conversation Memory
清除对话记忆
python
undefinedpython
undefinedClear SQLite memory
清除SQLite记忆
import os
memory_file = "./data/conversation_memory.db"
if os.path.exists(memory_file):
os.remove(memory_file)
print("Conversation memory cleared")
undefinedimport os
memory_file = "./data/conversation_memory.db"
if os.path.exists(memory_file):
os.remove(memory_file)
print("对话记忆已清除")
undefinedPolygon.io Rate Limits
Polygon.io速率限制
python
undefinedpython
undefinedUse caching to reduce API calls
使用缓存减少API调用
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()
Always try cache first
优先使用缓存
result = tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # Use cached data
)
result = tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # 使用缓存数据
)
Only force refresh when absolutely necessary
仅在必要时强制刷新
undefinedundefinedDebug Agent State
调试Agent状态
python
undefinedpython
undefinedInspect agent state
检查Agent状态
from agent_main import create_agent_workflow
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "debug_session"}}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)
from agent_main import create_agent_workflow
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "debug_session"}}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)
Print full state
打印完整状态
print("Messages:", result["messages"])
print("Tools called:", [m.additional_kwargs for m in result["messages"]])
undefinedprint("消息:", result["messages"])
print("调用的工具:", [m.additional_kwargs for m in result["messages"]])
undefinedEnable Tracing
启用追踪
python
undefinedpython
undefinedSet in .env
在.env文件中设置
LANGCHAIN_TRACING_V2=true
LANGCHAIN_TRACING_V2=true
LANGCHAIN_API_KEY=your_key
LANGCHAIN_API_KEY=your_key
Or in code
或在代码中设置
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"
View traces at https://smith.langchain.com
undefinedundefinedPerformance Optimization
性能优化
Batch Embeddings
批量嵌入
python
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()python
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()Batch add for efficiency
批量添加提升效率
documents = [
{"ticker": "AAPL", "data": "..."},
{"ticker": "MSFT", "data": "..."},
# ... more documents
]
kb.add_documents(documents) # Processes in batches
undefineddocuments = [
{"ticker": "AAPL", "data": "..."},
{"ticker": "MSFT", "data": "..."},
# ... 更多文档
]
kb.add_documents(documents) # 批量处理
undefinedParallel Tool Execution
并行工具执行
python
from concurrent.futures import ThreadPoolExecutor
from tools.search.options_search import OptionsSearchTool
def search_ticker(ticker, date):
tool = OptionsSearchTool()
return tool._run(ticker=ticker, expiration_date=date, option_type="call")
tickers = ["AAPL", "MSFT", "GOOGL", "NVDA"]
date = "2024-12-31"
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(lambda t: search_ticker(t, date), tickers))python
from concurrent.futures import ThreadPoolExecutor
from tools.search.options_search import OptionsSearchTool
def search_ticker(ticker, date):
tool = OptionsSearchTool()
return tool._run(ticker=ticker, expiration_date=date, option_type="call")
tickers = ["AAPL", "MSFT", "GOOGL", "NVDA"]
date = "2024-12-31"
with ThreadPoolExecutor(max_workers=4) as executor:
results = list(executor.map(lambda t: search_ticker(t, date), tickers))Testing
测试
Unit Test Example
单元测试示例
python
import unittest
from tools.search.options_search import OptionsSearchTool
class TestOptionsSearch(unittest.TestCase):
def setUp(self):
self.tool = OptionsSearchTool()
def test_search_call_options(self):
result = self.tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call"
)
self.assertIn("AAPL", result)
self.assertIn("call", result.lower())
if __name__ == "__main__":
unittest.main()This skill provides comprehensive coverage of the Options Analytics Agent project, enabling AI coding agents to effectively assist developers in building sophisticated financial analysis systems with LangGraph, RAG, and persistent memory.
python
import unittest
from tools.search.options_search import OptionsSearchTool
class TestOptionsSearch(unittest.TestCase):
def setUp(self):
self.tool = OptionsSearchTool()
def test_search_call_options(self):
result = self.tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call"
)
self.assertIn("AAPL", result)
self.assertIn("call", result.lower())
if __name__ == "__main__":
unittest.main()本技能全面覆盖了期权分析Agent项目,可帮助AI编码Agent有效协助开发者构建基于LangGraph、RAG和持久化内存的复杂金融分析系统。