Loading...
Loading...
Set up a persistent AI agent gateway on macOS with Redis event bridge, heartbeat monitoring, and multi-session routing. Interactive Q&A to match your intent — from minimal (Redis + extension) to full (embedded daemon + Telegram + watchdog). Use when: 'set up a gateway', 'I want my agent always on', 'event bridge', 'heartbeat monitoring', 'agent notifications', or any request to make an AI agent persistent and reachable.
npx skill4agent add joelhooks/joelclaw gateway-setupinngest-localGATEWAY_ROLE=centralGATEWAY_ROLE=central piserveHostlocalhost:3100inngestServe({
client: inngest,
functions,
serveHost: "http://host.docker.internal:3100",
})curl -X PUT http://localhost:3100/api/inngestCannot find module '@ioredis/commands'bun add @ioredis/commands
# or: rm -rf node_modules && bun installsubcmdpushGatewayEvent()Background process → Redis LPUSH → pi extension drains on notify → injected as user messagemkdir -p ~/.pi/agent/extensions/gateway~/.pi/agent/extensions/gateway/package.json{
"name": "gateway-extension",
"private": true,
"dependencies": {
"ioredis": "^5.4.2"
}
}cd ~/.pi/agent/extensions/gateway && npm install~/.pi/agent/extensions/gateway/index.tsimport type { ExtensionAPI, ExtensionContext } from "@mariozechner/pi-coding-agent";
const SESSION_ID = "main";
const EVENT_LIST = `agent:events:${SESSION_ID}`;
const NOTIFY_CHANNEL = `agent:notify:${SESSION_ID}`;
type RedisLike = {
on(event: string, listener: (...args: unknown[]) => void): void;
connect(): Promise<void>;
subscribe(channel: string): Promise<unknown>;
lrange(key: string, start: number, stop: number): Promise<string[]>;
del(key: string): Promise<number>;
llen(key: string): Promise<number>;
unsubscribe(): void;
disconnect(): void;
};
type RedisCtor = new (options: { host: string; port: number; lazyConnect: boolean }) => RedisLike;
let Redis: RedisCtor | null = null;
let sub: RedisLike | null = null;
let cmd: RedisLike | null = null;
let ctx: ExtensionContext | null = null;
let piRef: ExtensionAPI | null = null;
interface SystemEvent {
id: string;
type: string;
source: string;
payload: Record<string, unknown>;
ts: number;
}
function formatEvents(events: SystemEvent[]): string {
return events.map((e) => {
const time = new Date(e.ts).toLocaleTimeString("en-US", { hour12: false });
return `- **[${time}] ${e.type}** (${e.source})`;
}).join("\n");
}
async function drain(): Promise<void> {
if (!cmd || !piRef) return;
const raw = await cmd.lrange(EVENT_LIST, 0, -1);
if (raw.length === 0) return;
const events = raw.reverse().map(r => {
try { return JSON.parse(r) as SystemEvent; } catch { return null; }
}).filter(Boolean) as SystemEvent[];
if (events.length === 0) { await cmd.del(EVENT_LIST); return; }
const prompt = [
`## 🔔 ${events.length} event(s) — ${new Date().toISOString()}`,
"", formatEvents(events), "",
"Take action if needed, otherwise acknowledge briefly.",
].join("\n");
if (ctx?.isIdle()) {
piRef.sendUserMessage(prompt);
} else {
piRef.sendUserMessage(prompt, { deliverAs: "followUp" });
}
await cmd.del(EVENT_LIST);
}
export default function (pi: ExtensionAPI) {
piRef = pi;
pi.on("session_start", async (_event, _ctx) => {
ctx = _ctx;
if (!Redis) {
try {
Redis = (await import("ioredis")).default as RedisCtor;
} catch (error) {
const message = error instanceof Error ? error.message : String(error);
_ctx.ui.notify(`Gateway extension running without Redis: ${message}`, "warning");
return;
}
}
sub = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
cmd = new Redis({ host: "localhost", port: 6379, lazyConnect: true });
await sub.connect();
await cmd.connect();
await sub.subscribe(NOTIFY_CHANNEL);
sub.on("message", () => { if (ctx?.isIdle()) drain(); });
// Drain anything that accumulated while session was down
const pending = await cmd.llen(EVENT_LIST);
if (pending > 0) await drain();
ctx.ui.setStatus("gateway", "🔗 connected");
});
pi.on("agent_end", async () => { drain(); });
pi.on("session_shutdown", async () => {
if (sub) { sub.unsubscribe(); sub.disconnect(); }
if (cmd) { cmd.disconnect(); }
});
}import Redis from "ioredis";
const redis = new Redis();
async function pushEvent(type: string, source: string, payload = {}) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
await redis.lpush("agent:events:main", JSON.stringify(event));
await redis.publish("agent:notify:main", JSON.stringify({ type }));
}
// Example: notify when a download finishes
await pushEvent("download.complete", "my-script", { file: "video.mp4" });~/HEARTBEAT.md# Heartbeat Checklist
## System Health
- [ ] Redis is reachable
- [ ] Background worker is responding
- [ ] No stuck jobs
## Pending Work
- [ ] Check inbox for unprocessed items
If nothing needs attention, reply HEARTBEAT_OK.export const heartbeatCron = inngest.createFunction(
{ id: "system-heartbeat" },
[{ cron: "*/15 * * * *" }],
async ({ step }) => {
await step.run("push-heartbeat", async () => {
await pushEvent("cron.heartbeat", "inngest", {});
});
}
);# crontab -e
*/15 * * * * redis-cli LPUSH agent:events:main '{"id":"'$(uuidgen)'","type":"cron.heartbeat","source":"cron","payload":{},"ts":'$(date +%s000)'}' && redis-cli PUBLISH agent:notify:main '{"type":"cron.heartbeat"}'const WATCHDOG_THRESHOLD_MS = 30 * 60 * 1000; // 2x the 15-min interval
let lastHeartbeatTs = Date.now();
let watchdogAlarmFired = false;
setInterval(() => {
if (!piRef || !ctx) return;
if (watchdogAlarmFired) return;
const elapsed = Date.now() - lastHeartbeatTs;
if (elapsed > WATCHDOG_THRESHOLD_MS) {
watchdogAlarmFired = true;
piRef.sendUserMessage(`## ⚠️ MISSED HEARTBEAT\n\nNo heartbeat in ${Math.round(elapsed / 60000)} minutes. Check your worker/cron.`);
}
}, 5 * 60 * 1000);
// Reset on heartbeat receipt (inside drain function):
// if (events.some(e => e.type === "cron.heartbeat")) {
// lastHeartbeatTs = Date.now();
// watchdogAlarmFired = false;
// }tmux new-session -d -s agent -x 120 -y 40 "pi"
# Attach: tmux attach -t agent
# Detach: Ctrl-B, D<!-- ~/Library/LaunchAgents/com.you.agent-gateway.plist -->
<?xml version="1.0" encoding="UTF-8"?>
<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
<plist version="1.0">
<dict>
<key>Label</key><string>com.you.agent-gateway</string>
<key>ProgramArguments</key>
<array>
<string>/bin/bash</string>
<string>-c</string>
<string>tmux new-session -d -s agent -x 120 -y 40 "GATEWAY_ROLE=central pi" && while tmux has-session -t agent 2>/dev/null; do sleep 5; done</string>
</array>
<key>RunAtLoad</key><true/>
<key>KeepAlive</key><true/>
<key>StandardOutPath</key><string>/tmp/agent-gateway.log</string>
<key>StandardErrorPath</key><string>/tmp/agent-gateway.log</string>
</dict>
</plist>launchctl load ~/Library/LaunchAgents/com.you.agent-gateway.plistconst ROLE = process.env.GATEWAY_ROLE ?? "satellite";
const SESSION_ID = ROLE === "central" ? "gateway" : `pid-${process.pid}`;await cmd.sadd("agent:gateway:sessions", SESSION_ID);
// On shutdown:
await cmd.srem("agent:gateway:sessions", SESSION_ID);pushEventasync function pushEvent(type, source, payload, originSession?) {
const event = { id: crypto.randomUUID(), type, source, payload, ts: Date.now() };
const json = JSON.stringify(event);
const sessions = await redis.smembers("agent:gateway:sessions");
const targets = new Set<string>();
if (sessions.includes("gateway")) targets.add("gateway"); // central always
if (originSession && sessions.includes(originSession)) targets.add(originSession);
for (const sid of targets) {
await redis.lpush(`agent:events:${sid}`, json);
await redis.publish(`agent:notify:${sid}`, JSON.stringify({ type }));
}
}GATEWAY_ROLE=central pi # This one gets ALL eventspi # Gets only events it initiatedcreateAgentSession()redis-cli LPUSH agent:events:main '{"id":"test","type":"test","source":"manual","payload":{},"ts":0}'redis-cli PUBLISH agent:notify:main testcurl -sL joelclaw.com/scripts/gateway-setup.sh | bashcurl -sL joelclaw.com/scripts/gateway-setup.sh | bash -s -- 2| # | ADR | Decision | Problem Solved | Key Tradeoff |
|---|---|---|---|---|
| 1 | 0010 | Hybrid cron + event gateway | Manual triage bottleneck | Always-on LLM session = expensive. Cron = latency. Hybrid balances both. |
| 2 | 0018 | Redis event bridge (pi extension) | No Inngest→pi bridge existed | Extension-only, no separate process. Redis as the clean interface boundary. |
| 3 | 0035 | Central + satellite routing | Heartbeats interrupting coding sessions | Fan-out by role. Central gets all, satellites get only origin-targeted. |
| 4 | 0036 | launchd + tmux (superseded) | Gateway session dies on terminal close | Pi needs PTY. tmux provides it. launchd restarts on crash. |
| 5 | 0037 | 3-layer watchdog | "Who watches the watchmen" | Extension watchdog (Inngest down), launchd tripwire (pi down), heartbeat (everything healthy). |
| 6 | 0038 | Embedded pi daemon (supersedes 0036) | No mobile access, no multi-channel | Embeds pi as library. grammY for Telegram. Command queue serializes all inputs. Most complex tier. |
LRANGEDELseenIdsLRANGELTRIMretryStrategyseenIds