session-management
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseSession Management
会话管理
Overview
概述
Implement comprehensive session management systems with secure token handling, session persistence, token refresh mechanisms, proper logout procedures, and CSRF protection across different backend frameworks.
在不同后端框架中实现包含安全令牌处理、会话持久化、令牌刷新机制、规范登出流程以及CSRF保护的全面会话管理系统。
When to Use
适用场景
- Implementing user authentication systems
- Managing session state and user context
- Handling JWT token refresh cycles
- Implementing logout functionality
- Protecting against CSRF attacks
- Managing session expiration and cleanup
- 实现用户认证系统
- 管理会话状态与用户上下文
- 处理JWT令牌刷新周期
- 实现登出功能
- 防范CSRF攻击
- 管理会话过期与清理
Instructions
操作步骤
1. JWT Token Generation and Validation
1. JWT令牌生成与验证
python
undefinedpython
undefinedPython/Flask Example
Python/Flask Example
from flask import current_app
from datetime import datetime, timedelta
import jwt
import os
class TokenManager:
def init(self, secret_key=None):
self.secret_key = secret_key or os.getenv('JWT_SECRET')
self.algorithm = 'HS256'
self.access_token_expires_hours = 1
self.refresh_token_expires_days = 7
def generate_tokens(self, user_id, email, role='user'):
"""Generate both access and refresh tokens"""
now = datetime.utcnow()
# Access token
access_payload = {
'user_id': user_id,
'email': email,
'role': role,
'type': 'access',
'iat': now,
'exp': now + timedelta(hours=self.access_token_expires_hours)
}
access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
# Refresh token
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'iat': now,
'exp': now + timedelta(days=self.refresh_token_expires_days)
}
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_in': self.access_token_expires_hours * 3600,
'token_type': 'Bearer'
}
def verify_token(self, token, token_type='access'):
"""Verify and decode JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# Check token type matches
if payload.get('type') != token_type:
return None, 'Invalid token type'
return payload, None
except jwt.ExpiredSignatureError:
return None, 'Token expired'
except jwt.InvalidTokenError:
return None, 'Invalid token'
def refresh_access_token(self, refresh_token):
"""Generate new access token from refresh token"""
payload, error = self.verify_token(refresh_token, token_type='refresh')
if error:
return None, error
new_access_token = self.generate_tokens(
payload['user_id'],
payload.get('email', ''),
payload.get('role', 'user')
)
return new_access_token, Noneundefinedfrom flask import current_app
from datetime import datetime, timedelta
import jwt
import os
class TokenManager:
def init(self, secret_key=None):
self.secret_key = secret_key or os.getenv('JWT_SECRET')
self.algorithm = 'HS256'
self.access_token_expires_hours = 1
self.refresh_token_expires_days = 7
def generate_tokens(self, user_id, email, role='user'):
"""Generate both access and refresh tokens"""
now = datetime.utcnow()
# Access token
access_payload = {
'user_id': user_id,
'email': email,
'role': role,
'type': 'access',
'iat': now,
'exp': now + timedelta(hours=self.access_token_expires_hours)
}
access_token = jwt.encode(access_payload, self.secret_key, algorithm=self.algorithm)
# Refresh token
refresh_payload = {
'user_id': user_id,
'type': 'refresh',
'iat': now,
'exp': now + timedelta(days=self.refresh_token_expires_days)
}
refresh_token = jwt.encode(refresh_payload, self.secret_key, algorithm=self.algorithm)
return {
'access_token': access_token,
'refresh_token': refresh_token,
'expires_in': self.access_token_expires_hours * 3600,
'token_type': 'Bearer'
}
def verify_token(self, token, token_type='access'):
"""Verify and decode JWT token"""
try:
payload = jwt.decode(token, self.secret_key, algorithms=[self.algorithm])
# Check token type matches
if payload.get('type') != token_type:
return None, 'Invalid token type'
return payload, None
except jwt.ExpiredSignatureError:
return None, 'Token expired'
except jwt.InvalidTokenError:
return None, 'Invalid token'
def refresh_access_token(self, refresh_token):
"""Generate new access token from refresh token"""
payload, error = self.verify_token(refresh_token, token_type='refresh')
if error:
return None, error
new_access_token = self.generate_tokens(
payload['user_id'],
payload.get('email', ''),
payload.get('role', 'user')
)
return new_access_token, Noneundefined2. Node.js/Express JWT Implementation
2. Node.js/Express JWT实现
javascript
// Node.js/Express Example
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');
class SessionManager {
constructor() {
this.secretKey = process.env.JWT_SECRET || 'dev-secret';
this.algorithm = 'HS256';
this.accessTokenExpiry = '1h';
this.refreshTokenExpiry = '7d';
this.redisClient = redis.createClient();
}
generateTokens(userId, email, role = 'user') {
const now = new Date();
const jti = crypto.randomBytes(16).toString('hex');
const accessToken = jwt.sign(
{
userId,
email,
role,
type: 'access',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.accessTokenExpiry }
);
const refreshToken = jwt.sign(
{
userId,
type: 'refresh',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.refreshTokenExpiry }
);
return {
accessToken,
refreshToken,
expiresIn: 3600,
tokenType: 'Bearer'
};
}
verifyToken(token, tokenType = 'access') {
try {
const decoded = jwt.verify(token, this.secretKey, {
algorithms: [this.algorithm]
});
if (decoded.type !== tokenType) {
return { payload: null, error: 'Invalid token type' };
}
return { payload: decoded, error: null };
} catch (err) {
if (err.name === 'TokenExpiredError') {
return { payload: null, error: 'Token expired' };
}
return { payload: null, error: 'Invalid token' };
}
}
async isTokenBlacklisted(jti) {
const result = await this.redisClient.get(`blacklist:${jti}`);
return result !== null;
}
async blacklistToken(jti, expiresIn) {
await this.redisClient.setex(`blacklist:${jti}`, expiresIn, '1');
}
async logout(token) {
const decoded = jwt.decode(token);
if (decoded && decoded.jti) {
const expiresIn = decoded.exp - Math.floor(Date.now() / 1000);
await this.blacklistToken(decoded.jti, expiresIn);
}
}
refreshAccessToken(refreshToken) {
const { payload, error } = this.verifyToken(refreshToken, 'refresh');
if (error) {
return { tokens: null, error };
}
return {
tokens: this.generateTokens(payload.userId, payload.email, payload.role),
error: null
};
}
}
// Middleware
const authMiddleware = (req, res, next) => {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'No token provided' });
}
const sessionManager = new SessionManager();
const { payload, error } = sessionManager.verifyToken(token);
if (error) {
return res.status(401).json({ error });
}
req.user = payload;
next();
};javascript
// Node.js/Express Example
const jwt = require('jsonwebtoken');
const crypto = require('crypto');
const redis = require('redis');
class SessionManager {
constructor() {
this.secretKey = process.env.JWT_SECRET || 'dev-secret';
this.algorithm = 'HS256';
this.accessTokenExpiry = '1h';
this.refreshTokenExpiry = '7d';
this.redisClient = redis.createClient();
}
generateTokens(userId, email, role = 'user') {
const now = new Date();
const jti = crypto.randomBytes(16).toString('hex');
const accessToken = jwt.sign(
{
userId,
email,
role,
type: 'access',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.accessTokenExpiry }
);
const refreshToken = jwt.sign(
{
userId,
type: 'refresh',
jti,
iat: Math.floor(now.getTime() / 1000)
},
this.secretKey,
{ algorithm: this.algorithm, expiresIn: this.refreshTokenExpiry }
);
return {
accessToken,
refreshToken,
expiresIn: 3600,
tokenType: 'Bearer'
};
}
verifyToken(token, tokenType = 'access') {
try {
const decoded = jwt.verify(token, this.secretKey, {
algorithms: [this.algorithm]
});
if (decoded.type !== tokenType) {
return { payload: null, error: 'Invalid token type' };
}
return { payload: decoded, error: null };
} catch (err) {
if (err.name === 'TokenExpiredError') {
return { payload: null, error: 'Token expired' };
}
return { payload: null, error: 'Invalid token' };
}
}
async isTokenBlacklisted(jti) {
const result = await this.redisClient.get(`blacklist:${jti}`);
return result !== null;
}
async blacklistToken(jti, expiresIn) {
await this.redisClient.setex(`blacklist:${jti}`, expiresIn, '1');
}
async logout(token) {
const decoded = jwt.decode(token);
if (decoded && decoded.jti) {
const expiresIn = decoded.exp - Math.floor(Date.now() / 1000);
await this.blacklistToken(decoded.jti, expiresIn);
}
}
refreshAccessToken(refreshToken) {
const { payload, error } = this.verifyToken(refreshToken, 'refresh');
if (error) {
return { tokens: null, error };
}
return {
tokens: this.generateTokens(payload.userId, payload.email, payload.role),
error: null
};
}
}
// Middleware
const authMiddleware = (req, res, next) => {
const authHeader = req.headers['authorization'];
const token = authHeader && authHeader.split(' ')[1];
if (!token) {
return res.status(401).json({ error: 'No token provided' });
}
const sessionManager = new SessionManager();
const { payload, error } = sessionManager.verifyToken(token);
if (error) {
return res.status(401).json({ error });
}
req.user = payload;
next();
};3. Session Storage with Redis
3. 基于Redis的会话存储
python
undefinedpython
undefinedPython/Flask with Redis
Python/Flask with Redis
import redis
import json
from datetime import timedelta
from functools import wraps
class RedisSessionManager:
def init(self, redis_url='redis://localhost:6379'):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.prefix = 'session:'
def create_session(self, user_id, data, expire_hours=24):
"""Create a session for user"""
session_data = {
'user_id': user_id,
'data': data,
'created_at': datetime.utcnow().isoformat(),
'last_activity': datetime.utcnow().isoformat()
}
session_id = secrets.token_urlsafe(32)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=expire_hours),
json.dumps(session_data)
)
return session_id
def get_session(self, session_id):
"""Retrieve session data"""
key = f'{self.prefix}{session_id}'
data = self.redis.get(key)
if not data:
return None
session_data = json.loads(data)
# Update last activity
session_data['last_activity'] = datetime.utcnow().isoformat()
self.redis.setex(key, timedelta(hours=24), json.dumps(session_data))
return session_data
def destroy_session(self, session_id):
"""Destroy a session"""
key = f'{self.prefix}{session_id}'
self.redis.delete(key)
def update_session(self, session_id, updates):
"""Update session data"""
session_data = self.get_session(session_id)
if not session_data:
return False
session_data['data'].update(updates)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=24),
json.dumps(session_data)
)
return True
def get_user_sessions(self, user_id):
"""Get all sessions for a user"""
cursor = 0
sessions = []
while True:
cursor, keys = self.redis.scan(cursor, match=f'{self.prefix}*')
for key in keys:
data = json.loads(self.redis.get(key))
if data['user_id'] == user_id:
sessions.append({
'session_id': key.replace(self.prefix, ''),
'created_at': data['created_at'],
'last_activity': data['last_activity']
})
if cursor == 0:
break
return sessions
def invalidate_all_user_sessions(self, user_id):
"""Logout user from all devices"""
sessions = self.get_user_sessions(user_id)
for session in sessions:
self.destroy_session(session['session_id'])undefinedimport redis
import json
from datetime import timedelta
from functools import wraps
class RedisSessionManager:
def init(self, redis_url='redis://localhost:6379'):
self.redis = redis.from_url(redis_url, decode_responses=True)
self.prefix = 'session:'
def create_session(self, user_id, data, expire_hours=24):
"""Create a session for user"""
session_data = {
'user_id': user_id,
'data': data,
'created_at': datetime.utcnow().isoformat(),
'last_activity': datetime.utcnow().isoformat()
}
session_id = secrets.token_urlsafe(32)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=expire_hours),
json.dumps(session_data)
)
return session_id
def get_session(self, session_id):
"""Retrieve session data"""
key = f'{self.prefix}{session_id}'
data = self.redis.get(key)
if not data:
return None
session_data = json.loads(data)
# Update last activity
session_data['last_activity'] = datetime.utcnow().isoformat()
self.redis.setex(key, timedelta(hours=24), json.dumps(session_data))
return session_data
def destroy_session(self, session_id):
"""Destroy a session"""
key = f'{self.prefix}{session_id}'
self.redis.delete(key)
def update_session(self, session_id, updates):
"""Update session data"""
session_data = self.get_session(session_id)
if not session_data:
return False
session_data['data'].update(updates)
key = f'{self.prefix}{session_id}'
self.redis.setex(
key,
timedelta(hours=24),
json.dumps(session_data)
)
return True
def get_user_sessions(self, user_id):
"""Get all sessions for a user"""
cursor = 0
sessions = []
while True:
cursor, keys = self.redis.scan(cursor, match=f'{self.prefix}*')
for key in keys:
data = json.loads(self.redis.get(key))
if data['user_id'] == user_id:
sessions.append({
'session_id': key.replace(self.prefix, ''),
'created_at': data['created_at'],
'last_activity': data['last_activity']
})
if cursor == 0:
break
return sessions
def invalidate_all_user_sessions(self, user_id):
"""Logout user from all devices"""
sessions = self.get_user_sessions(user_id)
for session in sessions:
self.destroy_session(session['session_id'])undefined4. CSRF Protection
4. CSRF保护
python
undefinedpython
undefinedFlask CSRF Protection
Flask CSRF Protection
from flask_wtf.csrf import CSRFProtect
from flask import session, request
csrf = CSRFProtect()
@app.route('/login', methods=['POST'])
@csrf.protect
def login():
# CSRF token is automatically verified
email = request.json.get('email')
password = request.json.get('password')
user = User.query.filter_by(email=email).first()
if user and user.verify_password(password):
session['user_id'] = user.id
session['csrf_token'] = csrf.generate_csrf()
return jsonify({'success': True}), 200
return jsonify({'error': 'Invalid credentials'}), 401from flask_wtf.csrf import CSRFProtect
from flask import session, request
csrf = CSRFProtect()
@app.route('/login', methods=['POST'])
@csrf.protect
def login():
# CSRF token is automatically verified
email = request.json.get('email')
password = request.json.get('password')
user = User.query.filter_by(email=email).first()
if user and user.verify_password(password):
session['user_id'] = user.id
session['csrf_token'] = csrf.generate_csrf()
return jsonify({'success': True}), 200
return jsonify({'error': 'Invalid credentials'}), 401JavaScript client
JavaScript client
async function login(email, password) {
const response = await fetch('/csrf-token');
const { csrfToken } = await response.json();
return fetch('/login', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': csrfToken
},
body: JSON.stringify({ email, password })
});}
undefinedasync function login(email, password) {
const response = await fetch('/csrf-token');
const { csrfToken } = await response.json();
return fetch('/login', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRF-Token': csrfToken
},
body: JSON.stringify({ email, password })
});}
undefined5. Session Middleware Chain
5. 会话中间件链
javascript
// Node.js middleware chain
const express = require('express');
const app = express();
// 1. Parse cookies
app.use(express.json());
app.use(cookieParser(process.env.COOKIE_SECRET));
// 2. Session middleware
app.use(session({
secret: process.env.SESSION_SECRET,
resave: false,
saveUninitialized: false,
cookie: {
httpOnly: true,
secure: process.env.NODE_ENV === 'production',
sameSite: 'strict',
maxAge: 24 * 60 * 60 * 1000
},
store: new RedisStore({ client: redisClient })
}));
// 3. CSRF protection
const csrfProtection = csrf({ cookie: false });
// 4. Rate limiting per session
const sessionRateLimit = rateLimit({
store: new RedisStore({ client: redisClient }),
keyGenerator: (req) => req.sessionID,
windowMs: 15 * 60 * 1000,
max: 100
});
app.use(sessionRateLimit);
// 5. Authentication check
const requireAuth = (req, res, next) => {
if (!req.session.user) {
return res.status(401).json({ error: 'Unauthorized' });
}
req.user = req.session.user;
next();
};
app.post('/api/login', csrfProtection, async (req, res) => {
// Verify credentials
const user = await User.findOne({ email: req.body.email });
if (user && await user.verifyPassword(req.body.password)) {
req.session.user = { id: user.id, email: user.email, role: user.role };
req.session.regenerate((err) => {
if (err) return res.status(500).json({ error: 'Server error' });
res.json({ success: true });
});
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});
app.post('/api/logout', requireAuth, (req, res) => {
req.session.destroy((err) => {
if (err) return res.status(500).json({ error: 'Logout failed' });
res.clearCookie('connect.sid');
res.json({ success: true });
});
});javascript
// Node.js middleware chain
const express = require('express');
const app = express();
// 1. Parse cookies
app.use(express.json());
app.use(cookieParser(process.env.COOKIE_SECRET));
// 2. Session middleware
app.use(session({
secret: process.env.SESSION_SECRET,
resave: false,
saveUninitialized: false,
cookie: {
httpOnly: true,
secure: process.env.NODE_ENV === 'production',
sameSite: 'strict',
maxAge: 24 * 60 * 60 * 1000
},
store: new RedisStore({ client: redisClient })
}));
// 3. CSRF protection
const csrfProtection = csrf({ cookie: false });
// 4. Rate limiting per session
const sessionRateLimit = rateLimit({
store: new RedisStore({ client: redisClient }),
keyGenerator: (req) => req.sessionID,
windowMs: 15 * 60 * 1000,
max: 100
});
app.use(sessionRateLimit);
// 5. Authentication check
const requireAuth = (req, res, next) => {
if (!req.session.user) {
return res.status(401).json({ error: 'Unauthorized' });
}
req.user = req.session.user;
next();
};
app.post('/api/login', csrfProtection, async (req, res) => {
// Verify credentials
const user = await User.findOne({ email: req.body.email });
if (user && await user.verifyPassword(req.body.password)) {
req.session.user = { id: user.id, email: user.email, role: user.role };
req.session.regenerate((err) => {
if (err) return res.status(500).json({ error: 'Server error' });
res.json({ success: true });
});
} else {
res.status(401).json({ error: 'Invalid credentials' });
}
});
app.post('/api/logout', requireAuth, (req, res) => {
req.session.destroy((err) => {
if (err) return res.status(500).json({ error: 'Logout failed' });
res.clearCookie('connect.sid');
res.json({ success: true });
});
});6. Token Refresh Endpoint
6. 令牌刷新端点
python
undefinedpython
undefinedFlask token refresh endpoint
Flask token refresh endpoint
from flask import request, jsonify
from functools import wraps
@app.route('/api/auth/refresh', methods=['POST'])
def refresh_token():
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return jsonify({'error': 'Refresh token required'}), 400
token_manager = TokenManager()
tokens, error = token_manager.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200@app.route('/api/auth/logout', methods=['POST'])
@require_auth
def logout():
token = request.headers['Authorization'].split(' ')[1]
session_manager = RedisSessionManager()
session_manager.destroy_session(token)
return jsonify({'message': 'Logged out successfully'}), 200undefinedfrom flask import request, jsonify
from functools import wraps
@app.route('/api/auth/refresh', methods=['POST'])
def refresh_token():
data = request.get_json()
refresh_token = data.get('refresh_token')
if not refresh_token:
return jsonify({'error': 'Refresh token required'}), 400
token_manager = TokenManager()
tokens, error = token_manager.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200@app.route('/api/auth/logout', methods=['POST'])
@require_auth
def logout():
token = request.headers['Authorization'].split(' ')[1]
session_manager = RedisSessionManager()
session_manager.destroy_session(token)
return jsonify({'message': 'Logged out successfully'}), 200undefined7. Session Cleanup and Maintenance
7. 会话清理与维护
python
undefinedpython
undefinedScheduled cleanup task with APScheduler
Scheduled cleanup task with APScheduler
from apscheduler.schedulers.background import BackgroundScheduler
import atexit
class SessionCleanup:
def init(self, redis_client, cleanup_interval_minutes=60):
self.redis = redis_client
self.cleanup_interval = cleanup_interval_minutes
self.scheduler = BackgroundScheduler()
def start(self):
self.scheduler.add_job(
func=self.cleanup_expired_sessions,
trigger='interval',
minutes=self.cleanup_interval,
id='cleanup_expired_sessions',
replace_existing=True
)
self.scheduler.start()
atexit.register(lambda: self.scheduler.shutdown())
def cleanup_expired_sessions(self):
"""Remove expired sessions from Redis"""
cursor = 0
removed_count = 0
while True:
cursor, keys = self.redis.scan(cursor, match='session:*')
for key in keys:
ttl = self.redis.ttl(key)
if ttl == -2: # Key doesn't exist
removed_count += 1
elif ttl < 300: # Less than 5 minutes left
self.redis.delete(key)
removed_count += 1
if cursor == 0:
break
return removed_countfrom apscheduler.schedulers.background import BackgroundScheduler
import atexit
class SessionCleanup:
def init(self, redis_client, cleanup_interval_minutes=60):
self.redis = redis_client
self.cleanup_interval = cleanup_interval_minutes
self.scheduler = BackgroundScheduler()
def start(self):
self.scheduler.add_job(
func=self.cleanup_expired_sessions,
trigger='interval',
minutes=self.cleanup_interval,
id='cleanup_expired_sessions',
replace_existing=True
)
self.scheduler.start()
atexit.register(lambda: self.scheduler.shutdown())
def cleanup_expired_sessions(self):
"""Remove expired sessions from Redis"""
cursor = 0
removed_count = 0
while True:
cursor, keys = self.redis.scan(cursor, match='session:*')
for key in keys:
ttl = self.redis.ttl(key)
if ttl == -2: # Key doesn't exist
removed_count += 1
elif ttl < 300: # Less than 5 minutes left
self.redis.delete(key)
removed_count += 1
if cursor == 0:
break
return removed_countInitialize on app startup
Initialize on app startup
cleanup = SessionCleanup(redis_client)
cleanup.start()
undefinedcleanup = SessionCleanup(redis_client)
cleanup.start()
undefinedBest Practices
最佳实践
✅ DO
✅ 建议做法
- Use HTTPS for all session transmission
- Implement secure cookies (httpOnly, sameSite, secure flags)
- Use JWT with proper expiration times
- Implement token refresh mechanism
- Store refresh tokens securely
- Validate tokens on every request
- Use strong secret keys
- Implement session timeout
- Log authentication events
- Clear session data on logout
- Use CSRF tokens for state-changing requests
- 所有会话传输使用HTTPS
- 实现安全Cookie(设置httpOnly、sameSite、secure标记)
- 使用带有合理过期时间的JWT
- 实现令牌刷新机制
- 安全存储刷新令牌
- 每次请求都验证令牌
- 使用强密钥
- 实现会话超时
- 记录认证事件
- 登出时清除会话数据
- 对状态变更请求使用CSRF令牌
❌ DON'T
❌ 禁止做法
- Store sensitive data in tokens
- Use short secret keys
- Transmit tokens in URLs
- Ignore token expiration
- Reuse token secrets across environments
- Store tokens in localStorage (use httpOnly cookies)
- Implement session without HTTPS
- Forget to validate token signatures
- Expose session IDs in logs
- Use predictable session IDs
- 在令牌中存储敏感数据
- 使用短密钥
- 在URL中传输令牌
- 忽略令牌过期
- 在不同环境中复用令牌密钥
- 在localStorage中存储令牌(应使用httpOnly Cookie)
- 不使用HTTPS实现会话
- 忘记验证令牌签名
- 在日志中暴露会话ID
- 使用可预测的会话ID
Complete Example
完整示例
python
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import jwt
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
TOKEN_MANAGER = TokenManager()
@app.route('/login', methods=['POST'])
def login():
data = request.json
user = User.query.filter_by(email=data['email']).first()
if user and user.verify_password(data['password']):
tokens = TOKEN_MANAGER.generate_tokens(user.id, user.email, user.role)
return jsonify(tokens), 200
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/refresh', methods=['POST'])
def refresh():
refresh_token = request.json.get('refresh_token')
tokens, error = TOKEN_MANAGER.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200python
from flask import Flask, request, jsonify
from datetime import datetime, timedelta
import jwt
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
TOKEN_MANAGER = TokenManager()
@app.route('/login', methods=['POST'])
def login():
data = request.json
user = User.query.filter_by(email=data['email']).first()
if user and user.verify_password(data['password']):
tokens = TOKEN_MANAGER.generate_tokens(user.id, user.email, user.role)
return jsonify(tokens), 200
return jsonify({'error': 'Invalid credentials'}), 401
@app.route('/refresh', methods=['POST'])
def refresh():
refresh_token = request.json.get('refresh_token')
tokens, error = TOKEN_MANAGER.refresh_access_token(refresh_token)
if error:
return jsonify({'error': error}), 401
return jsonify(tokens), 200