Loading...
Loading...
Build AI agents for real-time financial options analysis with LangGraph, ChromaDB RAG, and Polygon.io data
npx skill4agent add aradotso/data-skills options-analytics-agent-langgraph
Skill by ara.so — Data Skills collection.
# Python 3.10+
python --version
# Create virtual environment
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install -r requirements.txt
langchain>=0.3.0
langgraph>=0.2.45
langchain-openai>=0.2.6
langchain-chroma>=0.1.4
chromadb>=0.5.20
fastapi>=0.115.5
uvicorn>=0.32.1
pandas>=2.2.3
matplotlib>=3.9.2
tavily-python>=0.5.0
.env
# Required
OPENAI_API_KEY=your_openai_api_key
POLYGON_API_KEY=your_polygon_io_api_key
# Optional
TAVILY_API_KEY=your_tavily_api_key # For web search
LANGCHAIN_API_KEY=your_langchain_api_key # For tracing
LANGCHAIN_TRACING_V2=true
# Test import
from agent_main import create_agent_workflow
from config.settings import validate_api_keys
# Validate API keys
validate_api_keys()
print("✓ Installation successful")
project/
├── agent_main.py # Main agent entry point
├── config/settings.py # Configuration management
├── tools/
│ ├── search/ # Options search tools
│ ├── export/ # Data export tools
│ └── analysis/ # Analysis tools
├── rag/ # RAG knowledge base
├── monitoring/ # Performance tracking
└── microservice/ # FastAPI deployment
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
# Create agent
workflow = create_agent_workflow()
app = workflow.compile()
# Simple query
config = {"configurable": {"thread_id": "session_1"}}
query = "Search for AAPL options expiring this week"
result = app.invoke(
{"messages": [HumanMessage(content=query)]},
config=config
)
print(result["messages"][-1].content)from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def chat():
    """Run an interactive console chat loop against the options agent.

    Reads lines from stdin until the user types 'exit' or 'quit' (or stdin is
    closed / Ctrl-C is pressed) and prints the content of the last message
    returned by the agent for each turn. Every turn is invoked with the same
    thread_id.
    """
    workflow = create_agent_workflow()
    app = workflow.compile()
    session_id = "user_session_1"
    # The thread id never changes, so build the config once instead of per turn.
    config = {"configurable": {"thread_id": session_id}}
    print("Options Analytics Agent (type 'exit' to quit)")
    while True:
        try:
            user_input = input("\nYou: ").strip()
        except (EOFError, KeyboardInterrupt):
            # Closed stdin or Ctrl-C ends the session instead of a traceback.
            print()
            break
        if user_input.lower() in ("exit", "quit"):
            break
        if not user_input:
            # Do not spend an agent call on an empty message.
            continue
        result = app.invoke(
            {"messages": [HumanMessage(content=user_input)]},
            config=config,
        )
        response = result["messages"][-1].content
        print(f"\nAgent: {response}")


if __name__ == "__main__":
    chat()
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()
# Search with automatic caching
result = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # Use cache if available
)
# Force fresh API call
result = tool._run(
ticker="NVDA",
expiration_date="2024-12-20",
option_type="call",
force_refresh=True
)from tools.search.batch_search import BatchOptionsSearchTool
tool = BatchOptionsSearchTool()
result = tool._run(
tickers=["AAPL", "MSFT", "GOOGL"],
expiration_date="2024-12-31",
option_type="call"
)from rag.rag_tools import RAGQueryTool
rag_tool = RAGQueryTool()
# Semantic search
results = rag_tool._run(
query="high volume AAPL calls near the money",
top_k=5
)
# Date-based retrieval
from rag.rag_collection_tools import DateRangeCollectionTool
date_tool = DateRangeCollectionTool()
data = date_tool._run(
ticker="AAPL",
start_date="2024-12-01",
end_date="2024-12-31"
)from tools.analysis.analysis_tools import AnalyzeOptionsTool
analysis_tool = AnalyzeOptionsTool()
result = analysis_tool._run(
ticker="TSLA",
expiration_date="2024-12-20",
analysis_type="sentiment" # or "greeks", "anomaly"
)
print(result)from tools.export.csv_export import CSVExportTool
from tools.export.visualization import ChartVisualizationTool
# CSV export
csv_tool = CSVExportTool()
csv_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
output_filename="aapl_calls.csv"
)
# Chart generation
chart_tool = ChartVisualizationTool()
chart_tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
chart_type="volume_oi"
)# config/settings.py
from config.settings import (
OPENAI_API_KEY,
POLYGON_API_KEY,
MODEL_NAME,
CHROMA_PERSIST_DIR,
validate_api_keys
)
# Validate all keys
validate_api_keys()
# Access configuration
print(f"Model: {MODEL_NAME}")
print(f"ChromaDB: {CHROMA_PERSIST_DIR}")from agent_main import create_agent_workflow
from langgraph.checkpoint.memory import MemorySaver
# Create with custom checkpointer
memory = MemorySaver()
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=memory)
# Or use SQLite checkpointer
from langgraph.checkpoint.sqlite import SqliteSaver
with SqliteSaver.from_conn_string("checkpoints.db") as checkpointer:
    app = workflow.compile(checkpointer=checkpointer)
    # NOTE: invoke `app` inside this `with` block; the saver yielded by
    # from_conn_string() is only usable while the context manager is open.
from langchain_core.tools import tool
from langgraph.prebuilt import create_react_agent
@tool
def custom_options_analyzer(ticker: str, metric: str) -> str:
    """Analyze specific option metric.

    Args:
        ticker: Stock symbol
        metric: Metric to analyze (volatility, skew, etc.)

    Returns:
        A human-readable analysis string for the ticker and metric.
    """
    # Your custom logic
    return f"Analysis for {ticker}: {metric}"
# Add to agent
from config.settings import get_llm
llm = get_llm()
tools = [custom_options_analyzer]
agent = create_react_agent(llm, tools)from rag.rag_knowledge_base import RAGKnowledgeBase
# Initialize
kb = RAGKnowledgeBase(
persist_directory="./data/chroma_db",
collection_name="options_data"
)
# Add documents
kb.add_documents([
{
"ticker": "AAPL",
"expiration": "2024-12-20",
"strike": 180.0,
"type": "call",
"volume": 5000,
"open_interest": 10000
}
])
# Query
results = kb.query(
query_text="high volume Apple calls",
n_results=5
)from langgraph.checkpoint.sqlite import SqliteSaver
# Create persistent checkpointer
# SqliteSaver.from_conn_string() returns a context manager rather than a saver,
# so it cannot be passed to compile() directly. Build the saver from an explicit
# connection that stays open for the lifetime of the app instead.
import sqlite3

conn = sqlite3.connect("./data/conversation_memory.db", check_same_thread=False)
checkpointer = SqliteSaver(conn)
workflow = create_agent_workflow()
app = workflow.compile(checkpointer=checkpointer)
# Session 1
config1 = {"configurable": {"thread_id": "user_123"}}
app.invoke({"messages": [HumanMessage("Search AAPL options")]}, config1)
# Session 2 (remembers previous context)
app.invoke({"messages": [HumanMessage("Show me the calls")]}, config1)from langchain_core.messages import HumanMessage
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "session_1"}}
query = HumanMessage(content="Analyze TSLA options")
# Stream tokens
# stream_mode="messages" yields (message_chunk, metadata) tuples as the model
# emits output. The default "updates" mode yields {node_name: update} dicts,
# which never have a top-level "messages" key, so nothing would be printed.
for message_chunk, _metadata in app.stream(
    {"messages": [query]}, config, stream_mode="messages"
):
    if message_chunk.content:
        print(message_chunk.content, end="", flush=True)
# microservice/app.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
app = FastAPI(title="Options Analytics API")
workflow = create_agent_workflow()
agent_app = workflow.compile()
class QueryRequest(BaseModel):
    """Request body for POST /query."""

    # Natural-language question forwarded to the agent as a HumanMessage.
    query: str
    # Passed to the agent as the thread_id of the invocation config.
    session_id: str = "default"
@app.post("/query")
async def query_agent(request: QueryRequest):
    """Run one agent turn for the submitted query.

    Args:
        request: The query text and the session id used as the thread_id.

    Returns:
        A dict with the content of the agent's last message ("response") and
        the echoed "session_id".

    Raises:
        HTTPException: 500 with the error text if the agent run fails.
    """
    try:
        config = {"configurable": {"thread_id": request.session_id}}
        # Await the async entry point: calling the synchronous invoke() from an
        # async handler would block the event loop for the whole agent run.
        result = await agent_app.ainvoke(
            {"messages": [HumanMessage(content=request.query)]},
            config=config,
        )
        return {
            "response": result["messages"][-1].content,
            "session_id": request.session_id,
        }
    except Exception as e:
        # Top-level API boundary: surface the failure as a 500 and keep the
        # original exception chained for server-side tracebacks.
        raise HTTPException(status_code=500, detail=str(e)) from e
# Run: uvicorn microservice.app:app --reload
# microservice/Dockerfile
FROM python:3.10-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["uvicorn", "microservice.app:app", "--host", "0.0.0.0", "--port", "8000"]
# docker-compose.yml
version: '3.8'
services:
  options-agent:
    build: .
    ports:
      - "8000:8000"
    environment:
      - OPENAI_API_KEY=${OPENAI_API_KEY}
      - POLYGON_API_KEY=${POLYGON_API_KEY}
    volumes:
      - ./data:/app/data
      - ./outputs:/app/outputs
docker-compose up -d
from agent_main import create_agent_workflow
from langchain_core.messages import HumanMessage
def analyze_options_workflow(ticker: str, expiration: str):
    """Run a three-step search / sentiment / export pipeline through the agent.

    Args:
        ticker: Stock symbol inserted into each prompt and into the thread id.
        expiration: Expiration date text inserted into the search prompt.

    Returns:
        A dict with keys "search", "analysis" and "export", each holding the
        content of the agent's last message for that step.
    """
    workflow = create_agent_workflow()
    app = workflow.compile()
    config = {"configurable": {"thread_id": f"analysis_{ticker}"}}
    # Ordered (result key, prompt) pairs; the steps run in this order on the
    # same thread id.
    steps = (
        ("search", f"Search {ticker} options expiring {expiration}"),
        ("analysis", f"Analyze sentiment for {ticker} options"),
        ("export", f"Export {ticker} options to CSV"),
    )
    results = {}
    for key, prompt in steps:
        result = app.invoke({"messages": [HumanMessage(prompt)]}, config)
        results[key] = result["messages"][-1].content
    return results
# Run pipeline
results = analyze_options_workflow("NVDA", "2024-12-31")from tools.search.batch_search import BatchOptionsSearchTool
from tools.analysis.analysis_tools import AnalyzeOptionsTool
def batch_analysis(tickers: list, expiration: str):
    """Batch-search call options for several tickers, then analyze each one.

    Args:
        tickers: Stock symbols to process.
        expiration: Expiration date passed to both tools.

    Returns:
        A dict mapping each ticker to its sentiment analysis result.
    """
    search_tool = BatchOptionsSearchTool()
    analysis_tool = AnalyzeOptionsTool()
    # Batch search. The return value is not used below; the call is kept for
    # the tool's side effects (presumably it populates the cache — TODO confirm).
    search_tool._run(
        tickers=tickers,
        expiration_date=expiration,
        option_type="call",
    )
    # Individual analysis
    return {
        ticker: analysis_tool._run(
            ticker=ticker,
            expiration_date=expiration,
            analysis_type="sentiment",
        )
        for ticker in tickers
    }
# Process watchlist
watchlist = ["AAPL", "MSFT", "GOOGL", "NVDA", "TSLA"]
results = batch_analysis(watchlist, "2024-12-31")from langchain.callbacks.base import BaseCallbackHandler
from langchain_core.messages import HumanMessage
class OptionsMonitorCallback(BaseCallbackHandler):
    """Callback handler that prints each tool start and a preview of its output."""

    def on_tool_start(self, serialized, input_str, **kwargs):
        """Print the name of the tool that is about to run."""
        # Guard against a missing/None `serialized` payload or a payload
        # without a "name" key instead of raising inside a monitoring hook.
        name = (serialized or {}).get("name", "unknown")
        print(f"🔧 Tool: {name}")

    def on_tool_end(self, output, **kwargs):
        """Print the first 100 characters of the tool output."""
        # The output is not guaranteed to be a str (it can be a message
        # object), so stringify before slicing.
        print(f"✅ Result: {str(output)[:100]}...")
# Use callback
workflow = create_agent_workflow()
app = workflow.compile()
config = {
"configurable": {"thread_id": "monitor_session"},
"callbacks": [OptionsMonitorCallback()]
}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)# Validate API keys
from config.settings import validate_api_keys
try:
    validate_api_keys()
except ValueError as e:
    print(f"✗ Missing: {e}")
else:
    # Only reached when validation raised nothing.
    print("✓ All API keys valid")
# Set missing keys in .env file
# Reset ChromaDB
import shutil
import os

chroma_path = "./data/chroma_db"
# Delete the persisted store only if it exists, then report the reset.
if os.path.exists(chroma_path):
    shutil.rmtree(chroma_path)
    print("ChromaDB reset")
# Reinitialize
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()
# Clear SQLite memory
import os

memory_file = "./data/conversation_memory.db"
# Remove the conversation database only if it exists, then report it.
if os.path.exists(memory_file):
    os.remove(memory_file)
    print("Conversation memory cleared")
# Use caching to reduce API calls
from tools.search.options_search import OptionsSearchTool
tool = OptionsSearchTool()
# Always try cache first
result = tool._run(
ticker="AAPL",
expiration_date="2024-12-20",
option_type="call",
force_refresh=False # Use cached data
)
# Only force refresh when absolutely necessary
# Inspect agent state
from agent_main import create_agent_workflow
workflow = create_agent_workflow()
app = workflow.compile()
config = {"configurable": {"thread_id": "debug_session"}}
result = app.invoke(
{"messages": [HumanMessage("Search AAPL options")]},
config=config
)
# Print full state
print("Messages:", result["messages"])
print("Tools called:", [m.additional_kwargs for m in result["messages"]])# Set in .env
# LANGCHAIN_TRACING_V2=true
# LANGCHAIN_API_KEY=your_key
# Or in code
import os
os.environ["LANGCHAIN_TRACING_V2"] = "true"
os.environ["LANGCHAIN_API_KEY"] = "your_key"
# View traces at https://smith.langchain.com
from rag.rag_knowledge_base import RAGKnowledgeBase
kb = RAGKnowledgeBase()
# Batch add for efficiency
documents = [
{"ticker": "AAPL", "data": "..."},
{"ticker": "MSFT", "data": "..."},
# ... more documents
]
kb.add_documents(documents) # Processes in batches
from concurrent.futures import ThreadPoolExecutor
from tools.search.options_search import OptionsSearchTool
def search_ticker(ticker, date):
    """Search call options for one ticker and expiration date.

    A new OptionsSearchTool is created on every call, so calls share no tool
    instance.
    """
    tool = OptionsSearchTool()
    return tool._run(ticker=ticker, expiration_date=date, option_type="call")
tickers = ["AAPL", "MSFT", "GOOGL", "NVDA"]
date = "2024-12-31"
# Fan the per-ticker searches out over a thread pool; executor.map returns the
# results in the same order as `tickers`.
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(lambda t: search_ticker(t, date), tickers))
import unittest
from tools.search.options_search import OptionsSearchTool
class TestOptionsSearch(unittest.TestCase):
    """Smoke test for OptionsSearchTool call-option searches."""

    def setUp(self):
        # Fresh tool instance per test.
        self.tool = OptionsSearchTool()

    def test_search_call_options(self):
        """The result text mentions the ticker and the option type."""
        result = self.tool._run(
            ticker="AAPL",
            expiration_date="2024-12-20",
            option_type="call",
        )
        self.assertIn("AAPL", result)
        self.assertIn("call", result.lower())


if __name__ == "__main__":
    unittest.main()