live-feed
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseCreate a real-time indicator feed using OpenAlgo WebSocket streaming.
使用OpenAlgo WebSocket流创建实时指标数据源。
Arguments
参数说明
Parse as: symbol exchange mode
$ARGUMENTS- = symbol (e.g., SBIN, RELIANCE, NIFTY). Default: SBIN
$0 - = exchange (e.g., NSE, NSE_INDEX). Default: NSE
$1 - = mode (e.g., ltp, quote, depth, multi). Default: quote
$2
If no arguments, ask user for symbol and what data they want.
将解析为:标的代码 交易所 模式
$ARGUMENTS- = 标的代码(例如:SBIN、RELIANCE、NIFTY)。默认值:SBIN
$0 - = 交易所(例如:NSE、NSE_INDEX)。默认值:NSE
$1 - = 模式(例如:ltp、quote、depth、multi)。默认值:quote
$2
如果未提供参数,请询问用户标的代码以及所需的数据类型。
Instructions
操作步骤
- Read the indicator-expert rules, especially:
- — WebSocket connection and subscription
rules/websocket-feeds.md - — Historical data for buffer initialization
rules/data-fetching.md
- Create directory (on-demand)
charts/live/ - Create
{symbol}_live_feed.py - Use the template from
rules/assets/live_feed/template.py
- 阅读指标专家规则,尤其是:
- — WebSocket连接与订阅规则
rules/websocket-feeds.md - — 用于缓冲区初始化的历史数据获取规则
rules/data-fetching.md
- 按需创建目录
charts/live/ - 创建文件
{symbol}_live_feed.py - 使用中的模板
rules/assets/live_feed/template.py
Feed Types
数据源类型
ltp
— Last Traded Price + Indicators
ltpltp
— 最新成交价(LTP)+ 指标
ltp- Subscribe to LTP feed
- Maintain rolling buffer (last 200 ticks)
- Compute EMA, RSI on buffer
- Print real-time indicator values
- 订阅LTP数据流
- 维护滚动缓冲区(最近200条交易数据)
- 在缓冲区上计算EMA、RSI指标
- 打印实时指标值
quote
— Full Quote + Indicators
quotequote
— 完整报价 + 指标
quote- Subscribe to Quote feed
- Display OHLC + LTP + Volume
- Compute indicators on close buffer
- Color-coded output (bullish/bearish)
- 订阅报价数据流
- 展示OHLC(开盘价/最高价/最低价/收盘价)+ LTP + 成交量
- 基于收盘价缓冲区计算指标
- 输出带颜色区分的结果(看涨/看跌)
depth
— Market Depth Analysis
depthdepth
— 市场深度分析
depth- Subscribe to Depth feed
- Display L5 bid/ask book
- Compute bid-ask spread, order imbalance
- Show total buy vs sell quantity
- 订阅深度数据流
- 展示L5买卖盘口
- 计算买卖价差、委托单不平衡度
- 显示总买量与总卖量对比
multi
— Multi-Symbol Feed
multimulti
— 多标的数据源
multi- Subscribe to multiple symbols
- Display watchlist table with LTP and key indicator
- Auto-refresh display
- 订阅多个标的的数据
- 展示包含LTP及关键指标的观察列表表格
- 自动刷新显示
Script Structure
脚本结构
python
"""
Real-Time Indicator Feed for {SYMBOL}
Mode: {mode}
"""
import os
import time
import numpy as np
from datetime import datetime, timedelta
from dotenv import find_dotenv, load_dotenv
from openalgo import api, ta
load_dotenv(find_dotenv(), override=False)
SYMBOL = "{symbol}"
EXCHANGE = "{exchange}"
client = api(
api_key=os.getenv("OPENALGO_API_KEY"),
host=os.getenv("OPENALGO_HOST", "http://127.0.0.1:5000"),
verbose=1,
)python
"""
Real-Time Indicator Feed for {SYMBOL}
Mode: {mode}
"""
import os
import time
import numpy as np
from datetime import datetime, timedelta
from dotenv import find_dotenv, load_dotenv
from openalgo import api, ta
load_dotenv(find_dotenv(), override=False)
SYMBOL = "{symbol}"
EXCHANGE = "{exchange}"
client = api(
api_key=os.getenv("OPENALGO_API_KEY"),
host=os.getenv("OPENALGO_HOST", "http://127.0.0.1:5000"),
verbose=1,
)Pre-fetch historical data for buffer initialization
Pre-fetch historical data for buffer initialization
df = client.history(
symbol=SYMBOL, exchange=EXCHANGE, interval="1m",
start_date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"),
end_date=datetime.now().strftime("%Y-%m-%d"),
)
close_buffer = list(df["close"].values[-200:])
instruments = [{"exchange": EXCHANGE, "symbol": SYMBOL}]
def on_data(data):
ltp = data["data"].get("ltp")
if ltp is None:
return
close_buffer.append(float(ltp))
if len(close_buffer) > 200:
close_buffer.pop(0)
if len(close_buffer) >= 20:
arr = np.array(close_buffer, dtype=np.float64)
ema_val = ta.ema(arr, 20)[-1]
rsi_val = ta.rsi(arr, 14)[-1] if len(arr) >= 15 else float("nan")
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {SYMBOL} LTP:{ltp:>10.2f} | "
f"EMA(20):{ema_val:>10.2f} | RSI(14):{rsi_val:>6.2f}")df = client.history(
symbol=SYMBOL, exchange=EXCHANGE, interval="1m",
start_date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"),
end_date=datetime.now().strftime("%Y-%m-%d"),
)
close_buffer = list(df["close"].values[-200:])
instruments = [{"exchange": EXCHANGE, "symbol": SYMBOL}]
def on_data(data):
ltp = data["data"].get("ltp")
if ltp is None:
return
close_buffer.append(float(ltp))
if len(close_buffer) > 200:
close_buffer.pop(0)
if len(close_buffer) >= 20:
arr = np.array(close_buffer, dtype=np.float64)
ema_val = ta.ema(arr, 20)[-1]
rsi_val = ta.rsi(arr, 14)[-1] if len(arr) >= 15 else float("nan")
timestamp = datetime.now().strftime("%H:%M:%S")
print(f"[{timestamp}] {SYMBOL} LTP:{ltp:>10.2f} | "
f"EMA(20):{ema_val:>10.2f} | RSI(14):{rsi_val:>6.2f}")Connect and subscribe
Connect and subscribe
client.connect()
client.subscribe_ltp(instruments, on_data_received=on_data)
print(f"Streaming {SYMBOL} on {EXCHANGE} — Press Ctrl+C to stop")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping feed...")
client.unsubscribe_ltp(instruments)
client.disconnect()
undefinedclient.connect()
client.subscribe_ltp(instruments, on_data_received=on_data)
print(f"Streaming {SYMBOL} on {EXCHANGE} — Press Ctrl+C to stop")
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
print("Stopping feed...")
client.unsubscribe_ltp(instruments)
client.disconnect()
undefinedCleanup
清理操作
The script must:
- Handle Ctrl+C gracefully
- Unsubscribe from all feeds
- Disconnect WebSocket
- Print summary of session duration and bars processed
脚本必须:
- 优雅处理Ctrl+C中断
- 取消所有数据源订阅
- 断开WebSocket连接
- 打印会话时长及处理数据条数的汇总信息
Verbose Levels
日志详细级别
Inform user about verbose options:
- : Silent mode (errors only)
verbose=0 - : Connection and subscription logs
verbose=1 - : All data updates (debug mode)
verbose=2
告知用户日志详细级别选项:
- :静默模式(仅显示错误)
verbose=0 - :连接与订阅日志
verbose=1 - :所有数据更新(调试模式)
verbose=2
Example Usage
使用示例
/live-feed SBIN NSE ltp/live-feed NIFTY NSE_INDEX quote/live-feed SBIN NSE depth/live-feed multi NSE/live-feed SBIN NSE ltp/live-feed NIFTY NSE_INDEX quote/live-feed SBIN NSE depth/live-feed multi NSE