Loading...
Loading...
Server-authoritative tick system for multiplayer games with lag compensation, anti-cheat validation, and deterministic physics. Prevents speed hacks and teleports while keeping gameplay fair for high-latency players.
npx skill4agent add dadbodgeoff/drift server-tickfrom dataclasses import dataclass, field
from collections import deque
from typing import Dict, Optional, Tuple, Callable, Awaitable
from enum import Enum
import asyncio
import time
import math
class ViolationType(str, Enum):
SPEED_HACK = "speed_hack"
TELEPORT = "teleport"
INVALID_ACTION = "invalid_action"
@dataclass(frozen=True)
class TickConfig:
rate_hz: int = 60
broadcast_divisor: int = 3 # 60/3 = 20Hz broadcast
max_speed: float = 300.0 # Units per second
teleport_threshold: float = 100.0
max_rewind_ms: int = 200
violation_threshold: int = 10
decay_per_second: float = 1.0
@property
def interval_ms(self) -> float:
return 1000.0 / self.rate_hz
def max_distance_per_tick(self) -> float:
return self.max_speed / self.rate_hz
@dataclass
class PlayerInput:
player_id: str
tick: int
dx: float = 0.0
dy: float = 0.0
timestamp_ms: float = 0.0
@dataclass
class PositionSnapshot:
x: float
y: float
tick: int
timestamp_ms: float
@dataclass
class PlayerState:
player_id: str
x: float = 0.0
y: float = 0.0
health: int = 100
last_valid_position: Tuple[float, float] = (0.0, 0.0)
violations: float = 0.0
position_history: deque = field(default_factory=lambda: deque(maxlen=15))
def record_position(self, tick: int, timestamp_ms: float) -> None:
self.position_history.append(PositionSnapshot(
x=self.x, y=self.y, tick=tick, timestamp_ms=timestamp_ms
))
@dataclass
class GameState:
game_id: str
tick: int = 0
players: Dict[str, PlayerState] = field(default_factory=dict)
running: bool = False
start_time_ms: float = 0.0
class InputValidator:
"""Validates player inputs for anti-cheat."""
def __init__(self, config: TickConfig):
self._config = config
def validate_movement(
self, player: PlayerState, input: PlayerInput
) -> Tuple[bool, Optional[ViolationType]]:
distance = math.sqrt(input.dx ** 2 + input.dy ** 2)
max_distance = self._config.max_distance_per_tick()
# 50% tolerance for network jitter
if distance > max_distance * 1.5:
return False, ViolationType.SPEED_HACK
# Check for teleport
new_x = player.x + input.dx
new_y = player.y + input.dy
last_x, last_y = player.last_valid_position
jump_distance = math.sqrt((new_x - last_x) ** 2 + (new_y - last_y) ** 2)
if jump_distance > self._config.teleport_threshold:
return False, ViolationType.TELEPORT
return True, None
def apply_violation(self, player: PlayerState) -> Tuple[bool, bool]:
player.violations += 1.0
should_warn = player.violations >= self._config.violation_threshold * 0.5
should_kick = player.violations >= self._config.violation_threshold
return should_warn, should_kick
def decay_violations(self, player: PlayerState, delta_seconds: float) -> None:
decay = self._config.decay_per_second * delta_seconds
player.violations = max(0.0, player.violations - decay)
class LagCompensator:
"""Lag compensation for fair hit detection."""
def __init__(self, config: TickConfig):
self._config = config
def get_position_at_time(
self, player: PlayerState, target_time_ms: float, current_time_ms: float
) -> Tuple[float, float]:
# Clamp rewind to max window
rewind_ms = current_time_ms - target_time_ms
if rewind_ms > self._config.max_rewind_ms:
target_time_ms = current_time_ms - self._config.max_rewind_ms
if not player.position_history:
return player.x, player.y
# Find surrounding snapshots
before: Optional[PositionSnapshot] = None
after: Optional[PositionSnapshot] = None
for snapshot in player.position_history:
if snapshot.timestamp_ms <= target_time_ms:
before = snapshot
elif after is None:
after = snapshot
break
if before is None:
oldest = player.position_history[0]
return oldest.x, oldest.y
if after is None:
return before.x, before.y
# Interpolate between snapshots
time_range = after.timestamp_ms - before.timestamp_ms
if time_range <= 0:
return before.x, before.y
t = max(0.0, min(1.0, (target_time_ms - before.timestamp_ms) / time_range))
x = before.x + (after.x - before.x) * t
y = before.y + (after.y - before.y) * t
return x, y
def check_hit(
self, shooter: PlayerState, target: PlayerState,
shot_time_ms: float, current_time_ms: float, hit_radius: float = 20.0
) -> Tuple[bool, Tuple[float, float]]:
target_x, target_y = self.get_position_at_time(target, shot_time_ms, current_time_ms)
distance = math.sqrt((shooter.x - target_x) ** 2 + (shooter.y - target_y) ** 2)
return distance <= hit_radius, (target_x, target_y)
class TickSystem:
"""Server-authoritative tick system."""
def __init__(self, config: TickConfig = None):
self._config = config or TickConfig()
self._games: Dict[str, GameState] = {}
self._tasks: Dict[str, asyncio.Task] = {}
self._validator = InputValidator(self._config)
self._lag_comp = LagCompensator(self._config)
self._broadcast_callback: Optional[Callable[[str, dict], Awaitable[None]]] = None
self._kick_callback: Optional[Callable[[str, str, str], Awaitable[None]]] = None
def set_broadcast_callback(self, callback: Callable[[str, dict], Awaitable[None]]) -> None:
self._broadcast_callback = callback
def set_kick_callback(self, callback: Callable[[str, str, str], Awaitable[None]]) -> None:
self._kick_callback = callback
def create_game(
self, game_id: str, player1_id: str, player2_id: str,
spawn1: Tuple[float, float] = (100, 300),
spawn2: Tuple[float, float] = (700, 300)
) -> GameState:
game = GameState(game_id=game_id)
game.players[player1_id] = PlayerState(
player_id=player1_id, x=spawn1[0], y=spawn1[1], last_valid_position=spawn1
)
game.players[player2_id] = PlayerState(
player_id=player2_id, x=spawn2[0], y=spawn2[1], last_valid_position=spawn2
)
self._games[game_id] = game
return game
async def start_game(self, game_id: str) -> None:
game = self._games.get(game_id)
if not game:
raise ValueError(f"Game {game_id} not found")
game.running = True
game.start_time_ms = time.time() * 1000
self._tasks[game_id] = asyncio.create_task(self._tick_loop(game_id))
async def stop_game(self, game_id: str) -> None:
game = self._games.get(game_id)
if game:
game.running = False
task = self._tasks.pop(game_id, None)
if task:
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
self._games.pop(game_id, None)
async def process_input(self, game_id: str, input: PlayerInput) -> bool:
game = self._games.get(game_id)
if not game or not game.running:
return False
player = game.players.get(input.player_id)
if not player:
return False
valid, violation = self._validator.validate_movement(player, input)
if not valid and violation:
_, should_kick = self._validator.apply_violation(player)
if should_kick and self._kick_callback:
await self._kick_callback(game_id, input.player_id, f"Anti-cheat: {violation.value}")
return False
player.x += input.dx
player.y += input.dy
player.last_valid_position = (player.x, player.y)
return True
async def _tick_loop(self, game_id: str) -> None:
game = self._games.get(game_id)
if not game:
return
interval = self._config.interval_ms / 1000.0
last_time = time.time()
while game.running:
tick_start = time.time()
delta = tick_start - last_time
last_time = tick_start
await self._tick(game, delta)
if game.tick % self._config.broadcast_divisor == 0:
await self._broadcast_state(game)
game.tick += 1
elapsed = time.time() - tick_start
sleep_time = max(0, interval - elapsed)
if sleep_time > 0:
await asyncio.sleep(sleep_time)
async def _tick(self, game: GameState, delta: float) -> None:
current_time_ms = time.time() * 1000
for player in game.players.values():
self._validator.decay_violations(player, delta)
player.record_position(game.tick, current_time_ms)
async def _broadcast_state(self, game: GameState) -> None:
if not self._broadcast_callback:
return
state = {
"type": "game_state",
"tick": game.tick,
"players": {
pid: {"x": p.x, "y": p.y, "health": p.health}
for pid, p in game.players.items()
}
}
await self._broadcast_callback(game.game_id, state)enum ViolationType {
SPEED_HACK = 'speed_hack',
TELEPORT = 'teleport',
INVALID_ACTION = 'invalid_action',
}
interface TickConfig {
rateHz: number;
broadcastDivisor: number;
maxSpeed: number;
teleportThreshold: number;
maxRewindMs: number;
violationThreshold: number;
decayPerSecond: number;
}
const DEFAULT_CONFIG: TickConfig = {
rateHz: 60,
broadcastDivisor: 3,
maxSpeed: 300,
teleportThreshold: 100,
maxRewindMs: 200,
violationThreshold: 10,
decayPerSecond: 1.0,
};
interface PositionSnapshot {
x: number;
y: number;
tick: number;
timestampMs: number;
}
interface PlayerState {
playerId: string;
x: number;
y: number;
health: number;
lastValidPosition: [number, number];
violations: number;
positionHistory: PositionSnapshot[];
}
class TickSystem {
private config: TickConfig;
private games = new Map<string, GameState>();
private intervals = new Map<string, NodeJS.Timeout>();
constructor(config: Partial<TickConfig> = {}) {
this.config = { ...DEFAULT_CONFIG, ...config };
}
createGame(gameId: string, player1Id: string, player2Id: string): GameState {
const game: GameState = {
gameId,
tick: 0,
players: new Map([
[player1Id, this.createPlayer(player1Id, 100, 300)],
[player2Id, this.createPlayer(player2Id, 700, 300)],
]),
running: false,
};
this.games.set(gameId, game);
return game;
}
private createPlayer(id: string, x: number, y: number): PlayerState {
return {
playerId: id,
x, y,
health: 100,
lastValidPosition: [x, y],
violations: 0,
positionHistory: [],
};
}
processInput(gameId: string, input: PlayerInput): boolean {
const game = this.games.get(gameId);
if (!game?.running) return false;
const player = game.players.get(input.playerId);
if (!player) return false;
const distance = Math.sqrt(input.dx ** 2 + input.dy ** 2);
const maxDistance = this.config.maxSpeed / this.config.rateHz;
if (distance > maxDistance * 1.5) {
player.violations += 1;
return false;
}
player.x += input.dx;
player.y += input.dy;
player.lastValidPosition = [player.x, player.y];
return true;
}
getPositionAtTime(player: PlayerState, targetTimeMs: number, currentTimeMs: number): [number, number] {
const rewindMs = Math.min(currentTimeMs - targetTimeMs, this.config.maxRewindMs);
const clampedTargetTime = currentTimeMs - rewindMs;
if (player.positionHistory.length === 0) {
return [player.x, player.y];
}
let before: PositionSnapshot | null = null;
let after: PositionSnapshot | null = null;
for (const snapshot of player.positionHistory) {
if (snapshot.timestampMs <= clampedTargetTime) {
before = snapshot;
} else if (!after) {
after = snapshot;
break;
}
}
if (!before) return [player.positionHistory[0].x, player.positionHistory[0].y];
if (!after) return [before.x, before.y];
const t = Math.max(0, Math.min(1,
(clampedTargetTime - before.timestampMs) / (after.timestampMs - before.timestampMs)
));
return [
before.x + (after.x - before.x) * t,
before.y + (after.y - before.y) * t,
];
}
}tick_system = TickSystem()
# Create game with two players
game = tick_system.create_game("match-123", "player-1", "player-2")
# Set up callbacks
tick_system.set_broadcast_callback(broadcast_to_players)
tick_system.set_kick_callback(kick_player)
# Start the game loop
await tick_system.start_game("match-123")# When receiving input from client
input = PlayerInput(
player_id="player-1",
tick=game.tick,
dx=5.0,
dy=2.0,
timestamp_ms=client_timestamp
)
valid = await tick_system.process_input("match-123", input)
if not valid:
# Input was rejected (possible cheat attempt)
pass# When player fires a shot
shooter = game.players["player-1"]
target = game.players["player-2"]
hit, target_pos = tick_system._lag_comp.check_hit(
shooter=shooter,
target=target,
shot_time_ms=client_shot_timestamp,
current_time_ms=time.time() * 1000,
hit_radius=20.0
)
if hit:
target.health -= 25