Loading...
Loading...
Implement secure file upload handling with validation, virus scanning, storage management, and serving files efficiently. Use when building file upload features, managing file storage, and implementing file download systems.
npx skill4agent add aj-geddes/useful-ai-prompts file-upload-handling# config.py
import os
class Config:
MAX_CONTENT_LENGTH = 50 * 1024 * 1024 # 50 MB
UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'txt', 'pdf', 'png', 'jpg', 'jpeg', 'gif', 'docx', 'doc'}
UPLOAD_DIRECTORY = os.path.join(os.path.dirname(__file__), UPLOAD_FOLDER)
# file_service.py
import os
import mimetypes
import hashlib
import secrets
from werkzeug.utils import secure_filename
from datetime import datetime
import magic
import aiofiles
class FileUploadService:
def __init__(self, upload_dir, allowed_extensions, max_size=50*1024*1024):
self.upload_dir = upload_dir
self.allowed_extensions = allowed_extensions
self.max_size = max_size
self.mime = magic.Magic(mime=True)
def validate_file(self, file):
"""Validate uploaded file"""
errors = []
# Check filename
if not file.filename:
errors.append('No filename provided')
# Check file extension
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
if ext not in self.allowed_extensions:
errors.append(f'File type not allowed. Allowed: {", ".join(self.allowed_extensions)}')
# Check file size
file.seek(0, os.SEEK_END)
file_size = file.tell()
file.seek(0)
if file_size > self.max_size:
errors.append(f'File too large. Max size: {self.max_size / 1024 / 1024}MB')
# Check MIME type
file_content = file.read(8192)
file.seek(0)
detected_mime = self.mime.from_buffer(file_content)
if not self._is_valid_mime(detected_mime, ext):
errors.append('File MIME type does not match extension')
return errors
def _is_valid_mime(self, mime_type, ext):
"""Verify MIME type matches extension"""
allowed_mimes = {
'txt': ['text/plain'],
'pdf': ['application/pdf'],
'png': ['image/png'],
'jpg': ['image/jpeg'],
'jpeg': ['image/jpeg'],
'gif': ['image/gif'],
'doc': ['application/msword'],
'docx': ['application/vnd.openxmlformats-officedocument.wordprocessingml.document']
}
return mime_type in allowed_mimes.get(ext, [])
def save_file(self, file, user_id):
"""Save uploaded file with sanitization"""
errors = self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
filename = secure_filename(file.filename)
ext = filename.rsplit('.', 1)[1].lower() if '.' in filename else ''
safe_filename = f"{file_hash}.{ext}"
# Create user-specific directory
user_upload_dir = os.path.join(self.upload_dir, user_id)
os.makedirs(user_upload_dir, exist_ok=True)
filepath = os.path.join(user_upload_dir, safe_filename)
try:
file.save(filepath)
file_info = {
'id': file_hash,
'original_name': filename,
'safe_name': safe_filename,
'size': os.path.getsize(filepath),
'user_id': user_id,
'uploaded_at': datetime.utcnow().isoformat(),
'mime_type': self.mime.from_file(filepath)
}
return {'success': True, 'file': file_info}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, user_id, file_id):
"""Delete file safely"""
filepath = os.path.join(self.upload_dir, user_id, f"{file_id}.*")
import glob
files = glob.glob(filepath)
for f in files:
try:
os.remove(f)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}
return {'success': False, 'error': 'File not found'}
# routes.py
from flask import request, jsonify, send_file, safe_join
from functools import wraps
import os
file_service = FileUploadService(
app.config['UPLOAD_DIRECTORY'],
app.config['ALLOWED_EXTENSIONS']
)
@app.route('/api/upload', methods=['POST'])
@token_required
def upload_file():
if 'file' not in request.files:
return jsonify({'error': 'No file provided'}), 400
file = request.files['file']
result = file_service.save_file(file, current_user.id)
if result['success']:
# Save metadata to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user.id,
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.session.add(file_record)
db.session.commit()
return jsonify(result['file']), 201
else:
return jsonify({'errors': result.get('errors') or [result['error']]}), 400
@app.route('/api/files/<file_id>', methods=['GET'])
@token_required
def download_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
# Construct safe file path
filepath = safe_join(
app.config['UPLOAD_DIRECTORY'],
current_user.id,
file_record.file_id + '.' + file_record.original_name.rsplit('.', 1)[1]
)
if filepath and os.path.exists(filepath):
return send_file(
filepath,
mimetype=file_record.mime_type,
as_attachment=True,
download_name=file_record.original_name
)
return jsonify({'error': 'File not found'}), 404
@app.route('/api/files/<file_id>', methods=['DELETE'])
@token_required
def delete_file(file_id):
file_record = FileRecord.query.filter_by(
file_id=file_id,
user_id=current_user.id
).first()
if not file_record:
return jsonify({'error': 'File not found'}), 404
result = file_service.delete_file(current_user.id, file_id)
if result['success']:
db.session.delete(file_record)
db.session.commit()
return '', 204
else:
return jsonify({'error': result['error']}), 500// config.js
const multer = require('multer');
const path = require('path');
const crypto = require('crypto');
const fs = require('fs');
const storage = multer.diskStorage({
destination: (req, file, cb) => {
const uploadDir = path.join(__dirname, 'uploads', req.user.id);
fs.mkdirSync(uploadDir, { recursive: true });
cb(null, uploadDir);
},
filename: (req, file, cb) => {
const hash = crypto.randomBytes(16).toString('hex');
const ext = path.extname(file.originalname);
cb(null, hash + ext);
}
});
const fileFilter = (req, file, cb) => {
const allowedMimes = [
'image/jpeg',
'image/png',
'image/gif',
'application/pdf',
'text/plain'
];
const allowedExts = ['.pdf', '.txt', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'];
const ext = path.extname(file.originalname).toLowerCase();
if (!allowedMimes.includes(file.mimetype) || !allowedExts.includes(ext)) {
return cb(new Error('Invalid file type'));
}
cb(null, true);
};
const upload = multer({
storage: storage,
fileFilter: fileFilter,
limits: {
fileSize: 50 * 1024 * 1024 // 50 MB
}
});
module.exports = upload;
// file-service.js
const fs = require('fs').promises;
const path = require('path');
const FileRecord = require('../models/FileRecord');
class FileService {
async uploadFile(req) {
if (!req.file) {
throw new Error('No file provided');
}
const fileInfo = {
id: path.basename(req.file.filename, path.extname(req.file.filename)),
originalName: req.file.originalname,
safeName: req.file.filename,
size: req.file.size,
mimeType: req.file.mimetype,
userId: req.user.id,
uploadedAt: new Date()
};
// Save to database
const record = await FileRecord.create(fileInfo);
return record;
}
async downloadFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
return { record, filepath };
}
async deleteFile(fileId, userId) {
const record = await FileRecord.findOne({
where: { id: fileId, userId }
});
if (!record) {
throw new Error('File not found');
}
const filepath = path.join(__dirname, 'uploads', userId, record.safeName);
await fs.unlink(filepath);
await record.destroy();
return { success: true };
}
async listUserFiles(userId, limit = 20, offset = 0) {
const { rows, count } = await FileRecord.findAndCountAll({
where: { userId },
limit,
offset,
order: [['uploadedAt', 'DESC']]
});
return { files: rows, total: count };
}
}
module.exports = new FileService();
// routes.js
const express = require('express');
const upload = require('../config/multer');
const fileService = require('../services/file-service');
const { authenticate } = require('../middleware/auth');
const router = express.Router();
router.post('/upload', authenticate, upload.single('file'), async (req, res, next) => {
try {
const file = await fileService.uploadFile(req);
res.status(201).json(file);
} catch (error) {
res.status(400).json({ error: error.message });
}
});
router.get('/files/:fileId', authenticate, async (req, res, next) => {
try {
const { record, filepath } = await fileService.downloadFile(
req.params.fileId,
req.user.id
);
res.download(filepath, record.originalName);
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.delete('/files/:fileId', authenticate, async (req, res, next) => {
try {
await fileService.deleteFile(req.params.fileId, req.user.id);
res.status(204).send();
} catch (error) {
res.status(404).json({ error: error.message });
}
});
router.get('/files', authenticate, async (req, res, next) => {
try {
const page = parseInt(req.query.page) || 1;
const limit = parseInt(req.query.limit) || 20;
const offset = (page - 1) * limit;
const { files, total } = await fileService.listUserFiles(
req.user.id,
limit,
offset
);
res.json({
data: files,
pagination: { page, limit, total, pages: Math.ceil(total / limit) }
});
} catch (error) {
res.status(500).json({ error: error.message });
}
});
module.exports = router;# main.py
from fastapi import FastAPI, UploadFile, File, Depends, HTTPException
from fastapi.responses import FileResponse
import aiofiles
import os
import hashlib
import secrets
from pathlib import Path
app = FastAPI()
UPLOAD_DIR = Path("uploads")
UPLOAD_DIR.mkdir(exist_ok=True)
MAX_FILE_SIZE = 50 * 1024 * 1024
ALLOWED_EXTENSIONS = {'.txt', '.pdf', '.png', '.jpg', '.jpeg', '.gif', '.docx', '.doc'}
class FileUploadService:
async def validate_file(self, file: UploadFile) -> list:
"""Validate uploaded file"""
errors = []
# Check extension
file_ext = Path(file.filename).suffix.lower()
if file_ext not in ALLOWED_EXTENSIONS:
errors.append(f'File type not allowed')
# Check file size
content = await file.read()
await file.seek(0)
if len(content) > MAX_FILE_SIZE:
errors.append(f'File too large')
return errors
async def save_file(self, file: UploadFile, user_id: str):
"""Save uploaded file"""
errors = await self.validate_file(file)
if errors:
return {'success': False, 'errors': errors}
# Generate secure filename
file_hash = hashlib.sha256(secrets.token_bytes(32)).hexdigest()
file_ext = Path(file.filename).suffix
safe_filename = f"{file_hash}{file_ext}"
# Create user directory
user_dir = UPLOAD_DIR / user_id
user_dir.mkdir(exist_ok=True)
filepath = user_dir / safe_filename
try:
content = await file.read()
async with aiofiles.open(filepath, 'wb') as f:
await f.write(content)
return {
'success': True,
'file': {
'id': file_hash,
'original_name': file.filename,
'safe_name': safe_filename,
'size': len(content),
'mime_type': file.content_type
}
}
except Exception as e:
return {'success': False, 'error': str(e)}
file_service = FileUploadService()
@app.post("/api/upload")
async def upload_file(
file: UploadFile = File(...),
current_user: dict = Depends(get_current_user)
):
"""Upload file"""
result = await file_service.save_file(file, current_user['id'])
if result['success']:
# Save to database
file_record = FileRecord(
file_id=result['file']['id'],
original_name=result['file']['original_name'],
user_id=current_user['id'],
size=result['file']['size'],
mime_type=result['file']['mime_type']
)
db.add(file_record)
await db.commit()
return result['file']
else:
raise HTTPException(status_code=400, detail=result.get('errors'))
@app.get("/api/files/{file_id}")
async def download_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Download file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
return FileResponse(
path=filepath,
media_type=file_record.mime_type,
filename=file_record.original_name
)
@app.delete("/api/files/{file_id}")
async def delete_file(
file_id: str,
current_user: dict = Depends(get_current_user)
):
"""Delete file"""
file_record = await db.query(FileRecord).filter(
FileRecord.file_id == file_id,
FileRecord.user_id == current_user['id']
).first()
if not file_record:
raise HTTPException(status_code=404, detail="File not found")
filepath = UPLOAD_DIR / current_user['id'] / file_record.safe_name
try:
Path(filepath).unlink()
await db.delete(file_record)
await db.commit()
return {'success': True}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))# s3_service.py
import boto3
from datetime import timedelta
import os
class S3FileService:
def __init__(self):
self.s3_client = boto3.client(
's3',
aws_access_key_id=os.getenv('AWS_ACCESS_KEY_ID'),
aws_secret_access_key=os.getenv('AWS_SECRET_ACCESS_KEY'),
region_name=os.getenv('AWS_REGION')
)
self.bucket_name = os.getenv('S3_BUCKET_NAME')
def upload_file(self, file, user_id, file_key):
"""Upload file to S3"""
try:
self.s3_client.upload_fileobj(
file,
self.bucket_name,
file_key,
ExtraArgs={
'ContentType': file.content_type,
'Metadata': {'user_id': user_id}
}
)
return {'success': True, 'key': file_key}
except Exception as e:
return {'success': False, 'error': str(e)}
def get_signed_url(self, file_key, expires_in=3600):
"""Generate signed URL for download"""
try:
url = self.s3_client.generate_presigned_url(
'get_object',
Params={'Bucket': self.bucket_name, 'Key': file_key},
ExpiresIn=expires_in
)
return {'success': True, 'url': url}
except Exception as e:
return {'success': False, 'error': str(e)}
def delete_file(self, file_key):
"""Delete file from S3"""
try:
self.s3_client.delete_object(Bucket=self.bucket_name, Key=file_key)
return {'success': True}
except Exception as e:
return {'success': False, 'error': str(e)}@app.post("/upload")
async def upload(file: UploadFile = File(...)):
if file.size > 10 * 1024 * 1024:
raise HTTPException(status_code=413, detail="File too large")
allowed = ['.pdf', '.txt', '.jpg']
ext = Path(file.filename).suffix
if ext not in allowed:
raise HTTPException(status_code=400, detail="Invalid file type")
filename = f"{uuid4()}{ext}"
async with aiofiles.open(f"uploads/{filename}", 'wb') as f:
await f.write(await file.read())
return {"filename": filename}