distributed-lock
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseDistributed Locking
分布式锁
Prevent race conditions across multiple instances.
防止多实例间的竞争条件。
When to Use This Skill
适用场景
- Multiple instances processing the same job
- Need to prevent duplicate operations
- Singleton cron jobs across instances
- Critical sections in distributed systems
- 多实例处理同一任务
- 需要防止重复操作
- 跨实例的单例定时任务
- 分布式系统中的临界区
Core Concepts
核心概念
- Atomic acquisition - Only one holder at a time
- TTL expiration - Prevents deadlocks if holder crashes
- Lock extension - Renew for long-running operations
- Holder ID - Track who owns the lock
- 原子性获取 - 同一时间仅一个持有者
- TTL过期 - 持有者崩溃时可避免死锁
- 锁续期 - 为长时间运行的操作续期
- 持有者ID - 跟踪锁的归属
TypeScript Implementation
TypeScript实现
Types
类型定义
typescript
// types.ts
export interface LockInfo {
lockName: string;
holderId: string;
acquiredAt: Date;
expiresAt: Date;
metadata?: Record<string, unknown>;
}
export interface LockOptions {
timeoutSeconds?: number;
blocking?: boolean;
blockingTimeoutSeconds?: number;
metadata?: Record<string, unknown>;
}
export interface LockResult {
acquired: boolean;
lock?: LockInfo;
error?: string;
}typescript
// types.ts
export interface LockInfo {
lockName: string;
holderId: string;
acquiredAt: Date;
expiresAt: Date;
metadata?: Record<string, unknown>;
}
export interface LockOptions {
timeoutSeconds?: number;
blocking?: boolean;
blockingTimeoutSeconds?: number;
metadata?: Record<string, unknown>;
}
export interface LockResult {
acquired: boolean;
lock?: LockInfo;
error?: string;
}In-Memory Lock (Single Instance)
内存锁(单实例)
typescript
// distributed-lock.ts
import { LockInfo, LockOptions, LockResult } from './types';
const lockStore = new Map<string, LockInfo>();
export class DistributedLock {
private holderId: string;
private heldLocks = new Map<string, LockInfo>();
constructor() {
this.holderId = `worker_${Date.now()}_${Math.random().toString(36).slice(2)}`;
}
async acquire(lockName: string, options: LockOptions = {}): Promise<LockResult> {
const { timeoutSeconds = 30, blocking = false, blockingTimeoutSeconds = 10 } = options;
const now = new Date();
const expiresAt = new Date(now.getTime() + timeoutSeconds * 1000);
// Clean expired locks
for (const [name, lock] of lockStore) {
if (lock.expiresAt <= now) lockStore.delete(name);
}
const existing = lockStore.get(lockName);
if (existing && existing.expiresAt > now) {
if (existing.holderId === this.holderId) {
// Extend our own lock
existing.expiresAt = expiresAt;
return { acquired: true, lock: existing };
}
if (blocking) {
return this.waitForLock(lockName, options);
}
return { acquired: false, error: `Lock held by ${existing.holderId}` };
}
const lock: LockInfo = {
lockName,
holderId: this.holderId,
acquiredAt: now,
expiresAt,
metadata: options.metadata,
};
lockStore.set(lockName, lock);
this.heldLocks.set(lockName, lock);
return { acquired: true, lock };
}
async release(lockName: string): Promise<boolean> {
const lock = lockStore.get(lockName);
if (!lock || lock.holderId !== this.holderId) return false;
lockStore.delete(lockName);
this.heldLocks.delete(lockName);
return true;
}
async extend(lockName: string, additionalSeconds: number): Promise<boolean> {
const lock = lockStore.get(lockName);
if (!lock || lock.holderId !== this.holderId) return false;
lock.expiresAt = new Date(Date.now() + additionalSeconds * 1000);
return true;
}
async withLock<T>(
lockName: string,
fn: () => Promise<T>,
options: LockOptions = {}
): Promise<T> {
const result = await this.acquire(lockName, options);
if (!result.acquired) {
throw new Error(`Failed to acquire lock: ${lockName}`);
}
try {
return await fn();
} finally {
await this.release(lockName);
}
}
private async waitForLock(lockName: string, options: LockOptions): Promise<LockResult> {
const timeout = (options.blockingTimeoutSeconds || 10) * 1000;
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
await new Promise(r => setTimeout(r, 100));
const result = await this.acquire(lockName, { ...options, blocking: false });
if (result.acquired) return result;
}
return { acquired: false, error: 'Timeout waiting for lock' };
}
async releaseAll(): Promise<void> {
for (const lockName of this.heldLocks.keys()) {
await this.release(lockName);
}
}
}
// Singleton
let instance: DistributedLock | null = null;
export function getDistributedLock(): DistributedLock {
if (!instance) instance = new DistributedLock();
return instance;
}typescript
// distributed-lock.ts
import { LockInfo, LockOptions, LockResult } from './types';
const lockStore = new Map<string, LockInfo>();
export class DistributedLock {
private holderId: string;
private heldLocks = new Map<string, LockInfo>();
constructor() {
this.holderId = `worker_${Date.now()}_${Math.random().toString(36).slice(2)}`;
}
async acquire(lockName: string, options: LockOptions = {}): Promise<LockResult> {
const { timeoutSeconds = 30, blocking = false, blockingTimeoutSeconds = 10 } = options;
const now = new Date();
const expiresAt = new Date(now.getTime() + timeoutSeconds * 1000);
// Clean expired locks
for (const [name, lock] of lockStore) {
if (lock.expiresAt <= now) lockStore.delete(name);
}
const existing = lockStore.get(lockName);
if (existing && existing.expiresAt > now) {
if (existing.holderId === this.holderId) {
// Extend our own lock
existing.expiresAt = expiresAt;
return { acquired: true, lock: existing };
}
if (blocking) {
return this.waitForLock(lockName, options);
}
return { acquired: false, error: `Lock held by ${existing.holderId}` };
}
const lock: LockInfo = {
lockName,
holderId: this.holderId,
acquiredAt: now,
expiresAt,
metadata: options.metadata,
};
lockStore.set(lockName, lock);
this.heldLocks.set(lockName, lock);
return { acquired: true, lock };
}
async release(lockName: string): Promise<boolean> {
const lock = lockStore.get(lockName);
if (!lock || lock.holderId !== this.holderId) return false;
lockStore.delete(lockName);
this.heldLocks.delete(lockName);
return true;
}
async extend(lockName: string, additionalSeconds: number): Promise<boolean> {
const lock = lockStore.get(lockName);
if (!lock || lock.holderId !== this.holderId) return false;
lock.expiresAt = new Date(Date.now() + additionalSeconds * 1000);
return true;
}
async withLock<T>(
lockName: string,
fn: () => Promise<T>,
options: LockOptions = {}
): Promise<T> {
const result = await this.acquire(lockName, options);
if (!result.acquired) {
throw new Error(`Failed to acquire lock: ${lockName}`);
}
try {
return await fn();
} finally {
await this.release(lockName);
}
}
private async waitForLock(lockName: string, options: LockOptions): Promise<LockResult> {
const timeout = (options.blockingTimeoutSeconds || 10) * 1000;
const startTime = Date.now();
while (Date.now() - startTime < timeout) {
await new Promise(r => setTimeout(r, 100));
const result = await this.acquire(lockName, { ...options, blocking: false });
if (result.acquired) return result;
}
return { acquired: false, error: 'Timeout waiting for lock' };
}
async releaseAll(): Promise<void> {
for (const lockName of this.heldLocks.keys()) {
await this.release(lockName);
}
}
}
// Singleton
let instance: DistributedLock | null = null;
export function getDistributedLock(): DistributedLock {
if (!instance) instance = new DistributedLock();
return instance;
}PostgreSQL Lock (Multi-Instance)
PostgreSQL锁(多实例)
sql
-- migrations/distributed_locks.sql
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY,
holder_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_locks_expires ON distributed_locks(expires_at);
-- Atomic lock acquisition
CREATE OR REPLACE FUNCTION acquire_lock(
p_lock_name VARCHAR(255),
p_holder_id VARCHAR(255),
p_ttl_seconds INTEGER DEFAULT 30,
p_metadata JSONB DEFAULT '{}'
) RETURNS BOOLEAN AS $$
DECLARE
v_now TIMESTAMPTZ := NOW();
v_expires_at TIMESTAMPTZ := v_now + (p_ttl_seconds || ' seconds')::INTERVAL;
BEGIN
-- Clean expired
DELETE FROM distributed_locks
WHERE lock_name = p_lock_name AND expires_at < v_now;
-- Try to acquire
INSERT INTO distributed_locks (lock_name, holder_id, acquired_at, expires_at, metadata)
VALUES (p_lock_name, p_holder_id, v_now, v_expires_at, p_metadata)
ON CONFLICT (lock_name) DO UPDATE
SET holder_id = EXCLUDED.holder_id,
acquired_at = EXCLUDED.acquired_at,
expires_at = EXCLUDED.expires_at
WHERE distributed_locks.holder_id = p_holder_id
OR distributed_locks.expires_at < v_now;
RETURN EXISTS (
SELECT 1 FROM distributed_locks
WHERE lock_name = p_lock_name AND holder_id = p_holder_id
);
END;
$$ LANGUAGE plpgsql;
-- Release lock
CREATE OR REPLACE FUNCTION release_lock(
p_lock_name VARCHAR(255),
p_holder_id VARCHAR(255)
) RETURNS BOOLEAN AS $$
BEGIN
DELETE FROM distributed_locks
WHERE lock_name = p_lock_name AND holder_id = p_holder_id;
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;sql
-- migrations/distributed_locks.sql
CREATE TABLE distributed_locks (
lock_name VARCHAR(255) PRIMARY KEY,
holder_id VARCHAR(255) NOT NULL,
acquired_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
expires_at TIMESTAMPTZ NOT NULL,
metadata JSONB DEFAULT '{}'
);
CREATE INDEX idx_locks_expires ON distributed_locks(expires_at);
-- Atomic lock acquisition
CREATE OR REPLACE FUNCTION acquire_lock(
p_lock_name VARCHAR(255),
p_holder_id VARCHAR(255),
p_ttl_seconds INTEGER DEFAULT 30,
p_metadata JSONB DEFAULT '{}'
) RETURNS BOOLEAN AS $$
DECLARE
v_now TIMESTAMPTZ := NOW();
v_expires_at TIMESTAMPTZ := v_now + (p_ttl_seconds || ' seconds')::INTERVAL;
BEGIN
-- Clean expired
DELETE FROM distributed_locks
WHERE lock_name = p_lock_name AND expires_at < v_now;
-- Try to acquire
INSERT INTO distributed_locks (lock_name, holder_id, acquired_at, expires_at, metadata)
VALUES (p_lock_name, p_holder_id, v_now, v_expires_at, p_metadata)
ON CONFLICT (lock_name) DO UPDATE
SET holder_id = EXCLUDED.holder_id,
acquired_at = EXCLUDED.acquired_at,
expires_at = EXCLUDED.expires_at
WHERE distributed_locks.holder_id = p_holder_id
OR distributed_locks.expires_at < v_now;
RETURN EXISTS (
SELECT 1 FROM distributed_locks
WHERE lock_name = p_lock_name AND holder_id = p_holder_id
);
END;
$$ LANGUAGE plpgsql;
-- Release lock
CREATE OR REPLACE FUNCTION release_lock(
p_lock_name VARCHAR(255),
p_holder_id VARCHAR(255)
) RETURNS BOOLEAN AS $$
BEGIN
DELETE FROM distributed_locks
WHERE lock_name = p_lock_name AND holder_id = p_holder_id;
RETURN FOUND;
END;
$$ LANGUAGE plpgsql;Python Implementation
Python实现
python
undefinedpython
undefineddistributed_lock.py
distributed_lock.py
import time
import uuid
from dataclasses import dataclass
from typing import Optional, Callable, TypeVar
from contextlib import asynccontextmanager
T = TypeVar('T')
@dataclass
class LockInfo:
lock_name: str
holder_id: str
acquired_at: float
expires_at: float
class DistributedLock:
def init(self, db):
self.db = db
self.holder_id = f"worker_{uuid.uuid4().hex[:8]}"
async def acquire(
self,
lock_name: str,
timeout_seconds: int = 30
) -> bool:
result = await self.db.execute(
"SELECT acquire_lock($1, $2, $3)",
lock_name, self.holder_id, timeout_seconds
)
return result[0][0]
async def release(self, lock_name: str) -> bool:
result = await self.db.execute(
"SELECT release_lock($1, $2)",
lock_name, self.holder_id
)
return result[0][0]
@asynccontextmanager
async def lock(self, lock_name: str, timeout_seconds: int = 30):
acquired = await self.acquire(lock_name, timeout_seconds)
if not acquired:
raise Exception(f"Failed to acquire lock: {lock_name}")
try:
yield
finally:
await self.release(lock_name)import time
import uuid
from dataclasses import dataclass
from typing import Optional, Callable, TypeVar
from contextlib import asynccontextmanager
T = TypeVar('T')
@dataclass
class LockInfo:
lock_name: str
holder_id: str
acquired_at: float
expires_at: float
class DistributedLock:
def init(self, db):
self.db = db
self.holder_id = f"worker_{uuid.uuid4().hex[:8]}"
async def acquire(
self,
lock_name: str,
timeout_seconds: int = 30
) -> bool:
result = await self.db.execute(
"SELECT acquire_lock($1, $2, $3)",
lock_name, self.holder_id, timeout_seconds
)
return result[0][0]
async def release(self, lock_name: str) -> bool:
result = await self.db.execute(
"SELECT release_lock($1, $2)",
lock_name, self.holder_id
)
return result[0][0]
@asynccontextmanager
async def lock(self, lock_name: str, timeout_seconds: int = 30):
acquired = await self.acquire(lock_name, timeout_seconds)
if not acquired:
raise Exception(f"Failed to acquire lock: {lock_name}")
try:
yield
finally:
await self.release(lock_name)Usage
Usage
async with lock.lock("job:123"):
await process_job("123")
undefinedasync with lock.lock("job:123"):
await process_job("123")
undefinedUsage Examples
使用示例
Basic Lock
基础锁使用
typescript
const lock = getDistributedLock();
const result = await lock.acquire('process-payment:order-123');
if (result.acquired) {
try {
await processPayment('order-123');
} finally {
await lock.release('process-payment:order-123');
}
} else {
console.log('Another instance is processing this order');
}typescript
const lock = getDistributedLock();
const result = await lock.acquire('process-payment:order-123');
if (result.acquired) {
try {
await processPayment('order-123');
} finally {
await lock.release('process-payment:order-123');
}
} else {
console.log('Another instance is processing this order');
}With Context Manager
使用上下文管理器
typescript
await lock.withLock('daily-report', async () => {
await generateDailyReport();
}, { timeoutSeconds: 300 });typescript
await lock.withLock('daily-report', async () => {
await generateDailyReport();
}, { timeoutSeconds: 300 });Long-Running Job with Extension
带续期的长任务
typescript
async function processJob(jobId: string) {
const lock = getDistributedLock();
const result = await lock.acquire(`job:${jobId}`, { timeoutSeconds: 60 });
if (!result.acquired) return;
try {
// Extend lock periodically for long jobs
const extendInterval = setInterval(async () => {
await lock.extend(`job:${jobId}`, 60);
}, 30000);
await doExpensiveWork(jobId);
clearInterval(extendInterval);
} finally {
await lock.release(`job:${jobId}`);
}
}typescript
async function processJob(jobId: string) {
const lock = getDistributedLock();
const result = await lock.acquire(`job:${jobId}`, { timeoutSeconds: 60 });
if (!result.acquired) return;
try {
// Extend lock periodically for long jobs
const extendInterval = setInterval(async () => {
await lock.extend(`job:${jobId}`, 60);
}, 30000);
await doExpensiveWork(jobId);
clearInterval(extendInterval);
} finally {
await lock.release(`job:${jobId}`);
}
}Singleton Cron Job
单例定时任务
typescript
async function runScheduledTask() {
const lock = getDistributedLock();
const result = await lock.acquire('cron:daily-cleanup', {
timeoutSeconds: 3600,
});
if (!result.acquired) {
console.log('Another instance is running daily cleanup');
return;
}
try {
await performDailyCleanup();
} finally {
await lock.release('cron:daily-cleanup');
}
}typescript
async function runScheduledTask() {
const lock = getDistributedLock();
const result = await lock.acquire('cron:daily-cleanup', {
timeoutSeconds: 3600,
});
if (!result.acquired) {
console.log('Another instance is running daily cleanup');
return;
}
try {
await performDailyCleanup();
} finally {
await lock.release('cron:daily-cleanup');
}
}Lock Naming Conventions
锁命名规范
typescript
// Resource-based
`lock:user:${userId}:profile-update`
`lock:order:${orderId}:process`
// Job-based
`job:${jobType}:${jobId}`
// Singleton operations
`cron:${taskName}`
`singleton:cache-refresh`typescript
// Resource-based
`lock:user:${userId}:profile-update`
`lock:order:${orderId}:process`
// Job-based
`job:${jobType}:${jobId}`
// Singleton operations
`cron:${taskName}`
`singleton:cache-refresh`Best Practices
最佳实践
- Set appropriate TTL - Match operation duration
- Extend for long ops - Don't let locks expire mid-operation
- Unique holder IDs - Per instance, not per request
- Release in finally - Always release, even on error
- Use withLock - Automatic acquire/release
- 设置合适的TTL - 与操作时长匹配
- 为长任务续期 - 避免任务执行过程中锁过期
- 唯一持有者ID - 每个实例一个ID,而非每个请求
- 在finally中释放锁 - 即使出错也要确保释放
- 使用withLock - 自动完成锁的获取与释放
Common Mistakes
常见错误
- TTL too short for operation
- Not extending long-running locks
- Forgetting to release on error
- Same holder ID across instances
- Not handling acquisition failure
- TTL设置过短,不足以完成操作
- 不为长时间运行的任务续期
- 出错时忘记释放锁
- 多实例使用相同的持有者ID
- 未处理锁获取失败的情况
Related Skills
相关技能
- Background Jobs
- Circuit Breaker
- Idempotency
- 后台任务
- 断路器
- 幂等性