live-feed

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese
Create a real-time indicator feed using OpenAlgo WebSocket streaming.
使用OpenAlgo WebSocket流创建实时指标数据源。

Arguments

参数说明

Parse
$ARGUMENTS
as: symbol exchange mode
  • $0
    = symbol (e.g., SBIN, RELIANCE, NIFTY). Default: SBIN
  • $1
    = exchange (e.g., NSE, NSE_INDEX). Default: NSE
  • $2
    = mode (e.g., ltp, quote, depth, multi). Default: quote
If no arguments, ask user for symbol and what data they want.
$ARGUMENTS
解析为:标的代码 交易所 模式
  • $0
    = 标的代码(例如:SBIN、RELIANCE、NIFTY)。默认值:SBIN
  • $1
    = 交易所(例如:NSE、NSE_INDEX)。默认值:NSE
  • $2
    = 模式(例如:ltp、quote、depth、multi)。默认值:quote
如果未提供参数,请询问用户标的代码以及所需的数据类型。

Instructions

操作步骤

  1. Read the indicator-expert rules, especially:
    • rules/websocket-feeds.md
      — WebSocket connection and subscription
    • rules/data-fetching.md
      — Historical data for buffer initialization
  2. Create
    charts/live/
    directory (on-demand)
  3. Create
    {symbol}_live_feed.py
  4. Use the template from
    rules/assets/live_feed/template.py
  1. 阅读指标专家规则,尤其是:
    • rules/websocket-feeds.md
      — WebSocket连接与订阅规则
    • rules/data-fetching.md
      — 用于缓冲区初始化的历史数据获取规则
  2. 按需创建
    charts/live/
    目录
  3. 创建
    {symbol}_live_feed.py
    文件
  4. 使用
    rules/assets/live_feed/template.py
    中的模板

Feed Types

数据源类型

ltp
— Last Traded Price + Indicators

ltp
— 最新成交价(LTP)+ 指标

  • Subscribe to LTP feed
  • Maintain rolling buffer (last 200 ticks)
  • Compute EMA, RSI on buffer
  • Print real-time indicator values
  • 订阅LTP数据流
  • 维护滚动缓冲区(最近200条交易数据)
  • 在缓冲区上计算EMA、RSI指标
  • 打印实时指标值

quote
— Full Quote + Indicators

quote
— 完整报价 + 指标

  • Subscribe to Quote feed
  • Display OHLC + LTP + Volume
  • Compute indicators on close buffer
  • Color-coded output (bullish/bearish)
  • 订阅报价数据流
  • 展示OHLC(开盘价/最高价/最低价/收盘价)+ LTP + 成交量
  • 基于收盘价缓冲区计算指标
  • 输出带颜色区分的结果(看涨/看跌)

depth
— Market Depth Analysis

depth
— 市场深度分析

  • Subscribe to Depth feed
  • Display L5 bid/ask book
  • Compute bid-ask spread, order imbalance
  • Show total buy vs sell quantity
  • 订阅深度数据流
  • 展示L5买卖盘口
  • 计算买卖价差、委托单不平衡度
  • 显示总买量与总卖量对比

multi
— Multi-Symbol Feed

multi
— 多标的数据源

  • Subscribe to multiple symbols
  • Display watchlist table with LTP and key indicator
  • Auto-refresh display
  • 订阅多个标的的数据
  • 展示包含LTP及关键指标的观察列表表格
  • 自动刷新显示

Script Structure

脚本结构

python
"""
Real-Time Indicator Feed for {SYMBOL}
Mode: {mode}
"""
import os
import time
import numpy as np
from datetime import datetime, timedelta
from dotenv import find_dotenv, load_dotenv
from openalgo import api, ta

load_dotenv(find_dotenv(), override=False)

SYMBOL = "{symbol}"
EXCHANGE = "{exchange}"

client = api(
    api_key=os.getenv("OPENALGO_API_KEY"),
    host=os.getenv("OPENALGO_HOST", "http://127.0.0.1:5000"),
    verbose=1,
)
python
"""
Real-Time Indicator Feed for {SYMBOL}
Mode: {mode}
"""
import os
import time
import numpy as np
from datetime import datetime, timedelta
from dotenv import find_dotenv, load_dotenv
from openalgo import api, ta

load_dotenv(find_dotenv(), override=False)

SYMBOL = "{symbol}"
EXCHANGE = "{exchange}"

client = api(
    api_key=os.getenv("OPENALGO_API_KEY"),
    host=os.getenv("OPENALGO_HOST", "http://127.0.0.1:5000"),
    verbose=1,
)

Pre-fetch historical data for buffer initialization

Pre-fetch historical data for buffer initialization

df = client.history( symbol=SYMBOL, exchange=EXCHANGE, interval="1m", start_date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"), end_date=datetime.now().strftime("%Y-%m-%d"), ) close_buffer = list(df["close"].values[-200:])
instruments = [{"exchange": EXCHANGE, "symbol": SYMBOL}]
def on_data(data): ltp = data["data"].get("ltp") if ltp is None: return
close_buffer.append(float(ltp))
if len(close_buffer) > 200:
    close_buffer.pop(0)

if len(close_buffer) >= 20:
    arr = np.array(close_buffer, dtype=np.float64)
    ema_val = ta.ema(arr, 20)[-1]
    rsi_val = ta.rsi(arr, 14)[-1] if len(arr) >= 15 else float("nan")

    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {SYMBOL} LTP:{ltp:>10.2f} | "
          f"EMA(20):{ema_val:>10.2f} | RSI(14):{rsi_val:>6.2f}")
df = client.history( symbol=SYMBOL, exchange=EXCHANGE, interval="1m", start_date=(datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d"), end_date=datetime.now().strftime("%Y-%m-%d"), ) close_buffer = list(df["close"].values[-200:])
instruments = [{"exchange": EXCHANGE, "symbol": SYMBOL}]
def on_data(data): ltp = data["data"].get("ltp") if ltp is None: return
close_buffer.append(float(ltp))
if len(close_buffer) > 200:
    close_buffer.pop(0)

if len(close_buffer) >= 20:
    arr = np.array(close_buffer, dtype=np.float64)
    ema_val = ta.ema(arr, 20)[-1]
    rsi_val = ta.rsi(arr, 14)[-1] if len(arr) >= 15 else float("nan")

    timestamp = datetime.now().strftime("%H:%M:%S")
    print(f"[{timestamp}] {SYMBOL} LTP:{ltp:>10.2f} | "
          f"EMA(20):{ema_val:>10.2f} | RSI(14):{rsi_val:>6.2f}")

Connect and subscribe

Connect and subscribe

client.connect() client.subscribe_ltp(instruments, on_data_received=on_data)
print(f"Streaming {SYMBOL} on {EXCHANGE} — Press Ctrl+C to stop") try: while True: time.sleep(1) except KeyboardInterrupt: print("Stopping feed...")
client.unsubscribe_ltp(instruments) client.disconnect()
undefined
client.connect() client.subscribe_ltp(instruments, on_data_received=on_data)
print(f"Streaming {SYMBOL} on {EXCHANGE} — Press Ctrl+C to stop") try: while True: time.sleep(1) except KeyboardInterrupt: print("Stopping feed...")
client.unsubscribe_ltp(instruments) client.disconnect()
undefined

Cleanup

清理操作

The script must:
  • Handle Ctrl+C gracefully
  • Unsubscribe from all feeds
  • Disconnect WebSocket
  • Print summary of session duration and bars processed
脚本必须:
  • 优雅处理Ctrl+C中断
  • 取消所有数据源订阅
  • 断开WebSocket连接
  • 打印会话时长及处理数据条数的汇总信息

Verbose Levels

日志详细级别

Inform user about verbose options:
  • verbose=0
    : Silent mode (errors only)
  • verbose=1
    : Connection and subscription logs
  • verbose=2
    : All data updates (debug mode)
告知用户日志详细级别选项:
  • verbose=0
    :静默模式(仅显示错误)
  • verbose=1
    :连接与订阅日志
  • verbose=2
    :所有数据更新(调试模式)

Example Usage

使用示例

/live-feed SBIN NSE ltp
/live-feed NIFTY NSE_INDEX quote
/live-feed SBIN NSE depth
/live-feed multi NSE
/live-feed SBIN NSE ltp
/live-feed NIFTY NSE_INDEX quote
/live-feed SBIN NSE depth
/live-feed multi NSE