gateway-setup
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseGateway Setup for AI Agents on macOS
macOS 平台 AI Agent 网关搭建指南
This skill builds a persistent gateway for an AI coding agent on a Mac. It bridges background workflows (Inngest, cron, pipelines) into your agent's session and optionally routes responses to external channels (Telegram, WebSocket).
本指南将在Mac上为AI编码Agent构建持久化网关,把后台工作流(Inngest、定时任务、流水线)接入Agent会话,还可选择将响应路由到外部渠道(Telegram、WebSocket)。
Before You Start
前置准备
Required:
- macOS (Apple Silicon preferred)
- pi coding agent installed and working
- Redis running (locally, Docker, or k8s — see skill if you need this)
inngest-local
Optional (unlocks more features):
- Inngest self-hosted (for durable event-driven workflows)
- Tailscale (for secure remote access from phone/laptop)
- Telegram bot token (for mobile notifications/chat)
必需条件:
- macOS系统(推荐Apple Silicon芯片)
- pi coding agent已安装并可正常运行
- Redis正在运行(本地、Docker或k8s环境均可——若需搭建可参考技能)
inngest-local
可选条件(解锁更多功能):
- 自托管Inngest(用于持久化事件驱动工作流)
- Tailscale(实现从手机/笔记本的安全远程访问)
- Telegram机器人令牌(用于移动端通知/聊天)
Critical Setup Notes
关键设置说明
GATEWAY_ROLE=centralbash
GATEWAY_ROLE=central piserveHostlocalhost:3100typescript
inngestServe({
client: inngest,
functions,
serveHost: "http://host.docker.internal:3100",
})Then force re-sync:
curl -X PUT http://localhost:3100/api/inngestioredis resolution in Bun is flaky. If you get , install it explicitly:
Cannot find module '@ioredis/commands'bash
bun add @ioredis/commands必须设置以实现始终在线会话。 若未设置,会话将以卫星模式运行,会错过心跳、系统警报以及所有非针对性事件。启动时需配置:
GATEWAY_ROLE=centralbash
GATEWAY_ROLE=central pi当Inngest在Docker中运行且工作进程在宿主机时,必须设置。 SDK会将作为回调URL,但Docker无法访问宿主机的回环地址。需在Hono服务处理器中配置:
serveHostlocalhost:3100typescript
inngestServe({
client: inngest,
functions,
serveHost: "http://host.docker.internal:3100",
})然后强制重新同步:
curl -X PUT http://localhost:3100/api/inngestBun环境下ioredis的解析存在不稳定问题。 若出现错误,需手动安装:
Cannot find module '@ioredis/commands'bash
bun add @ioredis/commandsor: rm -rf node_modules && bun install
或执行:rm -rf node_modules && bun install
**Two ioredis clients required for pub/sub.** A subscribed client can't run LRANGE, DEL, or other commands. The extension creates separate `sub` and `cmd` clients.
**发布/订阅功能需要两个ioredis客户端。** 已订阅的客户端无法执行LRANGE、DEL等命令,扩展会创建独立的`sub`和`cmd`客户端。Intent Alignment
需求匹配
Before building anything, ask the user these questions to determine scope. Adapt based on their answers.
在开始搭建前,请先询问用户以下问题以确定搭建范围,并根据回答调整方案。
Question 1: What's your goal?
问题1:你的目标是什么?
Present these options:
- Notifications only — background jobs finish, I want to know about it without watching the terminal
- Always-on agent — I want a persistent session that survives terminal closes, handles heartbeats, routes events
- Full gateway — always-on + talk to my agent from Telegram/phone + multi-session routing
Each level builds on the previous. Start with what they need now.
提供以下选项:
- 仅通知 —— 后台任务完成时,无需盯着终端就能收到通知
- 始终在线Agent —— 我需要一个能在终端关闭后仍运行的持久化会话,支持心跳监控和事件路由
- 完整网关 —— 始终在线功能 + 可通过Telegram/手机与Agent交互 + 多会话路由
每个层级都基于前一层级构建,从用户当前需求开始即可。
Question 2: What's your event source?
问题2:你的事件源是什么?
- Just cron/timers — I want a heartbeat that checks system health periodically
- Inngest functions — I have durable workflows that emit completion events
- Mixed — Inngest + cron + maybe webhooks
- 仅定时任务/计时器 —— 我需要定期检查系统健康状态的心跳机制
- Inngest函数 —— 我有会触发完成事件的持久化工作流
- 混合模式 —— Inngest + 定时任务 + 可能包含Webhooks
Question 3: How many concurrent agent sessions?
问题3:你同时运行多少个Agent会话?
- One — I run one pi session at a time
- Multiple — I often have 2-5 sessions in different terminals working on different things
If multiple: enable central/satellite routing. If one: simpler single-session mode.
- 1个 —— 我同一时间只运行一个pi会话
- 多个 —— 我经常同时在不同终端运行2-5个会话处理不同任务
若为多个会话:启用中心/卫星路由模式;若为单个会话:使用更简单的单会话模式。
Architecture Tiers
架构层级
Tier 1: Notification Bridge (simplest)
层级1:仅通知桥接(最简配置)
What you get: Background events show up in your pi session as messages.
Components:
- Redis (already running)
- Gateway pi extension (~100 lines)
- utility function
pushGatewayEvent()
How it works:
Background process → Redis LPUSH → pi extension drains on notify → injected as user messageBuild steps:
- Create the extension directory:
bash
mkdir -p ~/.pi/agent/extensions/gateway- Create :
~/.pi/agent/extensions/gateway/package.json
json
{
"name": "gateway-extension",
"private": true,
"dependencies": {
"ioredis": "^5.4.2"
}
}- Install dependencies:
bash
cd ~/.pi/agent/extensions/gateway && npm install- Create with the minimal bridge:
~/.pi/agent/extensions/gateway/index.ts
typescript
import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent";
const SESSION_ID = "main";
const EVENT_LIST = `agent:events:${SESSION_ID}`;
const NOTIFY_CHANNEL = `agent:notify:${SESSION_ID}`;
type RedisLike = {
on(event: string, listener: (...args: unknown[]) => void): void;
connect(): Promise<void>;
subscribe(channel: string): Promise<unknown>;
lrange(key: string, start: number, stop: number): Promise<string[]>;
del(key: string): Promise<number>;
llen(key: string): Promise<number>;
unsubscribe(): void;
disconnect(): void;
};
type RedisCtor = new (options: { host: string; port: number; lazyConnect: boolean }) => RedisLike;
let Redis: RedisCtor | null = null;
let sub: RedisLike | null = null;
let cmd: RedisLike | null = null;
let ctx: ExtensionContext | null = null;
let piRef: ExtensionAPI | null = null;
interface SystemEvent {
id: string;
type: string;
source: string;
payload: Record<string, unknown>;
ts: number;
}
function formatEvents(events: SystemEvent[]): string {
return events.map((e) => {
const time = new Date(e.ts).toLocaleTimeString("en-US", { hour12: false });
return `- **[${time}] ${e.type}** (${e.source})`;
}).join("\n");
}
async function drain(): Promise<void> {
if (!cmd || !piRef) return;
const raw = await cmd.lrange(EVENT_LIST, 0, -1);
if (raw.length === 0) return;
const events = raw.reverse().map(r => {
try { return JSON.parse(r) as SystemEvent; } catch { return null; }
}).filter(Boolean) as SystemEvent[];
if (events.length === 0) { await cmd.del(EVENT_LIST); return; }
const prompt = [
`## 🔔 ${events.length} event(s) — ${new Date().toISOString()}`,
"", formatEvents(events), "",
"Take action if needed, otherwise acknowledge briefly.",
].join("\n");
if (ctx?.isIdle()) {
piRef.sendUserMessage(prompt);
} else {
piRef.sendUserMessage(prompt, { deliverAs: "followUp" });
}
await cmd.del(EVENT_LIST);
}
export default function (pi: ExtensionAPI) {
piRef = pi;
pi.on("session_start", async (_event, _ctx) => {
ctx = _ctx;
if (!Redis) {
try {
Redis = (await import("ioredis")).default as RedisCtor;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
_ctx.ui.notify(`Gateway extension running without Redis: ${message}`, "warning");
return;
}
}
sub = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
cmd = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
await sub.connect();
await cmd.connect();
await sub.subscribe(NOTIFY_CHANNEL);
sub.on("message", () => { if (ctx?.isIdle()) drain(); });
// Drain anything that accumulated while session was down
const pending = await cmd.llen(EVENT_LIST);
if (pending > 0) await drain();
ctx.ui.setStatus("gateway", "🔗 connected");
});
pi.on("agent_end", async () => { drain(); });
pi.on("session_shutdown", async () => {
if (sub) { sub.unsubscribe(); sub.disconnect(); }
if (cmd) { cmd.disconnect(); }
});
}- Push events from any script:
typescript
import Redis from "ioredis";
const redis = new Redis();
async function pushEvent(type: string, source: string, payload = {}) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
await redis.lpush("agent:events:main", JSON.stringify(event));
await redis.publish("agent:notify:main", JSON.stringify({ type }));
}
// Example: notify when a download finishes
await pushEvent("download.complete", "my-script", { file: "video.mp4" });- Restart pi — the extension loads automatically.
实现效果: 后台事件会以消息形式出现在pi会话中。
组件:
- Redis(已运行)
- Gateway pi扩展(约100行代码)
- 工具函数
pushGatewayEvent()
工作流程:
后台进程 → Redis LPUSH → pi扩展在收到通知时拉取事件 → 作为用户消息注入会话搭建步骤:
- 创建扩展目录:
bash
mkdir -p ~/.pi/agent/extensions/gateway- 创建文件:
~/.pi/agent/extensions/gateway/package.json
json
{
"name": "gateway-extension",
"private": true,
"dependencies": {
"ioredis": "^5.4.2"
}
}- 安装依赖:
bash
cd ~/.pi/agent/extensions/gateway && npm install- 创建文件,写入最简桥接代码:
~/.pi/agent/extensions/gateway/index.ts
typescript
import type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent";
const SESSION_ID = "main";
const EVENT_LIST = `agent:events:${SESSION_ID}`;
const NOTIFY_CHANNEL = `agent:notify:${SESSION_ID}`;
type RedisLike = {
on(event: string, listener: (...args: unknown[]) => void): void;
connect(): Promise<void>;
subscribe(channel: string): Promise<unknown>;
lrange(key: string, start: number, stop: number): Promise<string[]>;
del(key: string): Promise<number>;
llen(key: string): Promise<number>;
unsubscribe(): void;
disconnect(): void;
};
type RedisCtor = new (options: { host: string; port: number; lazyConnect: boolean }) => RedisLike;
let Redis: RedisCtor | null = null;
let sub: RedisLike | null = null;
let cmd: RedisLike | null = null;
let ctx: ExtensionContext | null = null;
let piRef: ExtensionAPI | null = null;
interface SystemEvent {
id: string;
type: string;
source: string;
payload: Record<string, unknown>;
ts: number;
}
function formatEvents(events: SystemEvent[]): string {
return events.map((e) => {
const time = new Date(e.ts).toLocaleTimeString("en-US", { hour12: false });
return `- **[${time}] ${e.type}** (${e.source})`;
}).join("\n");
}
async function drain(): Promise<void> {
if (!cmd || !piRef) return;
const raw = await cmd.lrange(EVENT_LIST, 0, -1);
if (raw.length === 0) return;
const events = raw.reverse().map(r => {
try { return JSON.parse(r) as SystemEvent; } catch { return null; }
}).filter(Boolean) as SystemEvent[];
if (events.length === 0) { await cmd.del(EVENT_LIST); return; }
const prompt = [
`## 🔔 ${events.length} event(s) — ${new Date().toISOString()}`,
"", formatEvents(events), "",
"Take action if needed, otherwise acknowledge briefly.",
].join("\n");
if (ctx?.isIdle()) {
piRef.sendUserMessage(prompt);
} else {
piRef.sendUserMessage(prompt, { deliverAs: "followUp" });
}
await cmd.del(EVENT_LIST);
}
export default function (pi: ExtensionAPI) {
piRef = pi;
pi.on("session_start", async (_event, _ctx) => {
ctx = _ctx;
if (!Redis) {
try {
Redis = (await import("ioredis")).default as RedisCtor;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
_ctx.ui.notify(`Gateway extension running without Redis: ${message}`, "warning");
return;
}
}
sub = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
cmd = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
await sub.connect();
await cmd.connect();
await sub.subscribe(NOTIFY_CHANNEL);
sub.on("message", () => { if (ctx?.isIdle()) drain(); });
// Drain anything that accumulated while session was down
const pending = await cmd.llen(EVENT_LIST);
if (pending > 0) await drain();
ctx.ui.setStatus("gateway", "🔗 connected");
});
pi.on("agent_end", async () => { drain(); });
pi.on("session_shutdown", async () => {
if (sub) { sub.unsubscribe(); sub.disconnect(); }
if (cmd) { cmd.disconnect(); }
});
}- 从任意脚本推送事件:
typescript
import Redis from "ioredis";
const redis = new Redis();
async function pushEvent(type: string, source: string, payload = {}) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
await redis.lpush("agent:events:main", JSON.stringify(event));
await redis.publish("agent:notify:main", JSON.stringify({ type }));
}
// 示例:下载完成时发送通知
await pushEvent("download.complete", "my-script", { file: "video.mp4" });- 重启pi——扩展会自动加载。
Tier 2: Always-On with Heartbeat
层级2:带心跳的始终在线模式
Adds: Cron heartbeat, watchdog failure detection, boot sequence.
Additional components:
- HEARTBEAT.md checklist (read by the agent on each heartbeat)
- Watchdog timer in the extension
- tmux or launchd for persistence
Build steps (on top of Tier 1):
- Create (or wherever your agent's home is):
~/HEARTBEAT.md
markdown
undefined新增功能: 定时心跳、监控程序故障检测、开机自启动序列。
额外组件:
- HEARTBEAT.md检查清单(Agent每次心跳时会读取)
- 扩展中的监控计时器
- tmux或launchd用于实现持久化
搭建步骤(基于层级1):
- 创建文件(或放在Agent的工作目录):
~/HEARTBEAT.md
markdown
undefinedHeartbeat Checklist
心跳检查清单
System Health
系统健康状态
- Redis is reachable
- Background worker is responding
- No stuck jobs
- Redis可正常访问
- 后台工作进程响应正常
- 无停滞任务
Pending Work
待处理工作
- Check inbox for unprocessed items
If nothing needs attention, reply HEARTBEAT_OK.
2. Add heartbeat cron — if using Inngest:
```typescript
export const heartbeatCron = inngest.createFunction(
{ id: "system-heartbeat" },
[{ cron: "*/15 * * * *" }],
async ({ step }) => {
await step.run("push-heartbeat", async () => {
await pushEvent("cron.heartbeat", "inngest", {});
});
}
);Or without Inngest, use a simple cron/setInterval:
bash
undefined- 检查收件箱中的未处理项
若无需要处理的内容,请回复HEARTBEAT_OK。
2. 添加心跳定时任务——若使用Inngest:
```typescript
export const heartbeatCron = inngest.createFunction(
{ id: "system-heartbeat" },
[{ cron: "*/15 * * * *" }],
async ({ step }) => {
await step.run("push-heartbeat", async () => {
await pushEvent("cron.heartbeat", "inngest", {});
});
}
);若不使用Inngest,可使用简单的定时任务/setInterval:
bash
undefinedcrontab -e
执行crontab -e编辑定时任务
*/15 * * * * redis-cli LPUSH agent:events:main '{"id":"'$(uuidgen)'","type":"cron.heartbeat","source":"cron","payload":{},"ts":'$(date +%s000)'}' && redis-cli PUBLISH agent:notify:main '{"type":"cron.heartbeat"}'
3. Add watchdog to the extension (insert after session_start):
```typescript
const WATCHDOG_THRESHOLD_MS = 30 * 60 * 1000; // 2x the 15-min interval
let lastHeartbeatTs = Date.now();
let watchdogAlarmFired = false;
setInterval(() => {
if (!piRef || !ctx) return;
if (watchdogAlarmFired) return;
const elapsed = Date.now() - lastHeartbeatTs;
if (elapsed > WATCHDOG_THRESHOLD_MS) {
watchdogAlarmFired = true;
piRef.sendUserMessage(`## ⚠️ MISSED HEARTBEAT\n\nNo heartbeat in ${Math.round(elapsed / 60000)} minutes. Check your worker/cron.`);
}
}, 5 * 60 * 1000);
// Reset on heartbeat receipt (inside drain function):
// if (events.some(e => e.type === "cron.heartbeat")) {
// lastHeartbeatTs = Date.now();
// watchdogAlarmFired = false;
// }- Make it survive terminal close — tmux:
bash
tmux new-session -d -s agent -x 120 -y 40 "pi"*/15 * * * * redis-cli LPUSH agent:events:main '{"id":"'$(uuidgen)'","type":"cron.heartbeat","source":"cron","payload":{},"ts":'$(date +%s000)'}' && redis-cli PUBLISH agent:notify:main '{"type":"cron.heartbeat"}'
3. 在扩展中添加监控程序(插入到session_start事件处理后):
```typescript
const WATCHDOG_THRESHOLD_MS = 30 * 60 * 1000; // 为15分钟间隔的2倍
let lastHeartbeatTs = Date.now();
let watchdogAlarmFired = false;
setInterval(() => {
if (!piRef || !ctx) return;
if (watchdogAlarmFired) return;
const elapsed = Date.now() - lastHeartbeatTs;
if (elapsed > WATCHDOG_THRESHOLD_MS) {
watchdogAlarmFired = true;
piRef.sendUserMessage(`## ⚠️ MISSED HEARTBEAT\n\nNo heartbeat in ${Math.round(elapsed / 60000)} minutes. Check your worker/cron.`);
}
}, 5 * 60 * 1000);
// 在drain函数中收到心跳时重置:
// if (events.some(e => e.type === "cron.heartbeat")) {
// lastHeartbeatTs = Date.now();
// watchdogAlarmFired = false;
// }- 实现终端关闭后仍运行——使用tmux:
bash
tmux new-session -d -s agent -x 120 -y 40 "pi"Attach: tmux attach -t agent
连接会话:tmux attach -t agent
Detach: Ctrl-B, D
断开会话:Ctrl-B, D
Or launchd for full always-on:
```xml
<!-- ~/Library/LaunchAgents/com.you.agent-gateway.plist -->
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key><string>com.you.agent-gateway</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>-c</string>
<string>tmux new-session -d -s agent -x 120 -y 40 "GATEWAY_ROLE=central pi" && while tmux has-session -t agent 2>/dev/null; do sleep 5; done</string>
</array>
<key>RunAtLoad</key><true/>
<key>KeepAlive</key><true/>
<key>StandardOutPath</key><string>/tmp/agent-gateway.log</string>
<key>StandardErrorPath</key><string>/tmp/agent-gateway.log</string>
</dict>
</plist>Load it:
bash
launchctl load ~/Library/LaunchAgents/com.you.agent-gateway.plist
或使用launchd实现完全的始终在线:
```xml
<!-- ~/Library/LaunchAgents/com.you.agent-gateway.plist -->
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key><string>com.you.agent-gateway</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>-c</string>
<string>tmux new-session -d -s agent -x 120 -y 40 "GATEWAY_ROLE=central pi" && while tmux has-session -t agent 2>/dev/null; do sleep 5; done</string>
</array>
<key>RunAtLoad</key><true/>
<key>KeepAlive</key><true/>
<key>StandardOutPath</key><string>/tmp/agent-gateway.log</string>
<key>StandardErrorPath</key><string>/tmp/agent-gateway.log</string>
</dict>
</plist>加载配置:
bash
launchctl load ~/Library/LaunchAgents/com.you.agent-gateway.plistTier 3: Multi-Session + Central/Satellite
层级3:多会话 + 中心/卫星模式
Adds: Multiple pi sessions, smart event routing.
When you need this: You run 2+ pi sessions simultaneously — one for oversight, others for coding tasks. Heartbeats should only go to the oversight session.
Key changes from Tier 1:
- Session ID becomes role-based:
typescript
const ROLE = process.env.GATEWAY_ROLE ?? "satellite";
const SESSION_ID = ROLE === "central" ? "gateway" : `pid-${process.pid}`;- Sessions register in a Redis set:
typescript
await cmd.sadd("agent:gateway:sessions", SESSION_ID);
// On shutdown:
await cmd.srem("agent:gateway:sessions", SESSION_ID);- fans out to targets:
pushEvent
typescript
async function pushEvent(type, source, payload, originSession?) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
const json = JSON.stringify(event);
const sessions = await redis.smembers("agent:gateway:sessions");
const targets = new Set<string>();
if (sessions.includes("gateway")) targets.add("gateway"); // central always
if (originSession && sessions.includes(originSession)) targets.add(originSession);
for (const sid of targets) {
await redis.lpush(`agent:events:${sid}`, json);
await redis.publish(`agent:notify:${sid}`, JSON.stringify({ type }));
}
}- Start your central session:
bash
GATEWAY_ROLE=central pi # This one gets ALL events- Other sessions start normally (satellites):
bash
pi # Gets only events it initiated新增功能: 支持多个pi会话,智能事件路由。
适用场景: 你同时运行2个以上pi会话——一个用于监控,其他用于编码任务。心跳仅发送到监控会话。
与层级1的主要区别:
- 会话ID基于角色设置:
typescript
const ROLE = process.env.GATEWAY_ROLE ?? "satellite";
const SESSION_ID = ROLE === "central" ? "gateway" : `pid-${process.pid}`;- 会话会在Redis集合中注册:
typescript
await cmd.sadd("agent:gateway:sessions", SESSION_ID);
// 会话关闭时:
await cmd.srem("agent:gateway:sessions", SESSION_ID);- 函数会将事件推送到目标会话:
pushEvent
typescript
async function pushEvent(type, source, payload, originSession?) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
const json = JSON.stringify(event);
const sessions = await redis.smembers("agent:gateway:sessions");
const targets = new Set<string>();
if (sessions.includes("gateway")) targets.add("gateway"); // 中心会话始终接收
if (originSession && sessions.includes(originSession)) targets.add(originSession);
for (const sid of targets) {
await redis.lpush(`agent:events:${sid}`, json);
await redis.publish(`agent:notify:${sid}`, JSON.stringify({ type }));
}
}- 启动中心会话:
bash
GATEWAY_ROLE=central pi # 该会话会接收所有事件- 其他会话以卫星模式启动:
bash
pi # 仅接收由它发起的事件Tier 4: External Channels (Telegram, WebSocket)
层级4:外部渠道(Telegram、WebSocket)
Adds: Talk to your agent from your phone.
This tier requires the embedded daemon approach — pi runs as a library inside a Node.js process, not as a TUI. See the joelclaw.com article "Building a Gateway for Your AI Agent" for the full architecture.
Key components:
- from pi SDK — headless agent session
createAgentSession() - grammY for Telegram bot
- Command queue that serializes all inputs (TUI, heartbeat, Telegram)
- Outbound router that sends responses back to the asking channel
This is the most complex tier. Only build it if you actually need mobile access.
新增功能: 可通过手机与Agent交互。
该层级需要使用嵌入式守护进程方案——pi作为库运行在Node.js进程中,而非TUI界面。完整架构可参考joelclaw.com上的文章《Building a Gateway for Your AI Agent》。
核心组件:
- pi SDK中的——无头Agent会话
createAgentSession() - grammY用于Telegram机器人
- 命令队列序列化所有输入(TUI、心跳、Telegram)
- 输出路由器将响应发送回请求渠道
这是最复杂的层级,仅在确实需要移动端访问时再搭建。
Verification Checklist
验证清单
After setup, verify:
- Pi session starts and extension loads (check status bar for 🔗)
- Push a test event: +
redis-cli LPUSH agent:events:main '{"id":"test","type":"test","source":"manual","payload":{},"ts":0}'redis-cli PUBLISH agent:notify:main test - Event appears in pi session within seconds
- (Tier 2+) Heartbeat fires on schedule
- (Tier 2+) Kill the heartbeat source — watchdog alarm fires after 30 min
- (Tier 3+) Central session receives heartbeats, satellite sessions don't
搭建完成后,请验证以下内容:
- Pi会话启动且扩展加载成功(检查状态栏是否显示🔗)
- 推送测试事件:+
redis-cli LPUSH agent:events:main '{"id":"test","type":"test","source":"manual","payload":{},"ts":0}'redis-cli PUBLISH agent:notify:main test - 事件在几秒内出现在pi会话中
- (层级2及以上)心跳按计划触发
- (层级2及以上)停止心跳源——30分钟后监控程序会触发警报
- (层级3及以上)中心会话接收心跳,卫星会话不接收
Setup Script (curl-first)
自动化搭建脚本(一键式)
For automated setup, the user can run:
bash
curl -sL joelclaw.com/scripts/gateway-setup.sh | bashOr with a specific tier:
bash
curl -sL joelclaw.com/scripts/gateway-setup.sh | bash -s -- 2The script is idempotent, detects Redis, installs the extension, and configures persistence for Tier 2+.
用户可通过以下命令实现自动化搭建:
bash
curl -sL joelclaw.com/scripts/gateway-setup.sh | bash或指定搭建层级:
bash
curl -sL joelclaw.com/scripts/gateway-setup.sh | bash -s -- 2该脚本支持幂等执行,会检测Redis、安装扩展,并为层级2及以上配置持久化。
Decision Chain (compressed ADRs)
决策链(精简架构决策记录)
Sequential architecture decisions that led to the current gateway design. Each solved a real problem discovered in the previous iteration.
| # | ADR | Decision | Problem Solved | Key Tradeoff |
|---|---|---|---|---|
| 1 | 0010 | Hybrid cron + event gateway | Manual triage bottleneck | Always-on LLM session = expensive. Cron = latency. Hybrid balances both. |
| 2 | 0018 | Redis event bridge (pi extension) | No Inngest→pi bridge existed | Extension-only, no separate process. Redis as the clean interface boundary. |
| 3 | 0035 | Central + satellite routing | Heartbeats interrupting coding sessions | Fan-out by role. Central gets all, satellites get only origin-targeted. |
| 4 | 0036 | launchd + tmux (superseded) | Gateway session dies on terminal close | Pi needs PTY. tmux provides it. launchd restarts on crash. |
| 5 | 0037 | 3-layer watchdog | "Who watches the watchmen" | Extension watchdog (Inngest down), launchd tripwire (pi down), heartbeat (everything healthy). |
| 6 | 0038 | Embedded pi daemon (supersedes 0036) | No mobile access, no multi-channel | Embeds pi as library. grammY for Telegram. Command queue serializes all inputs. Most complex tier. |
Read order for full context: 0010 → 0018 → 0035 → 0037 → 0038 (skip 0036, superseded)
以下是逐步形成当前网关设计的架构决策,每个决策都解决了前一版本中发现的实际问题。
| 序号 | 架构决策记录 | 决策内容 | 解决的问题 | 核心权衡 |
|---|---|---|---|---|
| 1 | 0010 | 混合定时任务+事件网关 | 手动分类瓶颈 | 始终在线LLM会话成本高;定时任务存在延迟;混合模式平衡了两者 |
| 2 | 0018 | Redis事件桥(pi扩展) | 不存在Inngest→pi的桥接方案 | 仅通过扩展实现,无需独立进程;将Redis作为清晰的接口边界 |
| 3 | 0035 | 中心+卫星路由模式 | 心跳干扰编码会话 | 按角色分发事件;中心会话接收所有事件,卫星会话仅接收针对性事件 |
| 4 | 0036 | launchd + tmux(已被替代) | 网关会话在终端关闭后终止 | Pi需要伪终端;tmux提供伪终端;launchd在崩溃后自动重启 |
| 5 | 0037 | 三层监控程序 | "谁来监控监控者"的问题 | 扩展层监控(Inngest故障)、launchd触发(pi故障)、心跳(一切正常) |
| 6 | 0038 | 嵌入式pi守护进程(替代0036) | 无移动端访问、无多渠道支持 | 将pi作为库嵌入;使用grammY实现Telegram功能;命令队列序列化所有输入;最复杂的层级 |
完整上下文阅读顺序: 0010 → 0018 → 0035 → 0037 → 0038(跳过0036,已被替代)
Known Limitations
已知限制
-
Drain race condition. The extension doesthen
LRANGE— not atomic. Events pushed between those calls are deleted without processing. The in-memoryDELdedup prevents double-delivery but doesn't prevent lost events. Fix: useseenIds+LRANGEor a Redis transaction. Low-impact on single-user systems but real.LTRIM -
Redis connection recovery is notify-only. If Redis goes down, the extension catches the error and logs it, but doesn't retry or reconnect automatically. ioredishandles reconnection at the client level, but accumulated events during the outage may be lost.
retryStrategy -
Watchdog intervals are hardcoded. Check interval (5 min) and threshold (30 min) are constants in the extension. Should be configurable via env vars or Redis config.
-
No persistent dedup across restarts. TheSet lives in memory and caps at 500. Process restart = dedup resets. For the heartbeat-every-15-min use case this is fine. For high-frequency events it could cause duplicates.
seenIds
-
拉取事件的竞态条件。扩展先执行再执行
LRANGE——这不是原子操作。在这两个操作之间推送的事件会被删除而不被处理。内存中的DEL集合可防止重复投递,但无法避免事件丢失。修复方案:使用seenIds+LRANGE或Redis事务。在单用户系统中影响较小,但确实存在问题。LTRIM -
Redis连接恢复仅通知。若Redis宕机,扩展会捕获错误并记录,但不会自动重试或重新连接。ioredis的会在客户端层面处理重连,但宕机期间积累的事件可能会丢失。
retryStrategy -
监控程序间隔为硬编码。检查间隔(5分钟)和阈值(30分钟)是扩展中的常量,应支持通过环境变量或Redis配置进行调整。
-
重启后去重不持久。集合存储在内存中,上限为500条。进程重启后去重会重置。对于每15分钟一次的心跳场景,这没问题;但对于高频事件,可能会导致重复投递。
seenIds
Credits
致谢
- OpenClaw — gateway-daemon pattern, command queue serialization, channel plugin architecture
- pi coding agent — extension API, sendUserMessage(), session lifecycle
- Inngest — durable event-driven workflows
- joelclaw.com/building-a-gateway-for-your-ai-agent — human summary
- OpenClaw —— 网关守护进程模式、命令队列序列化、渠道插件架构
- pi coding agent —— 扩展API、sendUserMessage()、会话生命周期管理
- Inngest —— 持久化事件驱动工作流
- joelclaw.com/building-a-gateway-for-your-ai-agent —— 人工总结文档