file-upload-handling
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseFile Upload Handling
文件上传处理
Overview
概述
Build secure and robust file upload systems with validation, sanitization, virus scanning, efficient storage management, CDN integration, and proper file serving mechanisms across different backend frameworks.
构建具备验证、清理、病毒扫描、高效存储管理、CDN集成以及跨不同后端框架的合理文件分发机制的安全且健壮的文件上传系统。
When to Use
适用场景
- Implementing file upload features
- Managing user-uploaded documents
- Storing and serving media files
- Implementing profile picture uploads
- Building document management systems
- Handling bulk file imports
- 实现文件上传功能
- 管理用户上传的文档
- 存储和分发媒体文件
- 实现头像上传功能
- 构建文档管理系统
- 处理批量文件导入
Instructions
操作指南
1. Python/Flask File Upload
1. Python/Flask 文件上传
python
undefinedpython
undefinedconfig.py
config.py
import os
class Config:
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50 MB
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'docx', 'doc'}
UPLOAD_DIRECTORY = os.path.join(os.path.dirname(file), UPLOAD_FOLDER)
import os
class Config:
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50 MB
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'docx', 'doc'}
UPLOAD_DIRECTORY = os.path.join(os.path.dirname(file), UPLOAD_FOLDER)
file_service.py
file_service.py
import os
import mimetypes
import hashlib
import secrets
from werkzeug.utils import secure_filename
from datetime import datetime
import magic
import aiofiles
class FileUploadService:
def init(self, upload_dir, allowed_extensions, max_size=5010241024):
self.upload_dir = upload_dir
self.allowed_extensions = allowed_extensions
self.max_size = max_size
self.mime = magic.Magic(mime=True)
def validate_file(self, file):
"""Validate uploaded file"""
errors = []
# Check filename
if not file.filename:
errors.append('No filename provided')
# Check file extension
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
if ext not in self.allowed_extensions:
errors.append(f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}')
# Check file size
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
if file_size > self.max_size:
errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')
# Check MIME type
file_content = file.read(8192)
file.seek(0)
detected_mime = self.mime.from_buffer(file_content)
if not self._is_valid_mime(detected_mime, ext):
errors.append('File MIME type does not match extension')
return errors
def _is_valid_mime(self, mime_type, ext):
"""Verify MIME type matches extension"""
allowed_mimes = {
'txt': ['text/plain'],
'pdf': ['application/pdf'],
'png': ['image/png'],
'jpg': ['image/jpeg'],
'jpeg': ['image/jpeg'],
'gif': ['image/gif'],
'doc': ['application/msword'],
'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
}
return mime_type in allowed_mimes.get(ext, [])
def save_file(self, file, user_id):
"""Save uploaded file with sanitization"""
errors = self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
safe_filename = f"{file_hash}.{ext}"
# Create user-specific directory
user_upload_dir = os.path.join(self.upload_dir, user_id)
os.makedirs(user_upload_dir, exist_ok=True)
filepath = os.path.join(user_upload_dir, safe_filename)
try:
file.save(filepath)
file_info = {
'id': file_hash,
'original_name': filename,
'safe_name': safe_filename,
'size': os.path.getsize(filepath),
'user_id': user_id,
'uploaded_at': datetime.utcnow().isoformat(),
'mime_type': self.mime.from_file(filepath)
}
return {'success': True, 'file': file_info}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, user_id, file_id):
"""Delete file safely"""
filepath = os.path.join(self.upload_dir, user_id, f"{file_id}.*")
import glob
files = glob.glob(filepath)
for f in files:
try:
os.remove(f)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
return {'success': False, 'error': 'File not found'}import os
import mimetypes
import hashlib
import secrets
from werkzeug.utils import secure_filename
from datetime import datetime
import magic
import aiofiles
class FileUploadService:
def init(self, upload_dir, allowed_extensions, max_size=5010241024):
self.upload_dir = upload_dir
self.allowed_extensions = allowed_extensions
self.max_size = max_size
self.mime = magic.Magic(mime=True)
def validate_file(self, file):
"""Validate uploaded file"""
errors = []
# Check filename
if not file.filename:
errors.append('No filename provided')
# Check file extension
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
if ext not in self.allowed_extensions:
errors.append(f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}')
# Check file size
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
if file_size > self.max_size:
errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')
# Check MIME type
file_content = file.read(8192)
file.seek(0)
detected_mime = self.mime.from_buffer(file_content)
if not self._is_valid_mime(detected_mime, ext):
errors.append('File MIME type does not match extension')
return errors
def _is_valid_mime(self, mime_type, ext):
"""Verify MIME type matches extension"""
allowed_mimes = {
'txt': ['text/plain'],
'pdf': ['application/pdf'],
'png': ['image/png'],
'jpg': ['image/jpeg'],
'jpeg': ['image/jpeg'],
'gif': ['image/gif'],
'doc': ['application/msword'],
'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
}
return mime_type in allowed_mimes.get(ext, [])
def save_file(self, file, user_id):
"""Save uploaded file with sanitization"""
errors = self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
safe_filename = f"{file_hash}.{ext}"
# Create user-specific directory
user_upload_dir = os.path.join(self.upload_dir, user_id)
os.makedirs(user_upload_dir, exist_ok=True)
filepath = os.path.join(user_upload_dir, safe_filename)
try:
file.save(filepath)
file_info = {
'id': file_hash,
'original_name': filename,
'safe_name': safe_filename,
'size': os.path.getsize(filepath),
'user_id': user_id,
'uploaded_at': datetime.utcnow().isoformat(),
'mime_type': self.mime.from_file(filepath)
}
return {'success': True, 'file': file_info}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, user_id, file_id):
"""Delete file safely"""
filepath = os.path.join(self.upload_dir, user_id, f"{file_id}.*")
import glob
files = glob.glob(filepath)
for f in files:
try:
os.remove(f)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
return {'success': False, 'error': 'File not found'}routes.py
routes.py
from flask import request, jsonify, send_file, safe_join
from functools import wraps
import os
file_service = FileUploadService(
app.config['UPLOAD_DIRECTORY'],
app.config['ALLOWED_EXTENSIONS']
)
@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
result = file_service.save_file(file, current_user.id)
if result['success']:
# Save metadata to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user.id,
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.session.add(file_record)
db.session.commit()
return jsonify(result['file']), 201
else:
return jsonify({'errors': result.get('errors') or [result['error']]}), 400@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Construct safe file path
filepath = safe_join(
app.config['UPLOAD_DIRECTORY'],
current_user.id,
file_record.file_id + '.' + file_record.original_name.rsplit('.', 1)[1]
)
if filepath and os.path.exists(filepath):
return send_file(
filepath,
mimetype=file_record.mime_type,
as_attachment=True,
download_name=file_record.original_name
)
return jsonify({'error': 'File not found'}), 404@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
result = file_service.delete_file(current_user.id, file_id)
if result['success']:
db.session.delete(file_record)
db.session.commit()
return '', 204
else:
return jsonify({'error': result['error']}), 500undefinedfrom flask import request, jsonify, send_file, safe_join
from functools import wraps
import os
file_service = FileUploadService(
app.config['UPLOAD_DIRECTORY'],
app.config['ALLOWED_EXTENSIONS']
)
@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
result = file_service.save_file(file, current_user.id)
if result['success']:
# Save metadata to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user.id,
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.session.add(file_record)
db.session.commit()
return jsonify(result['file']), 201
else:
return jsonify({'errors': result.get('errors') or [result['error']]}), 400@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Construct safe file path
filepath = safe_join(
app.config['UPLOAD_DIRECTORY'],
current_user.id,
file_record.file_id + '.' + file_record.original_name.rsplit('.', 1)[1]
)
if filepath and os.path.exists(filepath):
return send_file(
filepath,
mimetype=file_record.mime_type,
as_attachment=True,
download_name=file_record.original_name
)
return jsonify({'error': 'File not found'}), 404@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
result = file_service.delete_file(current_user.id, file_id)
if result['success']:
db.session.delete(file_record)
db.session.commit()
return '', 204
else:
return jsonify({'error': result['error']}), 500undefined2. Node.js Express File Upload with Multer
2. Node.js Express 文件上传(基于Multer)
javascript
// config.js
const multer = require('multer');
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
const storage = multer.diskStorage({
destination: (req, file, cb) => {
const uploadDir = path.join(__dirname, 'uploads', req.user.id);
fs.mkdirSync(uploadDir, { recursive: true });
cb(null, uploadDir);
},
filename: (req, file, cb) => {
const hash = crypto.randomBytes(16).toString('hex');
const ext = path.extname(file.originalname);
cb(null, hash + ext);
}
});
const fileFilter = (req, file, cb) => {
const allowedMimes = [
'image/jpeg',
'image/png',
'image/gif',
'application/pdf',
'text/plain'
];
const allowedExts = ['.pdf', '.txt', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'];
const ext = path.extname(file.originalname).toLowerCase();
if (!allowedMimes.includes(file.mimetype) || !allowedExts.includes(ext)) {
return cb(new Error('Invalid file type'));
}
cb(null, true);
};
const upload = multer({
storage: storage,
fileFilter: fileFilter,
limits: {
fileSize: 50 * 1024 * 1024 // 50 MB
}
});
module.exports = upload;
// file-service.js
const fs = require('fs').promises;
const path = require('path');
const FileRecord = require('../models/FileRecord');
class FileService {
async uploadFile(req) {
if (!req.file) {
throw new Error('No file provided');
}
const fileInfo = {
id: path.basename(req.file.filename, path.extname(req.file.filename)),
originalName: req.file.originalname,
safeName: req.file.filename,
size: req.file.size,
mimeType: req.file.mimetype,
userId: req.user.id,
uploadedAt: new Date()
};
// Save to database
const record = await FileRecord.create(fileInfo);
return record;
}
async downloadFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
return { record, filepath };
}
async deleteFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
await fs.unlink(filepath);
await record.destroy();
return { success: true };
}
async listUserFiles(userId, limit = 20, offset = 0) {
const { rows, count } = await FileRecord.findAndCountAll({
where: { userId },
limit,
offset,
order: [['uploadedAt', 'DESC']]
});
return { files: rows, total: count };
}
}
module.exports = new FileService();
// routes.js
const express = require('express');
const upload = require('../config/multer');
const fileService = require('../services/file-service');
const { authenticate } = require('../middleware/auth');
const router = express.Router();
router.post('/upload', authenticate, upload.single('file'), async (req, res, next) => {
try {
const file = await fileService.uploadFile(req);
res.status(201).json(file);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
router.get('/files/:fileId', authenticate, async (req, res, next) => {
try {
const { record, filepath } = await fileService.downloadFile(
req.params.fileId,
req.user.id
);
res.download(filepath, record.originalName);
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.delete('/files/:fileId', authenticate, async (req, res, next) => {
try {
await fileService.deleteFile(req.params.fileId, req.user.id);
res.status(204).send();
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.get('/files', authenticate, async (req, res, next) => {
try {
const page = parseInt(req.query.page) || 1;
const limit = parseInt(req.query.limit) || 20;
const offset = (page - 1) * limit;
const { files, total } = await fileService.listUserFiles(
req.user.id,
limit,
offset
);
res.json({
data: files,
pagination: { page, limit, total, pages: Math.ceil(total / limit) }
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
module.exports = router;javascript
// config.js
const multer = require('multer');
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
const storage = multer.diskStorage({
destination: (req, file, cb) => {
const uploadDir = path.join(__dirname, 'uploads', req.user.id);
fs.mkdirSync(uploadDir, { recursive: true });
cb(null, uploadDir);
},
filename: (req, file, cb) => {
const hash = crypto.randomBytes(16).toString('hex');
const ext = path.extname(file.originalname);
cb(null, hash + ext);
}
});
const fileFilter = (req, file, cb) => {
const allowedMimes = [
'image/jpeg',
'image/png',
'image/gif',
'application/pdf',
'text/plain'
];
const allowedExts = ['.pdf', '.txt', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'];
const ext = path.extname(file.originalname).toLowerCase();
if (!allowedMimes.includes(file.mimetype) || !allowedExts.includes(ext)) {
return cb(new Error('Invalid file type'));
}
cb(null, true);
};
const upload = multer({
storage: storage,
fileFilter: fileFilter,
limits: {
fileSize: 50 * 1024 * 1024 // 50 MB
}
});
module.exports = upload;
// file-service.js
const fs = require('fs').promises;
const path = require('path');
const FileRecord = require('../models/FileRecord');
class FileService {
async uploadFile(req) {
if (!req.file) {
throw new Error('No file provided');
}
const fileInfo = {
id: path.basename(req.file.filename, path.extname(req.file.filename)),
originalName: req.file.originalname,
safeName: req.file.filename,
size: req.file.size,
mimeType: req.file.mimetype,
userId: req.user.id,
uploadedAt: new Date()
};
// Save to database
const record = await FileRecord.create(fileInfo);
return record;
}
async downloadFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
return { record, filepath };
}
async deleteFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
await fs.unlink(filepath);
await record.destroy();
return { success: true };
}
async listUserFiles(userId, limit = 20, offset = 0) {
const { rows, count } = await FileRecord.findAndCountAll({
where: { userId },
limit,
offset,
order: [['uploadedAt', 'DESC']]
});
return { files: rows, total: count };
}
}
module.exports = new FileService();
// routes.js
const express = require('express');
const upload = require('../config/multer');
const fileService = require('../services/file-service');
const { authenticate } = require('../middleware/auth');
const router = express.Router();
router.post('/upload', authenticate, upload.single('file'), async (req, res, next) => {
try {
const file = await fileService.uploadFile(req);
res.status(201).json(file);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
router.get('/files/:fileId', authenticate, async (req, res, next) => {
try {
const { record, filepath } = await fileService.downloadFile(
req.params.fileId,
req.user.id
);
res.download(filepath, record.originalName);
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.delete('/files/:fileId', authenticate, async (req, res, next) => {
try {
await fileService.deleteFile(req.params.fileId, req.user.id);
res.status(204).send();
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.get('/files', authenticate, async (req, res, next) => {
try {
const page = parseInt(req.query.page) || 1;
const limit = parseInt(req.query.limit) || 20;
const offset = (page - 1) * limit;
const { files, total } = await fileService.listUserFiles(
req.user.id,
limit,
offset
);
res.json({
data: files,
pagination: { page, limit, total, pages: Math.ceil(total / limit) }
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
module.exports = router;3. FastAPI File Upload
3. FastAPI 文件上传
python
undefinedpython
undefinedmain.py
main.py
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
import hashlib
import secrets
from pathlib import Path
app = FastAPI()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 50 * 1024 * 1024
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'}
class FileUploadService:
async def validate_file(self, file: UploadFile) -> list:
"""Validate uploaded file"""
errors = []
# Check extension
file_ext = Path(file.filename).suffix.lower()
if file_ext not in ALLOWED_EXTENSIONS:
errors.append(f'File type not allowed')
# Check file size
content = await file.read()
await file.seek(0)
if len(content) > MAX_FILE_SIZE:
errors.append(f'File too large')
return errors
async def save_file(self, file: UploadFile, user_id: str):
"""Save uploaded file"""
errors = await self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
file_ext = Path(file.filename).suffix
safe_filename = f"{file_hash}{file_ext}"
# Create user directory
user_dir = UPLOAD_DIR / user_id
user_dir.mkdir(exist_ok=True)
filepath = user_dir / safe_filename
try:
content = await file.read()
async with aiofiles.open(filepath, 'wb') as f:
await f.write(content)
return {
'success': True,
'file': {
'id': file_hash,
'original_name': file.filename,
'safe_name': safe_filename,
'size': len(content),
'mime_type': file.content_type
}
}
except Exception as e:
return {'success': False, 'error': str(e)}file_service = FileUploadService()
@app.post("/api/upload")
async def upload_file(
file: UploadFile = File(...),
current_user: dict = Depends(get_current_user)
):
"""Upload file"""
result = await file_service.save_file(file, current_user['id'])
if result['success']:
# Save to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user['id'],
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.add(file_record)
await db.commit()
return result['file']
else:
raise HTTPException(status_code=400, detail=result.get('errors'))@app.get("/api/files/{file_id}")
async def download_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Download file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
return FileResponse(
path=filepath,
media_type=file_record.mime_type,
filename=file_record.original_name
)@app.delete("/api/files/{file_id}")
async def delete_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Delete file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
try:
Path(filepath).unlink()
await db.delete(file_record)
await db.commit()
return {'success': True}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))undefinedfrom fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
import hashlib
import secrets
from pathlib import Path
app = FastAPI()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 50 * 1024 * 1024
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'}
class FileUploadService:
async def validate_file(self, file: UploadFile) -> list:
"""Validate uploaded file"""
errors = []
# Check extension
file_ext = Path(file.filename).suffix.lower()
if file_ext not in ALLOWED_EXTENSIONS:
errors.append(f'File type not allowed')
# Check file size
content = await file.read()
await file.seek(0)
if len(content) > MAX_FILE_SIZE:
errors.append(f'File too large')
return errors
async def save_file(self, file: UploadFile, user_id: str):
"""Save uploaded file"""
errors = await self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
file_ext = Path(file.filename).suffix
safe_filename = f"{file_hash}{file_ext}"
# Create user directory
user_dir = UPLOAD_DIR / user_id
user_dir.mkdir(exist_ok=True)
filepath = user_dir / safe_filename
try:
content = await file.read()
async with aiofiles.open(filepath, 'wb') as f:
await f.write(content)
return {
'success': True,
'file': {
'id': file_hash,
'original_name': file.filename,
'safe_name': safe_filename,
'size': len(content),
'mime_type': file.content_type
}
}
except Exception as e:
return {'success': False, 'error': str(e)}file_service = FileUploadService()
@app.post("/api/upload")
async def upload_file(
file: UploadFile = File(...),
current_user: dict = Depends(get_current_user)
):
"""Upload file"""
result = await file_service.save_file(file, current_user['id'])
if result['success']:
# Save to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user['id'],
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.add(file_record)
await db.commit()
return result['file']
else:
raise HTTPException(status_code=400, detail=result.get('errors'))@app.get("/api/files/{file_id}")
async def download_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Download file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
return FileResponse(
path=filepath,
media_type=file_record.mime_type,
filename=file_record.original_name
)@app.delete("/api/files/{file_id}")
async def delete_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Delete file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
try:
Path(filepath).unlink()
await db.delete(file_record)
await db.commit()
return {'success': True}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))undefined4. S3/Cloud Storage Integration
4. S3/云存储集成
python
undefinedpython
undefineds3_service.py
s3_service.py
import boto3
from datetime import timedelta
import os
class S3FileService:
def init(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=os.getenv('AWS_REGION')
)
self.bucket_name = os.getenv('S3_BUCKET_NAME')
def upload_file(self, file, user_id, file_key):
"""Upload file to S3"""
try:
self.s3_client.upload_fileobj(
file,
self.bucket_name,
file_key,
ExtraArgs={
'ContentType': file.content_type,
'Metadata': {'user_id': user_id}
}
)
return {'success': True, 'key': file_key}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_signed_url(self, file_key, expires_in=3600):
"""Generate signed URL for download"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': file_key},
ExpiresIn=expires_in
)
return {'success': True, 'url': url}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, file_key):
"""Delete file from S3"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}undefinedimport boto3
from datetime import timedelta
import os
class S3FileService:
def init(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=os.getenv('AWS_REGION')
)
self.bucket_name = os.getenv('S3_BUCKET_NAME')
def upload_file(self, file, user_id, file_key):
"""Upload file to S3"""
try:
self.s3_client.upload_fileobj(
file,
self.bucket_name,
file_key,
ExtraArgs={
'ContentType': file.content_type,
'Metadata': {'user_id': user_id}
}
)
return {'success': True, 'key': file_key}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_signed_url(self, file_key, expires_in=3600):
"""Generate signed URL for download"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': file_key},
ExpiresIn=expires_in
)
return {'success': True, 'url': url}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, file_key):
"""Delete file from S3"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}undefinedBest Practices
最佳实践
✅ DO
✅ 建议
- Validate file extensions and MIME types
- Check file size before processing
- Use secure filenames to prevent directory traversal
- Store files outside web root
- Implement virus scanning
- Use CDN for file delivery
- Generate signed URLs for direct access
- Log file upload/download events
- Implement access control checks
- Clean up temporary files
- 验证文件扩展名和MIME类型
- 处理前检查文件大小
- 使用安全文件名防止目录遍历攻击
- 将文件存储在Web根目录之外
- 实现病毒扫描
- 使用CDN分发文件
- 生成签名URL用于直接访问
- 记录文件上传/下载事件
- 实现访问控制检查
- 清理临时文件
❌ DON'T
❌ 禁止
- Trust user-provided filenames
- Store files in web-accessible directories
- Allow arbitrary file types
- Skip virus scanning for uploaded files
- Expose absolute file paths
- Allow unlimited file sizes
- Ignore access control
- Use predictable file paths
- Store sensitive metadata in filenames
- Forget to validate file content
- 信任用户提供的文件名
- 将文件存储在可通过Web访问的目录中
- 允许任意文件类型
- 跳过上传文件的病毒扫描
- 暴露绝对文件路径
- 允许无限制的文件大小
- 忽略访问控制
- 使用可预测的文件路径
- 在文件名中存储敏感元数据
- 忽略文件内容验证
Complete Example
完整示例
python
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
if file.size > 10 * 1024 * 1024:
raise HTTPException(status_code=413, detail="File too large")
allowed = ['.pdf', '.txt', '.jpg']
ext = Path(file.filename).suffix
if ext not in allowed:
raise HTTPException(status_code=400, detail="Invalid file type")
filename = f"{uuid4()}{ext}"
async with aiofiles.open(f"uploads/{filename}", 'wb') as f:
await f.write(await file.read())
return {"filename": filename}python
@app.post("/upload")
async def upload(file: UploadFile = File(...)):
if file.size > 10 * 1024 * 1024:
raise HTTPException(status_code=413, detail="File too large")
allowed = ['.pdf', '.txt', '.jpg']
ext = Path(file.filename).suffix
if ext not in allowed:
raise HTTPException(status_code=400, detail="Invalid file type")
filename = f"{uuid4()}{ext}"
async with aiofiles.open(f"uploads/{filename}", 'wb') as f:
await f.write(await file.read())
return {"filename": filename}