leader-election
Leader Election
Single leader with automatic failover.
When to Use This Skill
- Multiple instances, only one should run cron jobs
- Singleton queue consumers
- Coordinating distributed workers
- Need automatic failover when leader dies
Core Concepts
- Heartbeat - Leader sends periodic heartbeats
- Timeout - If no heartbeat arrives within the timeout, the leader is presumed dead
- Term - Monotonic counter, incremented on each takeover, that prevents split-brain
- Failover - Followers compete to become new leader
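The four concepts above can be sketched as a single pure function. This is an in-memory model for illustration only, not the database-backed implementation; the `Lease` type and `tryAcquire` function are hypothetical names introduced here.

```typescript
// In-memory model of the election rule (illustrative; names are hypothetical).
interface Lease {
  leaderId: string;
  term: number;
  lastHeartbeatMs: number;
}

// Returns the lease after `candidateId` attempts to lead at time `nowMs`.
function tryAcquire(
  lease: Lease | null,
  candidateId: string,
  nowMs: number,
  timeoutMs: number
): Lease {
  // No leader yet: the first candidate starts term 1.
  if (lease === null) {
    return { leaderId: candidateId, term: 1, lastHeartbeatMs: nowMs };
  }
  // Heartbeat: the current leader refreshes its own lease.
  if (lease.leaderId === candidateId) {
    return { ...lease, lastHeartbeatMs: nowMs };
  }
  // Timeout and failover: a follower takes over and bumps the term.
  if (nowMs - lease.lastHeartbeatMs > timeoutMs) {
    return { leaderId: candidateId, term: lease.term + 1, lastHeartbeatMs: nowMs };
  }
  // The leader is still alive: nothing changes.
  return lease;
}

let lease = tryAcquire(null, 'worker-a', 0, 30_000);   // worker-a leads, term 1
lease = tryAcquire(lease, 'worker-b', 10_000, 30_000); // too early, worker-a keeps the lease
lease = tryAcquire(lease, 'worker-b', 45_000, 30_000); // 45 s of silence, worker-b takes over
console.log(lease.leaderId, lease.term); // prints "worker-b 2"
```

In a real deployment the three branches have to run atomically against shared storage, which is why the election logic lives in the database rather than in each worker.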
TypeScript Implementation
Database Schema
```sql
-- migrations/leader_election.sql
CREATE TABLE leader_election (
    service_name   VARCHAR(255) PRIMARY KEY,
    leader_id      VARCHAR(255) NOT NULL,
    term           INTEGER NOT NULL DEFAULT 1,
    last_heartbeat TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    metadata       JSONB DEFAULT '{}'
);

-- Try to become leader (atomic)
CREATE OR REPLACE FUNCTION try_become_leader(
    p_service_name VARCHAR(255),
    p_candidate_id VARCHAR(255),
    p_timeout_seconds INTEGER DEFAULT 30
) RETURNS BOOLEAN AS $$
DECLARE
    v_now TIMESTAMPTZ := NOW();
    -- Heartbeats older than this cutoff count as expired
    v_timeout TIMESTAMPTZ := v_now - (p_timeout_seconds || ' seconds')::INTERVAL;
    v_current RECORD;
BEGIN
    -- Lock the row so concurrent candidates are serialized
    SELECT * INTO v_current
    FROM leader_election
    WHERE service_name = p_service_name
    FOR UPDATE;

    -- No leader yet: claim term 1. ON CONFLICT makes a candidate that loses
    -- the insert race return FALSE instead of raising a unique violation.
    IF NOT FOUND THEN
        INSERT INTO leader_election (service_name, leader_id, term, last_heartbeat)
        VALUES (p_service_name, p_candidate_id, 1, v_now)
        ON CONFLICT (service_name) DO NOTHING;
        RETURN FOUND;
    END IF;

    -- Already the leader: refresh the heartbeat
    IF v_current.leader_id = p_candidate_id THEN
        UPDATE leader_election SET last_heartbeat = v_now
        WHERE service_name = p_service_name;
        RETURN TRUE;
    END IF;

    -- Leader timed out: take over and start a new term
    IF v_current.last_heartbeat < v_timeout THEN
        UPDATE leader_election
        SET leader_id = p_candidate_id, term = term + 1, last_heartbeat = v_now
        WHERE service_name = p_service_name;
        RETURN TRUE;
    END IF;

    RETURN FALSE;
END;
$$ LANGUAGE plpgsql;

-- Send heartbeat (returns FALSE if the caller is no longer the leader)
CREATE OR REPLACE FUNCTION leader_heartbeat(
    p_service_name VARCHAR(255),
    p_leader_id VARCHAR(255)
) RETURNS BOOLEAN AS $$
BEGIN
    UPDATE leader_election SET last_heartbeat = NOW()
    WHERE service_name = p_service_name AND leader_id = p_leader_id;
    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;

-- Step down. Deleting the row means the next election starts again at term 1.
CREATE OR REPLACE FUNCTION step_down(
    p_service_name VARCHAR(255),
    p_leader_id VARCHAR(255)
) RETURNS BOOLEAN AS $$
BEGIN
    DELETE FROM leader_election
    WHERE service_name = p_service_name AND leader_id = p_leader_id;
    RETURN FOUND;
END;
$$ LANGUAGE plpgsql;
```

Leader Election Class
```typescript
// leader-election.ts
import { SupabaseClient } from '@supabase/supabase-js';

interface LeaderElectionConfig {
  serviceName: string;
  candidateId: string;
  heartbeatIntervalMs?: number;
  heartbeatTimeoutSeconds?: number;
  onBecomeLeader?: () => void | Promise<void>;
  onLoseLeadership?: () => void | Promise<void>;
}

export class LeaderElection {
  private config: Required<LeaderElectionConfig>;
  private supabase: SupabaseClient;
  private isLeader = false;
  private heartbeatInterval: NodeJS.Timeout | null = null;
  private running = false;

  constructor(supabase: SupabaseClient, config: LeaderElectionConfig) {
    this.supabase = supabase;
    // Defaults first, so caller-supplied values override them
    this.config = {
      heartbeatIntervalMs: 10000,
      heartbeatTimeoutSeconds: 30,
      onBecomeLeader: () => {},
      onLoseLeadership: () => {},
      ...config,
    };
  }

  async start(): Promise<void> {
    if (this.running) return;
    this.running = true;
    await this.tryBecomeLeader();
    this.heartbeatInterval = setInterval(
      // Log failures instead of leaving a rejected promise unhandled
      () => void this.heartbeatLoop().catch((err) =>
        console.error('[LeaderElection] Heartbeat loop error:', err)
      ),
      this.config.heartbeatIntervalMs
    );
  }

  async stop(): Promise<void> {
    this.running = false;
    if (this.heartbeatInterval) {
      clearInterval(this.heartbeatInterval);
      this.heartbeatInterval = null;
    }
    // Release leadership so a follower can take over without waiting for the timeout
    if (this.isLeader) {
      await this.stepDown();
    }
  }

  isCurrentLeader(): boolean {
    return this.isLeader;
  }

  private async tryBecomeLeader(): Promise<boolean> {
    const { data, error } = await this.supabase.rpc('try_become_leader', {
      p_service_name: this.config.serviceName,
      p_candidate_id: this.config.candidateId,
      p_timeout_seconds: this.config.heartbeatTimeoutSeconds,
    });
    if (error) {
      console.error('[LeaderElection] Error:', error);
      return false;
    }
    const becameLeader = data === true;
    // Fire the callback only on the follower-to-leader transition
    if (becameLeader && !this.isLeader) {
      this.isLeader = true;
      console.log(`[LeaderElection] ${this.config.candidateId} became leader`);
      await this.config.onBecomeLeader();
    }
    return becameLeader;
  }

  private async sendHeartbeat(): Promise<boolean> {
    const { data, error } = await this.supabase.rpc('leader_heartbeat', {
      p_service_name: this.config.serviceName,
      p_leader_id: this.config.candidateId,
    });
    // Any error is treated as a missed heartbeat
    return !error && data === true;
  }

  private async stepDown(): Promise<void> {
    if (!this.isLeader) return;
    await this.supabase.rpc('step_down', {
      p_service_name: this.config.serviceName,
      p_leader_id: this.config.candidateId,
    });
    this.isLeader = false;
    console.log(`[LeaderElection] ${this.config.candidateId} stepped down`);
    await this.config.onLoseLeadership();
  }

  private async heartbeatLoop(): Promise<void> {
    if (!this.running) return;
    if (this.isLeader) {
      // Leader: keep the lease alive, or give up leadership if that fails
      const maintained = await this.sendHeartbeat();
      if (!maintained) {
        this.isLeader = false;
        console.log(`[LeaderElection] Lost leadership`);
        await this.config.onLoseLeadership();
      }
    } else {
      // Follower: try to take over
      await this.tryBecomeLeader();
    }
  }
}
```

Python Implementation
```python
# leader_election.py
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class LeaderElectionConfig:
    service_name: str
    candidate_id: str
    heartbeat_interval: float = 10.0  # seconds
    heartbeat_timeout: int = 30  # seconds
    on_become_leader: Optional[Callable[[], Awaitable[None]]] = None
    on_lose_leadership: Optional[Callable[[], Awaitable[None]]] = None


class LeaderElection:
    def __init__(self, db, config: LeaderElectionConfig):
        # Assumption: `db` is an asyncpg connection or pool. Its `fetchval`
        # returns the function's boolean result, whereas asyncpg's `execute`
        # returns only a status string.
        self.db = db
        self.config = config
        self._is_leader = False
        self._running = False
        self._task: Optional[asyncio.Task] = None

    async def start(self):
        if self._running:
            return
        self._running = True
        await self._try_become_leader()
        self._task = asyncio.create_task(self._heartbeat_loop())

    async def stop(self):
        self._running = False
        if self._task:
            self._task.cancel()
            self._task = None
        # Release leadership so a follower can take over immediately
        if self._is_leader:
            await self._step_down()

    @property
    def is_leader(self) -> bool:
        return self._is_leader

    async def _try_become_leader(self) -> bool:
        became_leader = await self.db.fetchval(
            "SELECT try_become_leader($1, $2, $3)",
            self.config.service_name,
            self.config.candidate_id,
            self.config.heartbeat_timeout,
        )
        # Fire the callback only on the follower-to-leader transition
        if became_leader and not self._is_leader:
            self._is_leader = True
            print(f"[LeaderElection] {self.config.candidate_id} became leader")
            if self.config.on_become_leader:
                await self.config.on_become_leader()
        return bool(became_leader)

    async def _send_heartbeat(self) -> bool:
        maintained = await self.db.fetchval(
            "SELECT leader_heartbeat($1, $2)",
            self.config.service_name,
            self.config.candidate_id,
        )
        return bool(maintained)

    async def _step_down(self):
        await self.db.fetchval(
            "SELECT step_down($1, $2)",
            self.config.service_name,
            self.config.candidate_id,
        )
        self._is_leader = False
        if self.config.on_lose_leadership:
            await self.config.on_lose_leadership()

    async def _heartbeat_loop(self):
        while self._running:
            await asyncio.sleep(self.config.heartbeat_interval)
            if self._is_leader:
                # Leader: keep the lease alive, or give up leadership
                if not await self._send_heartbeat():
                    self._is_leader = False
                    if self.config.on_lose_leadership:
                        await self.config.on_lose_leadership()
            else:
                # Follower: try to take over
                await self._try_become_leader()
```

Usage Examples
Singleton Cron Job
```typescript
// `supabase`, `startCronJobs`, and `stopCronJobs` are defined elsewhere
const election = new LeaderElection(supabase, {
  serviceName: 'daily-report-generator',
  candidateId: `worker-${process.env.HOSTNAME}`,
  onBecomeLeader: async () => {
    console.log('Starting cron jobs');
    startCronJobs();
  },
  onLoseLeadership: async () => {
    console.log('Stopping cron jobs');
    stopCronJobs();
  },
});

await election.start();

// Graceful shutdown
process.on('SIGTERM', async () => {
  await election.stop();
  process.exit(0);
});
```

Queue Consumer
```typescript
const election = new LeaderElection(supabase, {
  serviceName: 'queue-consumer',
  candidateId: `consumer-${process.pid}`,
  onBecomeLeader: () => queueConsumer.start(),
  onLoseLeadership: () => queueConsumer.stop(),
});

await election.start();
```

Conditional Execution
```typescript
// Runs `fn` only on the current leader; followers get null
async function runIfLeader<T>(
  election: LeaderElection,
  fn: () => Promise<T>
): Promise<T | null> {
  if (!election.isCurrentLeader()) {
    return null;
  }
  return fn();
}

// Usage
await runIfLeader(election, async () => {
  await sendDailyEmails();
});
```

Configuration Guide
| Scenario | heartbeatIntervalMs | heartbeatTimeoutSeconds |
|---|---|---|
| Fast failover | 5000 | 15 |
| Normal | 10000 | 30 |
| Unreliable network | 30000 | 90 |
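Every row above keeps the interval at no more than one third of the timeout, so a leader has to miss roughly three heartbeats before a follower replaces it. A startup guard along these lines could catch a misconfigured pair early; `validateTiming` is a hypothetical helper and not part of the `LeaderElection` class.

```typescript
// Hypothetical startup check: the interval (ms) must not exceed a third of the timeout (s).
function validateTiming(
  heartbeatIntervalMs: number,
  heartbeatTimeoutSeconds: number
): void {
  const maxIntervalMs = (heartbeatTimeoutSeconds * 1000) / 3;
  if (heartbeatIntervalMs > maxIntervalMs) {
    throw new Error(
      `heartbeatIntervalMs (${heartbeatIntervalMs}) must be at most ${maxIntervalMs} ` +
        `for a ${heartbeatTimeoutSeconds}s timeout`
    );
  }
}

validateTiming(10_000, 30); // passes: 10 000 ms is exactly a third of 30 s
```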
Rule:
heartbeatIntervalMs <= heartbeatTimeoutSeconds * 1000 / 3
Best Practices
- Unique candidate IDs - Include hostname/PID
- Graceful shutdown - Call stop() on SIGTERM
- Check before work - Use isCurrentLeader()
- Monitor changes - Alert on frequent leader changes
- Tune timeouts - Match your network reliability
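As a minimal sketch of the first practice, a candidate ID can be built from the hostname and the process ID using Node's standard modules. `makeCandidateId` is a hypothetical helper introduced here for illustration.

```typescript
import { hostname } from 'node:os';

// Hypothetical helper: combines hostname and PID so two processes on the
// same host, or the same PID on two hosts, never share an ID.
function makeCandidateId(prefix: string): string {
  return `${prefix}-${hostname()}-${process.pid}`;
}

const candidateId = makeCandidateId('worker');
console.log(candidateId);
```

A process that restarts with the same hostname and PID will reuse its ID and therefore resume its own unexpired lease, which may or may not be what you want.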
Common Mistakes
- Same candidate ID across instances
- Not calling stop() on shutdown
- Starting work without checking leadership
- Heartbeat interval too close to timeout
- Not handling onLoseLeadership
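Checking leadership once at the start of a long job is not enough, because leadership can be lost while the job runs. One way to limit the damage, sketched here with a hypothetical `processWhileLeader` helper, is to re-check before each unit of work.

```typescript
// Hypothetical helper: handles items one at a time and stops as soon as
// `isLeader` reports that leadership is gone. Returns the number handled.
async function processWhileLeader<T>(
  isLeader: () => boolean,
  items: T[],
  handle: (item: T) => Promise<void>
): Promise<number> {
  let processed = 0;
  for (const item of items) {
    if (!isLeader()) break; // leadership can change between items
    await handle(item);
    processed++;
  }
  return processed;
}

// Demo with a fake leadership flag that flips while item 2 is handled.
let leader = true;
const seen: number[] = [];
const done = processWhileLeader(() => leader, [1, 2, 3], async (n) => {
  seen.push(n);
  if (n === 2) leader = false; // simulate losing leadership mid-run
});
done.then((count) => console.log(count, seen)); // item 3 is never handled
```

With the class from this document, `isLeader` would be `() => election.isCurrentLeader()`. The flag can lag the database by up to one heartbeat interval, so handlers should still be safe to run twice.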
Related Skills
- Distributed Lock
- Background Jobs
- Graceful Shutdown