file-uploads
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseSecure Upload Pipeline
安全文件上传管道
Production-grade file upload handling with validation, malware scanning, and duplicate detection.
具备验证、恶意软件扫描和重复文件检测功能的生产级文件上传处理方案。
When to Use This Skill
适用场景
- Building file upload endpoints that handle untrusted input
- Need malware scanning before processing files
- Want to prevent duplicate file processing
- Handling concurrent uploads of the same file
- 构建处理不可信输入的文件上传端点
- 需要在处理文件前进行恶意软件扫描
- 希望避免重复处理文件
- 处理同一文件的并发上传请求
Core Concepts
核心概念
File uploads are attack vectors. The solution is a multi-stage validation pipeline that fails fast and checks cheap things first:
Upload Request
↓
[1] Size + Type Check (instant)
↓
[2] Content Signature Validation (ms)
↓
[3] Malware Scan - ClamAV (50-200ms)
↓
[4] Hash-Based Duplicate Check (ms)
↓
[5] Race Condition Lock (Redis)
↓
[6] Upload to Storage
↓
[7] Clear Lock + Return URLKey principle: Check limits BEFORE upload to prevent wasted processing.
文件上传是常见的攻击入口。本方案采用多阶段验证管道,遵循"快速失败"原则,优先检查成本较低的项:
Upload Request
↓
[1] Size + Type Check (instant)
↓
[2] Content Signature Validation (ms)
↓
[3] Malware Scan - ClamAV (50-200ms)
↓
[4] Hash-Based Duplicate Check (ms)
↓
[5] Race Condition Lock (Redis)
↓
[6] Upload to Storage
↓
[7] Clear Lock + Return URL核心原则:在文件上传完成前先检查限制条件,避免浪费处理资源。
Implementation
实现方案
Python (FastAPI)
Python (FastAPI)
python
import hashlib
from typing import Optional, Dict
from fastapi import UploadFile, HTTPException
class FileValidator:
def __init__(self):
self.max_size = 10 * 1024 * 1024 # 10MB
self.allowed_types = ['application/pdf', 'image/jpeg', 'image/png']
self.malware_scanner = MalwareScannerService()
async def validate_file(self, file: UploadFile) -> Dict:
"""Multi-stage file validation"""
validation_details = {
"filename": file.filename,
"content_type": file.content_type,
"checks_passed": []
}
# Check 1: File size (before reading content)
file.file.seek(0, 2)
file_size = file.file.tell()
file.file.seek(0)
if file_size > self.max_size:
return {
"valid": False,
"error": f"File too large ({file_size / 1024 / 1024:.1f}MB). Max 10MB.",
"validation_details": validation_details
}
if file_size == 0:
return {"valid": False, "error": "File is empty."}
validation_details["checks_passed"].append("size_check")
# Check 2: MIME type
if file.content_type not in self.allowed_types:
return {
"valid": False,
"error": f"Invalid file type ({file.content_type})."
}
validation_details["checks_passed"].append("type_check")
# Check 3: Content signature (PDF magic bytes)
if file.content_type == 'application/pdf':
header = await file.read(4)
file.file.seek(0)
if header != b'%PDF':
return {"valid": False, "error": "File appears corrupted."}
validation_details["checks_passed"].append("pdf_signature_check")
# Check 4: Malware scan
scan_result = await self.malware_scanner.scan_file(file)
if not scan_result['safe']:
return {
"valid": False,
"error": f"Security threat detected: {scan_result.get('threat_found')}"
}
validation_details["checks_passed"].append("malware_scan")
return {"valid": True, "error": None, "validation_details": validation_details}
class MalwareScannerService:
"""ClamAV integration with graceful degradation"""
def __init__(self):
self.enabled = os.getenv('CLAMAV_ENABLED', 'true').lower() == 'true'
self.client = None
if self.enabled:
try:
import clamd
self.client = clamd.ClamdNetworkSocket(
host=os.getenv('CLAMAV_HOST', 'localhost'),
port=int(os.getenv('CLAMAV_PORT', '3310'))
)
self.client.ping()
except Exception as e:
logger.warning(f"ClamAV not available: {e}")
self.enabled = False
async def scan_file(self, file: UploadFile) -> Dict:
if not self.enabled or not self.client:
return {"safe": True, "scan_performed": False}
try:
from io import BytesIO
file_content = await file.read()
file.file.seek(0)
result = self.client.instream(BytesIO(file_content))
status, threat = result.get('stream', ('ERROR', 'Unknown'))
if status == 'OK':
return {"safe": True, "scan_performed": True}
elif status == 'FOUND':
logger.warning(f"MALWARE: {file.filename} - {threat}")
return {"safe": False, "threat_found": threat, "scan_performed": True}
else:
return {"safe": False, "threat_found": f"Scan error: {status}"}
except Exception as e:
# Fail-safe: reject if scan fails
return {"safe": False, "threat_found": f"Scan failed: {str(e)}"}
class DuplicateDetector:
"""Hash-based duplicate detection with race protection"""
def calculate_file_hash(self, file_content: bytes) -> str:
return hashlib.sha256(file_content).hexdigest()
async def check_duplicate(self, account_id: str, file_hash: str) -> Optional[Dict]:
result = self.client.table("files").select("id").eq(
"account_id", account_id
).eq("file_hash", file_hash).execute()
if result.data:
return {"type": "file_hash", "message": "Exact duplicate detected"}
return None
async def mark_processing(self, account_id: str, file_hash: str, ttl: int = 300):
"""Mark file as being processed (prevents concurrent processing)"""
key = f"processing:{account_id}:{file_hash}"
self.redis.setex(key, ttl, "1")
async def is_processing(self, account_id: str, file_hash: str) -> bool:
key = f"processing:{account_id}:{file_hash}"
return self.redis.exists(key) > 0
async def clear_processing(self, account_id: str, file_hash: str):
key = f"processing:{account_id}:{file_hash}"
self.redis.delete(key)python
import hashlib
from typing import Optional, Dict
from fastapi import UploadFile, HTTPException
class FileValidator:
def __init__(self):
self.max_size = 10 * 1024 * 1024 # 10MB
self.allowed_types = ['application/pdf', 'image/jpeg', 'image/png']
self.malware_scanner = MalwareScannerService()
async def validate_file(self, file: UploadFile) -> Dict:
"""Multi-stage file validation"""
validation_details = {
"filename": file.filename,
"content_type": file.content_type,
"checks_passed": []
}
# Check 1: File size (before reading content)
file.file.seek(0, 2)
file_size = file.file.tell()
file.file.seek(0)
if file_size > self.max_size:
return {
"valid": False,
"error": f"File too large ({file_size / 1024 / 1024:.1f}MB). Max 10MB.",
"validation_details": validation_details
}
if file_size == 0:
return {"valid": False, "error": "File is empty."}
validation_details["checks_passed"].append("size_check")
# Check 2: MIME type
if file.content_type not in self.allowed_types:
return {
"valid": False,
"error": f"Invalid file type ({file.content_type})."
}
validation_details["checks_passed"].append("type_check")
# Check 3: Content signature (PDF magic bytes)
if file.content_type == 'application/pdf':
header = await file.read(4)
file.file.seek(0)
if header != b'%PDF':
return {"valid": False, "error": "File appears corrupted."}
validation_details["checks_passed"].append("pdf_signature_check")
# Check 4: Malware scan
scan_result = await self.malware_scanner.scan_file(file)
if not scan_result['safe']:
return {
"valid": False,
"error": f"Security threat detected: {scan_result.get('threat_found')}"
}
validation_details["checks_passed"].append("malware_scan")
return {"valid": True, "error": None, "validation_details": validation_details}
class MalwareScannerService:
"""ClamAV integration with graceful degradation"""
def __init__(self):
self.enabled = os.getenv('CLAMAV_ENABLED', 'true').lower() == 'true'
self.client = None
if self.enabled:
try:
import clamd
self.client = clamd.ClamdNetworkSocket(
host=os.getenv('CLAMAV_HOST', 'localhost'),
port=int(os.getenv('CLAMAV_PORT', '3310'))
)
self.client.ping()
except Exception as e:
logger.warning(f"ClamAV not available: {e}")
self.enabled = False
async def scan_file(self, file: UploadFile) -> Dict:
if not self.enabled or not self.client:
return {"safe": True, "scan_performed": False}
try:
from io import BytesIO
file_content = await file.read()
file.file.seek(0)
result = self.client.instream(BytesIO(file_content))
status, threat = result.get('stream', ('ERROR', 'Unknown'))
if status == 'OK':
return {"safe": True, "scan_performed": True}
elif status == 'FOUND':
logger.warning(f"MALWARE: {file.filename} - {threat}")
return {"safe": False, "threat_found": threat, "scan_performed": True}
else:
return {"safe": False, "threat_found": f"Scan error: {status}"}
except Exception as e:
# Fail-safe: reject if scan fails
return {"safe": False, "threat_found": f"Scan failed: {str(e)}"}
class DuplicateDetector:
"""Hash-based duplicate detection with race protection"""
def calculate_file_hash(self, file_content: bytes) -> str:
return hashlib.sha256(file_content).hexdigest()
async def check_duplicate(self, account_id: str, file_hash: str) -> Optional[Dict]:
result = self.client.table("files").select("id").eq(
"account_id", account_id
).eq("file_hash", file_hash).execute()
if result.data:
return {"type": "file_hash", "message": "Exact duplicate detected"}
return None
async def mark_processing(self, account_id: str, file_hash: str, ttl: int = 300):
"""Mark file as being processed (prevents concurrent processing)"""
key = f"processing:{account_id}:{file_hash}"
self.redis.setex(key, ttl, "1")
async def is_processing(self, account_id: str, file_hash: str) -> bool:
key = f"processing:{account_id}:{file_hash}"
return self.redis.exists(key) > 0
async def clear_processing(self, account_id: str, file_hash: str):
key = f"processing:{account_id}:{file_hash}"
self.redis.delete(key)TypeScript
TypeScript
typescript
import { createHash } from 'crypto';
interface ValidationResult {
valid: boolean;
error?: string;
checksPassed: string[];
}
class FileValidator {
private maxSize = 10 * 1024 * 1024; // 10MB
private allowedTypes = ['application/pdf', 'image/jpeg', 'image/png'];
async validate(file: File): Promise<ValidationResult> {
const checksPassed: string[] = [];
// Check 1: Size
if (file.size > this.maxSize) {
return { valid: false, error: 'File too large', checksPassed };
}
if (file.size === 0) {
return { valid: false, error: 'File is empty', checksPassed };
}
checksPassed.push('size_check');
// Check 2: MIME type
if (!this.allowedTypes.includes(file.type)) {
return { valid: false, error: 'Invalid file type', checksPassed };
}
checksPassed.push('type_check');
// Check 3: Content signature
if (file.type === 'application/pdf') {
const header = await this.readHeader(file, 4);
if (header !== '%PDF') {
return { valid: false, error: 'File appears corrupted', checksPassed };
}
checksPassed.push('pdf_signature_check');
}
return { valid: true, checksPassed };
}
private async readHeader(file: File, bytes: number): Promise<string> {
const slice = file.slice(0, bytes);
const buffer = await slice.arrayBuffer();
return new TextDecoder().decode(buffer);
}
}
class DuplicateDetector {
async calculateHash(file: File): Promise<string> {
const buffer = await file.arrayBuffer();
const hashBuffer = await crypto.subtle.digest('SHA-256', buffer);
return Array.from(new Uint8Array(hashBuffer))
.map(b => b.toString(16).padStart(2, '0'))
.join('');
}
}typescript
import { createHash } from 'crypto';
interface ValidationResult {
valid: boolean;
error?: string;
checksPassed: string[];
}
class FileValidator {
private maxSize = 10 * 1024 * 1024; // 10MB
private allowedTypes = ['application/pdf', 'image/jpeg', 'image/png'];
async validate(file: File): Promise<ValidationResult> {
const checksPassed: string[] = [];
// Check 1: Size
if (file.size > this.maxSize) {
return { valid: false, error: 'File too large', checksPassed };
}
if (file.size === 0) {
return { valid: false, error: 'File is empty', checksPassed };
}
checksPassed.push('size_check');
// Check 2: MIME type
if (!this.allowedTypes.includes(file.type)) {
return { valid: false, error: 'Invalid file type', checksPassed };
}
checksPassed.push('type_check');
// Check 3: Content signature
if (file.type === 'application/pdf') {
const header = await this.readHeader(file, 4);
if (header !== '%PDF') {
return { valid: false, error: 'File appears corrupted', checksPassed };
}
checksPassed.push('pdf_signature_check');
}
return { valid: true, checksPassed };
}
private async readHeader(file: File, bytes: number): Promise<string> {
const slice = file.slice(0, bytes);
const buffer = await slice.arrayBuffer();
return new TextDecoder().decode(buffer);
}
}
class DuplicateDetector {
async calculateHash(file: File): Promise<string> {
const buffer = await file.arrayBuffer();
const hashBuffer = await crypto.subtle.digest('SHA-256', buffer);
return Array.from(new Uint8Array(hashBuffer))
.map(b => b.toString(16).padStart(2, '0'))
.join('');
}
}Usage Examples
使用示例
Complete Upload Endpoint
完整上传端点
python
@router.post("/upload")
async def upload_file(
file: UploadFile = File(...),
auth: AuthenticatedUser = Depends(get_current_user),
):
# Check limits FIRST
allowed, details = usage_service.check_limit(auth.id, 'file_upload')
if not allowed:
raise HTTPException(status_code=429, detail=details)
# Stage 1-3: Validate
validation = await file_validator.validate_file(file)
if not validation['valid']:
raise HTTPException(status_code=400, detail=validation['error'])
# Stage 4: Hash for duplicate detection
file.file.seek(0)
file_content = await file.read()
file_hash = duplicate_detector.calculate_file_hash(file_content)
file.file.seek(0)
# Check duplicate
duplicate = await duplicate_detector.check_duplicate(auth.account_id, file_hash)
if duplicate:
raise HTTPException(status_code=409, detail=duplicate)
# Stage 5: Race protection
if await duplicate_detector.is_processing(auth.account_id, file_hash):
raise HTTPException(status_code=409, detail="File is being processed")
await duplicate_detector.mark_processing(auth.account_id, file_hash, ttl=300)
try:
# Stage 6: Upload
file_url = await storage_service.upload_file(file, auth.id)
finally:
# Stage 7: Clear lock
await duplicate_detector.clear_processing(auth.account_id, file_hash)
return {"success": True, "file_url": file_url, "file_hash": file_hash}python
@router.post("/upload")
async def upload_file(
file: UploadFile = File(...),
auth: AuthenticatedUser = Depends(get_current_user),
):
# Check limits FIRST
allowed, details = usage_service.check_limit(auth.id, 'file_upload')
if not allowed:
raise HTTPException(status_code=429, detail=details)
# Stage 1-3: Validate
validation = await file_validator.validate_file(file)
if not validation['valid']:
raise HTTPException(status_code=400, detail=validation['error'])
# Stage 4: Hash for duplicate detection
file.file.seek(0)
file_content = await file.read()
file_hash = duplicate_detector.calculate_file_hash(file_content)
file.file.seek(0)
# Check duplicate
duplicate = await duplicate_detector.check_duplicate(auth.account_id, file_hash)
if duplicate:
raise HTTPException(status_code=409, detail=duplicate)
# Stage 5: Race protection
if await duplicate_detector.is_processing(auth.account_id, file_hash):
raise HTTPException(status_code=409, detail="File is being processed")
await duplicate_detector.mark_processing(auth.account_id, file_hash, ttl=300)
try:
# Stage 6: Upload
file_url = await storage_service.upload_file(file, auth.id)
finally:
# Stage 7: Clear lock
await duplicate_detector.clear_processing(auth.account_id, file_hash)
return {"success": True, "file_url": file_url, "file_hash": file_hash}Best Practices
最佳实践
- Check limits BEFORE upload - Don't waste bandwidth on files that will be rejected
- TTL on processing markers - If upload crashes, marker auto-expires (300s default)
- ClamAV graceful degradation - Don't block uploads if scanner is down
- Hash before upload - Calculate hash from memory, not after storage write
- Fail-safe on scan errors - Reject file if malware scan fails
- 优先检查限制条件 - 不要为最终会被拒绝的文件浪费带宽
- 为处理标记设置TTL - 如果上传失败,标记会自动过期(默认300秒)
- ClamAV优雅降级 - 扫描器不可用时不要阻止上传
- 上传前计算哈希 - 从内存中计算哈希,而非写入存储后再计算
- 扫描错误时的安全策略 - 恶意软件扫描失败时拒绝文件
Common Mistakes
常见误区
- Processing files before checking usage limits
- No TTL on processing markers (stuck forever if crash)
- Blocking uploads when ClamAV is unavailable
- Calculating hash after storage write (wasted upload)
- Allowing uploads when malware scan fails
- 在检查使用限制前就处理文件
- 不给处理标记设置TTL(如果崩溃,标记会永久存在)
- ClamAV不可用时阻止上传
- 写入存储后再计算哈希(浪费上传资源)
- 恶意软件扫描失败时仍允许上传
Related Patterns
相关模式
- rate-limiting - Rate limit upload endpoints
- distributed-lock - Coordinate concurrent uploads
- validation-quarantine - Quarantine suspicious files
- rate-limiting - 对上传端点进行限流
- distributed-lock - 协调并发上传请求
- validation-quarantine - 隔离可疑文件