server-tick

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Server-Authoritative Tick System

服务器权威Tick系统

Server-side tick system with lag compensation for fair multiplayer gameplay.
具备延迟补偿功能的服务器端Tick系统,为多人游戏提供公平的游戏体验。

When to Use This Skill

适用场景

  • Building real-time multiplayer games
  • Need to prevent cheating (speed hacks, teleports, aimbots)
  • Want fair hit detection despite network latency
  • Require deterministic physics simulation
  • 开发实时多人游戏
  • 需要防止作弊行为(加速外挂、瞬移、自瞄)
  • 希望在网络延迟下实现公平的命中检测
  • 需要确定性物理模拟

Core Concepts

核心概念

Client-authoritative multiplayer is trivially exploitable. Server-authoritative feels laggy without lag compensation. The solution:
  1. Fixed tick rate (60Hz) for deterministic physics
  2. Input validation with violation tracking and decay
  3. Position history for lag compensation (200ms window)
  4. Reduced broadcast rate (20Hz) to save bandwidth
客户端权威的多人游戏极易被利用。而无延迟补偿的服务器权威系统会让玩家感到延迟过高。解决方案如下:
  1. 固定Tick速率(60Hz)以实现确定性物理模拟
  2. 输入验证,包含违规记录与衰减机制
  3. 位置历史记录用于延迟补偿(200ms窗口)
  4. 降低广播速率(20Hz)以节省带宽

Implementation

实现方案

Python

Python实现

python
from dataclasses import dataclass, field
from collections import deque
from typing import Dict, Optional, Tuple, Callable, Awaitable
from enum import Enum
import asyncio
import time
import math


class ViolationType(str, Enum):
    SPEED_HACK = "speed_hack"
    TELEPORT = "teleport"
    INVALID_ACTION = "invalid_action"


@dataclass(frozen=True)
class TickConfig:
    rate_hz: int = 60
    broadcast_divisor: int = 3  # 60/3 = 20Hz broadcast
    max_speed: float = 300.0  # Units per second
    teleport_threshold: float = 100.0
    max_rewind_ms: int = 200
    violation_threshold: int = 10
    decay_per_second: float = 1.0
    
    @property
    def interval_ms(self) -> float:
        return 1000.0 / self.rate_hz
    
    def max_distance_per_tick(self) -> float:
        return self.max_speed / self.rate_hz


@dataclass
class PlayerInput:
    player_id: str
    tick: int
    dx: float = 0.0
    dy: float = 0.0
    timestamp_ms: float = 0.0


@dataclass
class PositionSnapshot:
    x: float
    y: float
    tick: int
    timestamp_ms: float


@dataclass
class PlayerState:
    player_id: str
    x: float = 0.0
    y: float = 0.0
    health: int = 100
    last_valid_position: Tuple[float, float] = (0.0, 0.0)
    violations: float = 0.0
    position_history: deque = field(default_factory=lambda: deque(maxlen=15))
    
    def record_position(self, tick: int, timestamp_ms: float) -> None:
        self.position_history.append(PositionSnapshot(
            x=self.x, y=self.y, tick=tick, timestamp_ms=timestamp_ms
        ))


@dataclass
class GameState:
    game_id: str
    tick: int = 0
    players: Dict[str, PlayerState] = field(default_factory=dict)
    running: bool = False
    start_time_ms: float = 0.0


class InputValidator:
    """Validates player inputs for anti-cheat."""
    
    def __init__(self, config: TickConfig):
        self._config = config
    
    def validate_movement(
        self, player: PlayerState, input: PlayerInput
    ) -> Tuple[bool, Optional[ViolationType]]:
        distance = math.sqrt(input.dx ** 2 + input.dy ** 2)
        max_distance = self._config.max_distance_per_tick()
        
        # 50% tolerance for network jitter
        if distance > max_distance * 1.5:
            return False, ViolationType.SPEED_HACK
        
        # Check for teleport
        new_x = player.x + input.dx
        new_y = player.y + input.dy
        last_x, last_y = player.last_valid_position
        jump_distance = math.sqrt((new_x - last_x) ** 2 + (new_y - last_y) ** 2)
        
        if jump_distance > self._config.teleport_threshold:
            return False, ViolationType.TELEPORT
        
        return True, None
    
    def apply_violation(self, player: PlayerState) -> Tuple[bool, bool]:
        player.violations += 1.0
        should_warn = player.violations >= self._config.violation_threshold * 0.5
        should_kick = player.violations >= self._config.violation_threshold
        return should_warn, should_kick
    
    def decay_violations(self, player: PlayerState, delta_seconds: float) -> None:
        decay = self._config.decay_per_second * delta_seconds
        player.violations = max(0.0, player.violations - decay)


class LagCompensator:
    """Lag compensation for fair hit detection."""
    
    def __init__(self, config: TickConfig):
        self._config = config
    
    def get_position_at_time(
        self, player: PlayerState, target_time_ms: float, current_time_ms: float
    ) -> Tuple[float, float]:
        # Clamp rewind to max window
        rewind_ms = current_time_ms - target_time_ms
        if rewind_ms > self._config.max_rewind_ms:
            target_time_ms = current_time_ms - self._config.max_rewind_ms
        
        if not player.position_history:
            return player.x, player.y
        
        # Find surrounding snapshots
        before: Optional[PositionSnapshot] = None
        after: Optional[PositionSnapshot] = None
        
        for snapshot in player.position_history:
            if snapshot.timestamp_ms <= target_time_ms:
                before = snapshot
            elif after is None:
                after = snapshot
                break
        
        if before is None:
            oldest = player.position_history[0]
            return oldest.x, oldest.y
        
        if after is None:
            return before.x, before.y
        
        # Interpolate between snapshots
        time_range = after.timestamp_ms - before.timestamp_ms
        if time_range <= 0:
            return before.x, before.y
        
        t = max(0.0, min(1.0, (target_time_ms - before.timestamp_ms) / time_range))
        x = before.x + (after.x - before.x) * t
        y = before.y + (after.y - before.y) * t
        
        return x, y
    
    def check_hit(
        self, shooter: PlayerState, target: PlayerState,
        shot_time_ms: float, current_time_ms: float, hit_radius: float = 20.0
    ) -> Tuple[bool, Tuple[float, float]]:
        target_x, target_y = self.get_position_at_time(target, shot_time_ms, current_time_ms)
        distance = math.sqrt((shooter.x - target_x) ** 2 + (shooter.y - target_y) ** 2)
        return distance <= hit_radius, (target_x, target_y)


class TickSystem:
    """Server-authoritative tick system."""
    
    def __init__(self, config: TickConfig = None):
        self._config = config or TickConfig()
        self._games: Dict[str, GameState] = {}
        self._tasks: Dict[str, asyncio.Task] = {}
        self._validator = InputValidator(self._config)
        self._lag_comp = LagCompensator(self._config)
        self._broadcast_callback: Optional[Callable[[str, dict], Awaitable[None]]] = None
        self._kick_callback: Optional[Callable[[str, str, str], Awaitable[None]]] = None
    
    def set_broadcast_callback(self, callback: Callable[[str, dict], Awaitable[None]]) -> None:
        self._broadcast_callback = callback
    
    def set_kick_callback(self, callback: Callable[[str, str, str], Awaitable[None]]) -> None:
        self._kick_callback = callback
    
    def create_game(
        self, game_id: str, player1_id: str, player2_id: str,
        spawn1: Tuple[float, float] = (100, 300),
        spawn2: Tuple[float, float] = (700, 300)
    ) -> GameState:
        game = GameState(game_id=game_id)
        game.players[player1_id] = PlayerState(
            player_id=player1_id, x=spawn1[0], y=spawn1[1], last_valid_position=spawn1
        )
        game.players[player2_id] = PlayerState(
            player_id=player2_id, x=spawn2[0], y=spawn2[1], last_valid_position=spawn2
        )
        self._games[game_id] = game
        return game
    
    async def start_game(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if not game:
            raise ValueError(f"Game {game_id} not found")
        
        game.running = True
        game.start_time_ms = time.time() * 1000
        self._tasks[game_id] = asyncio.create_task(self._tick_loop(game_id))
    
    async def stop_game(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if game:
            game.running = False
        
        task = self._tasks.pop(game_id, None)
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        
        self._games.pop(game_id, None)
    
    async def process_input(self, game_id: str, input: PlayerInput) -> bool:
        game = self._games.get(game_id)
        if not game or not game.running:
            return False
        
        player = game.players.get(input.player_id)
        if not player:
            return False
        
        valid, violation = self._validator.validate_movement(player, input)
        
        if not valid and violation:
            _, should_kick = self._validator.apply_violation(player)
            if should_kick and self._kick_callback:
                await self._kick_callback(game_id, input.player_id, f"Anti-cheat: {violation.value}")
            return False
        
        player.x += input.dx
        player.y += input.dy
        player.last_valid_position = (player.x, player.y)
        return True
    
    async def _tick_loop(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if not game:
            return
        
        interval = self._config.interval_ms / 1000.0
        last_time = time.time()
        
        while game.running:
            tick_start = time.time()
            delta = tick_start - last_time
            last_time = tick_start
            
            await self._tick(game, delta)
            
            if game.tick % self._config.broadcast_divisor == 0:
                await self._broadcast_state(game)
            
            game.tick += 1
            
            elapsed = time.time() - tick_start
            sleep_time = max(0, interval - elapsed)
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
    
    async def _tick(self, game: GameState, delta: float) -> None:
        current_time_ms = time.time() * 1000
        for player in game.players.values():
            self._validator.decay_violations(player, delta)
            player.record_position(game.tick, current_time_ms)
    
    async def _broadcast_state(self, game: GameState) -> None:
        if not self._broadcast_callback:
            return
        
        state = {
            "type": "game_state",
            "tick": game.tick,
            "players": {
                pid: {"x": p.x, "y": p.y, "health": p.health}
                for pid, p in game.players.items()
            }
        }
        await self._broadcast_callback(game.game_id, state)
python
from dataclasses import dataclass, field
from collections import deque
from typing import Dict, Optional, Tuple, Callable, Awaitable
from enum import Enum
import asyncio
import time
import math


class ViolationType(str, Enum):
    SPEED_HACK = "speed_hack"
    TELEPORT = "teleport"
    INVALID_ACTION = "invalid_action"


@dataclass(frozen=True)
class TickConfig:
    rate_hz: int = 60
    broadcast_divisor: int = 3  # 60/3 = 20Hz broadcast
    max_speed: float = 300.0  # Units per second
    teleport_threshold: float = 100.0
    max_rewind_ms: int = 200
    violation_threshold: int = 10
    decay_per_second: float = 1.0
    
    @property
    def interval_ms(self) -> float:
        return 1000.0 / self.rate_hz
    
    def max_distance_per_tick(self) -> float:
        return self.max_speed / self.rate_hz


@dataclass
class PlayerInput:
    player_id: str
    tick: int
    dx: float = 0.0
    dy: float = 0.0
    timestamp_ms: float = 0.0


@dataclass
class PositionSnapshot:
    x: float
    y: float
    tick: int
    timestamp_ms: float


@dataclass
class PlayerState:
    player_id: str
    x: float = 0.0
    y: float = 0.0
    health: int = 100
    last_valid_position: Tuple[float, float] = (0.0, 0.0)
    violations: float = 0.0
    position_history: deque = field(default_factory=lambda: deque(maxlen=15))
    
    def record_position(self, tick: int, timestamp_ms: float) -> None:
        self.position_history.append(PositionSnapshot(
            x=self.x, y=self.y, tick=tick, timestamp_ms=timestamp_ms
        ))


@dataclass
class GameState:
    game_id: str
    tick: int = 0
    players: Dict[str, PlayerState] = field(default_factory=dict)
    running: bool = False
    start_time_ms: float = 0.0


class InputValidator:
    """Validates player inputs for anti-cheat."""
    
    def __init__(self, config: TickConfig):
        self._config = config
    
    def validate_movement(
        self, player: PlayerState, input: PlayerInput
    ) -> Tuple[bool, Optional[ViolationType]]:
        distance = math.sqrt(input.dx ** 2 + input.dy ** 2)
        max_distance = self._config.max_distance_per_tick()
        
        # 50% tolerance for network jitter
        if distance > max_distance * 1.5:
            return False, ViolationType.SPEED_HACK
        
        # Check for teleport
        new_x = player.x + input.dx
        new_y = player.y + input.dy
        last_x, last_y = player.last_valid_position
        jump_distance = math.sqrt((new_x - last_x) ** 2 + (new_y - last_y) ** 2)
        
        if jump_distance > self._config.teleport_threshold:
            return False, ViolationType.TELEPORT
        
        return True, None
    
    def apply_violation(self, player: PlayerState) -> Tuple[bool, bool]:
        player.violations += 1.0
        should_warn = player.violations >= self._config.violation_threshold * 0.5
        should_kick = player.violations >= self._config.violation_threshold
        return should_warn, should_kick
    
    def decay_violations(self, player: PlayerState, delta_seconds: float) -> None:
        decay = self._config.decay_per_second * delta_seconds
        player.violations = max(0.0, player.violations - decay)


class LagCompensator:
    """Lag compensation for fair hit detection."""
    
    def __init__(self, config: TickConfig):
        self._config = config
    
    def get_position_at_time(
        self, player: PlayerState, target_time_ms: float, current_time_ms: float
    ) -> Tuple[float, float]:
        # Clamp rewind to max window
        rewind_ms = current_time_ms - target_time_ms
        if rewind_ms > self._config.max_rewind_ms:
            target_time_ms = current_time_ms - self._config.max_rewind_ms
        
        if not player.position_history:
            return player.x, player.y
        
        # Find surrounding snapshots
        before: Optional[PositionSnapshot] = None
        after: Optional[PositionSnapshot] = None
        
        for snapshot in player.position_history:
            if snapshot.timestamp_ms <= target_time_ms:
                before = snapshot
            elif after is None:
                after = snapshot
                break
        
        if before is None:
            oldest = player.position_history[0]
            return oldest.x, oldest.y
        
        if after is None:
            return before.x, before.y
        
        # Interpolate between snapshots
        time_range = after.timestamp_ms - before.timestamp_ms
        if time_range <= 0:
            return before.x, before.y
        
        t = max(0.0, min(1.0, (target_time_ms - before.timestamp_ms) / time_range))
        x = before.x + (after.x - before.x) * t
        y = before.y + (after.y - before.y) * t
        
        return x, y
    
    def check_hit(
        self, shooter: PlayerState, target: PlayerState,
        shot_time_ms: float, current_time_ms: float, hit_radius: float = 20.0
    ) -> Tuple[bool, Tuple[float, float]]:
        target_x, target_y = self.get_position_at_time(target, shot_time_ms, current_time_ms)
        distance = math.sqrt((shooter.x - target_x) ** 2 + (shooter.y - target_y) ** 2)
        return distance <= hit_radius, (target_x, target_y)


class TickSystem:
    """Server-authoritative tick system."""
    
    def __init__(self, config: TickConfig = None):
        self._config = config or TickConfig()
        self._games: Dict[str, GameState] = {}
        self._tasks: Dict[str, asyncio.Task] = {}
        self._validator = InputValidator(self._config)
        self._lag_comp = LagCompensator(self._config)
        self._broadcast_callback: Optional[Callable[[str, dict], Awaitable[None]]] = None
        self._kick_callback: Optional[Callable[[str, str, str], Awaitable[None]]] = None
    
    def set_broadcast_callback(self, callback: Callable[[str, dict], Awaitable[None]]) -> None:
        self._broadcast_callback = callback
    
    def set_kick_callback(self, callback: Callable[[str, str, str], Awaitable[None]]) -> None:
        self._kick_callback = callback
    
    def create_game(
        self, game_id: str, player1_id: str, player2_id: str,
        spawn1: Tuple[float, float] = (100, 300),
        spawn2: Tuple[float, float] = (700, 300)
    ) -> GameState:
        game = GameState(game_id=game_id)
        game.players[player1_id] = PlayerState(
            player_id=player1_id, x=spawn1[0], y=spawn1[1], last_valid_position=spawn1
        )
        game.players[player2_id] = PlayerState(
            player_id=player2_id, x=spawn2[0], y=spawn2[1], last_valid_position=spawn2
        )
        self._games[game_id] = game
        return game
    
    async def start_game(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if not game:
            raise ValueError(f"Game {game_id} not found")
        
        game.running = True
        game.start_time_ms = time.time() * 1000
        self._tasks[game_id] = asyncio.create_task(self._tick_loop(game_id))
    
    async def stop_game(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if game:
            game.running = False
        
        task = self._tasks.pop(game_id, None)
        if task:
            task.cancel()
            try:
                await task
            except asyncio.CancelledError:
                pass
        
        self._games.pop(game_id, None)
    
    async def process_input(self, game_id: str, input: PlayerInput) -> bool:
        game = self._games.get(game_id)
        if not game or not game.running:
            return False
        
        player = game.players.get(input.player_id)
        if not player:
            return False
        
        valid, violation = self._validator.validate_movement(player, input)
        
        if not valid and violation:
            _, should_kick = self._validator.apply_violation(player)
            if should_kick and self._kick_callback:
                await self._kick_callback(game_id, input.player_id, f"Anti-cheat: {violation.value}")
            return False
        
        player.x += input.dx
        player.y += input.dy
        player.last_valid_position = (player.x, player.y)
        return True
    
    async def _tick_loop(self, game_id: str) -> None:
        game = self._games.get(game_id)
        if not game:
            return
        
        interval = self._config.interval_ms / 1000.0
        last_time = time.time()
        
        while game.running:
            tick_start = time.time()
            delta = tick_start - last_time
            last_time = tick_start
            
            await self._tick(game, delta)
            
            if game.tick % self._config.broadcast_divisor == 0:
                await self._broadcast_state(game)
            
            game.tick += 1
            
            elapsed = time.time() - tick_start
            sleep_time = max(0, interval - elapsed)
            if sleep_time > 0:
                await asyncio.sleep(sleep_time)
    
    async def _tick(self, game: GameState, delta: float) -> None:
        current_time_ms = time.time() * 1000
        for player in game.players.values():
            self._validator.decay_violations(player, delta)
            player.record_position(game.tick, current_time_ms)
    
    async def _broadcast_state(self, game: GameState) -> None:
        if not self._broadcast_callback:
            return
        
        state = {
            "type": "game_state",
            "tick": game.tick,
            "players": {
                pid: {"x": p.x, "y": p.y, "health": p.health}
                for pid, p in game.players.items()
            }
        }
        await self._broadcast_callback(game.game_id, state)

TypeScript

TypeScript实现

typescript
enum ViolationType {
  SPEED_HACK = 'speed_hack',
  TELEPORT = 'teleport',
  INVALID_ACTION = 'invalid_action',
}

interface TickConfig {
  rateHz: number;
  broadcastDivisor: number;
  maxSpeed: number;
  teleportThreshold: number;
  maxRewindMs: number;
  violationThreshold: number;
  decayPerSecond: number;
}

const DEFAULT_CONFIG: TickConfig = {
  rateHz: 60,
  broadcastDivisor: 3,
  maxSpeed: 300,
  teleportThreshold: 100,
  maxRewindMs: 200,
  violationThreshold: 10,
  decayPerSecond: 1.0,
};

interface PositionSnapshot {
  x: number;
  y: number;
  tick: number;
  timestampMs: number;
}

interface PlayerState {
  playerId: string;
  x: number;
  y: number;
  health: number;
  lastValidPosition: [number, number];
  violations: number;
  positionHistory: PositionSnapshot[];
}

class TickSystem {
  private config: TickConfig;
  private games = new Map<string, GameState>();
  private intervals = new Map<string, NodeJS.Timeout>();
  
  constructor(config: Partial<TickConfig> = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
  }
  
  createGame(gameId: string, player1Id: string, player2Id: string): GameState {
    const game: GameState = {
      gameId,
      tick: 0,
      players: new Map([
        [player1Id, this.createPlayer(player1Id, 100, 300)],
        [player2Id, this.createPlayer(player2Id, 700, 300)],
      ]),
      running: false,
    };
    this.games.set(gameId, game);
    return game;
  }
  
  private createPlayer(id: string, x: number, y: number): PlayerState {
    return {
      playerId: id,
      x, y,
      health: 100,
      lastValidPosition: [x, y],
      violations: 0,
      positionHistory: [],
    };
  }
  
  processInput(gameId: string, input: PlayerInput): boolean {
    const game = this.games.get(gameId);
    if (!game?.running) return false;
    
    const player = game.players.get(input.playerId);
    if (!player) return false;
    
    const distance = Math.sqrt(input.dx ** 2 + input.dy ** 2);
    const maxDistance = this.config.maxSpeed / this.config.rateHz;
    
    if (distance > maxDistance * 1.5) {
      player.violations += 1;
      return false;
    }
    
    player.x += input.dx;
    player.y += input.dy;
    player.lastValidPosition = [player.x, player.y];
    return true;
  }
  
  getPositionAtTime(player: PlayerState, targetTimeMs: number, currentTimeMs: number): [number, number] {
    const rewindMs = Math.min(currentTimeMs - targetTimeMs, this.config.maxRewindMs);
    const clampedTargetTime = currentTimeMs - rewindMs;
    
    if (player.positionHistory.length === 0) {
      return [player.x, player.y];
    }
    
    let before: PositionSnapshot | null = null;
    let after: PositionSnapshot | null = null;
    
    for (const snapshot of player.positionHistory) {
      if (snapshot.timestampMs <= clampedTargetTime) {
        before = snapshot;
      } else if (!after) {
        after = snapshot;
        break;
      }
    }
    
    if (!before) return [player.positionHistory[0].x, player.positionHistory[0].y];
    if (!after) return [before.x, before.y];
    
    const t = Math.max(0, Math.min(1, 
      (clampedTargetTime - before.timestampMs) / (after.timestampMs - before.timestampMs)
    ));
    
    return [
      before.x + (after.x - before.x) * t,
      before.y + (after.y - before.y) * t,
    ];
  }
}
typescript
enum ViolationType {
  SPEED_HACK = 'speed_hack',
  TELEPORT = 'teleport',
  INVALID_ACTION = 'invalid_action',
}

interface TickConfig {
  rateHz: number;
  broadcastDivisor: number;
  maxSpeed: number;
  teleportThreshold: number;
  maxRewindMs: number;
  violationThreshold: number;
  decayPerSecond: number;
}

const DEFAULT_CONFIG: TickConfig = {
  rateHz: 60,
  broadcastDivisor: 3,
  maxSpeed: 300,
  teleportThreshold: 100,
  maxRewindMs: 200,
  violationThreshold: 10,
  decayPerSecond: 1.0,
};

interface PositionSnapshot {
  x: number;
  y: number;
  tick: number;
  timestampMs: number;
}

interface PlayerState {
  playerId: string;
  x: number;
  y: number;
  health: number;
  lastValidPosition: [number, number];
  violations: number;
  positionHistory: PositionSnapshot[];
}

class TickSystem {
  private config: TickConfig;
  private games = new Map<string, GameState>();
  private intervals = new Map<string, NodeJS.Timeout>();
  
  constructor(config: Partial<TickConfig> = {}) {
    this.config = { ...DEFAULT_CONFIG, ...config };
  }
  
  createGame(gameId: string, player1Id: string, player2Id: string): GameState {
    const game: GameState = {
      gameId,
      tick: 0,
      players: new Map([
        [player1Id, this.createPlayer(player1Id, 100, 300)],
        [player2Id, this.createPlayer(player2Id, 700, 300)],
      ]),
      running: false,
    };
    this.games.set(gameId, game);
    return game;
  }
  
  private createPlayer(id: string, x: number, y: number): PlayerState {
    return {
      playerId: id,
      x, y,
      health: 100,
      lastValidPosition: [x, y],
      violations: 0,
      positionHistory: [],
    };
  }
  
  processInput(gameId: string, input: PlayerInput): boolean {
    const game = this.games.get(gameId);
    if (!game?.running) return false;
    
    const player = game.players.get(input.playerId);
    if (!player) return false;
    
    const distance = Math.sqrt(input.dx ** 2 + input.dy ** 2);
    const maxDistance = this.config.maxSpeed / this.config.rateHz;
    
    if (distance > maxDistance * 1.5) {
      player.violations += 1;
      return false;
    }
    
    player.x += input.dx;
    player.y += input.dy;
    player.lastValidPosition = [player.x, player.y];
    return true;
  }
  
  getPositionAtTime(player: PlayerState, targetTimeMs: number, currentTimeMs: number): [number, number] {
    const rewindMs = Math.min(currentTimeMs - targetTimeMs, this.config.maxRewindMs);
    const clampedTargetTime = currentTimeMs - rewindMs;
    
    if (player.positionHistory.length === 0) {
      return [player.x, player.y];
    }
    
    let before: PositionSnapshot | null = null;
    let after: PositionSnapshot | null = null;
    
    for (const snapshot of player.positionHistory) {
      if (snapshot.timestampMs <= clampedTargetTime) {
        before = snapshot;
      } else if (!after) {
        after = snapshot;
        break;
      }
    }
    
    if (!before) return [player.positionHistory[0].x, player.positionHistory[0].y];
    if (!after) return [before.x, before.y];
    
    const t = Math.max(0, Math.min(1, 
      (clampedTargetTime - before.timestampMs) / (after.timestampMs - before.timestampMs)
    ));
    
    return [
      before.x + (after.x - before.x) * t,
      before.y + (after.y - before.y) * t,
    ];
  }
}

Usage Examples

使用示例

Game Setup

游戏设置

python
tick_system = TickSystem()
python
tick_system = TickSystem()

Create game with two players

Create game with two players

game = tick_system.create_game("match-123", "player-1", "player-2")
game = tick_system.create_game("match-123", "player-1", "player-2")

Set up callbacks

Set up callbacks

tick_system.set_broadcast_callback(broadcast_to_players) tick_system.set_kick_callback(kick_player)
tick_system.set_broadcast_callback(broadcast_to_players) tick_system.set_kick_callback(kick_player)

Start the game loop

Start the game loop

await tick_system.start_game("match-123")
undefined
await tick_system.start_game("match-123")
undefined

Processing Player Input

处理玩家输入

python
undefined
python
undefined

When receiving input from client

When receiving input from client

input = PlayerInput( player_id="player-1", tick=game.tick, dx=5.0, dy=2.0, timestamp_ms=client_timestamp )
valid = await tick_system.process_input("match-123", input) if not valid: # Input was rejected (possible cheat attempt) pass
undefined
input = PlayerInput( player_id="player-1", tick=game.tick, dx=5.0, dy=2.0, timestamp_ms=client_timestamp )
valid = await tick_system.process_input("match-123", input) if not valid: # Input was rejected (possible cheat attempt) pass
undefined

Lag-Compensated Hit Detection

延迟补偿命中检测

python
undefined
python
undefined

When player fires a shot

When player fires a shot

shooter = game.players["player-1"] target = game.players["player-2"]
hit, target_pos = tick_system._lag_comp.check_hit( shooter=shooter, target=target, shot_time_ms=client_shot_timestamp, current_time_ms=time.time() * 1000, hit_radius=20.0 )
if hit: target.health -= 25
undefined
shooter = game.players["player-1"] target = game.players["player-2"]
hit, target_pos = tick_system._lag_comp.check_hit( shooter=shooter, target=target, shot_time_ms=client_shot_timestamp, current_time_ms=time.time() * 1000, hit_radius=20.0 )
if hit: target.health -= 25
undefined

Best Practices

最佳实践

  1. Tune tick rate for your game - 30-60Hz is typical for action games
  2. Set broadcast rate lower than tick rate to save bandwidth
  3. Keep lag compensation window reasonable (100-200ms)
  4. Use violation decay to forgive occasional network hiccups
  5. Log all kicks with full context for review
  6. Test with simulated high latency
  1. 根据游戏类型调整Tick速率——动作游戏通常为30-60Hz
  2. 设置低于Tick速率的广播速率以节省带宽
  3. 保持合理的延迟补偿窗口(100-200ms)
  4. 使用违规衰减机制,原谅偶尔的网络波动
  5. 记录所有踢人操作的完整上下文以便复盘
  6. 模拟高延迟环境进行测试

Common Mistakes

常见误区

  • Running physics at variable rate (causes non-determinism)
  • No tolerance for network jitter (false positives)
  • Lag compensation window too large (feels unfair to targets)
  • Not decaying violations (kicks legitimate players)
  • Broadcasting every tick (wastes bandwidth)
  • Trusting client timestamps without bounds checking
  • 以可变速率运行物理模拟(会导致非确定性)
  • 对网络波动无容错机制(误判率高)
  • 延迟补偿窗口过大(对目标玩家不公平)
  • 未设置违规衰减机制(误踢正常玩家)
  • 每次Tick都进行广播(浪费带宽)
  • 无边界检查就信任客户端时间戳

Related Patterns

相关模式

  • websocket-management - Connection handling for game clients
  • atomic-matchmaking - Creating matches before game starts
  • graceful-shutdown - Draining games on server shutdown
  • websocket-management - 游戏客户端的连接管理
  • atomic-matchmaking - 游戏开始前的匹配创建
  • graceful-shutdown - 服务器关闭时的游戏收尾处理