Performance optimization examples — English original and Chinese translation shown side by side (page-extraction artifacts; code below is duplicated per language).
import cProfile
import pstats
import timeit
import memory_profiler
from functools import lru_cache
from typing import List
import numpy as npimport cProfile
import pstats
import timeit
import memory_profiler
from functools import lru_cache
from typing import List
# NOTE(review): web-extraction residue. The statements below are the interior
# of a cProfile-based profiling decorator (`wrapper`) and the tail of a
# timeit-style benchmark helper; their `def` lines (and decorators) were lost
# when the page was scraped, and English/Chinese copies are interleaved.
# Left byte-identical pending recovery of the original source.
import numpy as np result = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # Top 10 functions
return result
return wrapper result = func(*args, **kwargs)
profiler.disable()
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(10) # print the top 10 functions
return result
return wrapperreturn {
'total_time': total_time,
'avg_time': avg_time,
'iterations': iterations
}return {
'total_time': total_time,
'avg_time': avg_time,
'iterations': iterations
}
}undefinedundefinedfrom sqlalchemy import create_engine, Index, text
from sqlalchemy.orm import sessionmaker
import redisfrom sqlalchemy import create_engine, Index, text
from sqlalchemy.orm import sessionmaker
def bad_n_plus_one(self):
    """Anti-pattern demo: the N+1 query problem.

    One query loads all users, then touching each user's lazy ``posts``
    relationship issues one additional SELECT per user.
    """
    # Deliberately inefficient -- kept as the "before" example.
    for user in self.session.query(User).all():
        posts = user.posts  # lazy load: one extra query per user
        print(f"{user.name}: {len(posts)} posts")
def good_eager_loading(self):
    """Avoid N+1 by eagerly joining posts into the initial user query."""
    from sqlalchemy.orm import joinedload

    # joinedload pulls the related posts in the same SELECT (via JOIN),
    # so the loop below never round-trips to the database.
    query = self.session.query(User).options(joinedload(User.posts))
    for user in query.all():
        posts = user.posts  # already populated -- no additional query
        print(f"{user.name}: {len(posts)} posts")
def use_indexes(self):
    """Declare indexes for the most common query patterns."""
    # Single-column indexes on frequently filtered columns.
    Index('idx_user_email', User.email)
    Index('idx_post_created_at', Post.created_at)
    # Composite index for queries that filter on user_id AND status together.
    Index('idx_post_user_status', Post.user_id, Post.status)
def batch_operations(self, items: List[dict]):
    """Insert many rows in one round trip instead of row by row.

    Adding and committing each item individually would issue one INSERT
    (and one COMMIT) per row; ``bulk_insert_mappings`` sends the whole
    batch in a single statement.
    """
    self.session.bulk_insert_mappings(User, items)
    self.session.commit()
def use_pagination(self, page: int = 1, page_size: int = 20):
    """Return one page of users, newest first.

    Args:
        page: 1-based page number. Values below 1 are clamped to 1 --
            previously they produced a negative OFFSET, which databases
            reject.
        page_size: rows per page; values below 1 are clamped to 1.

    Returns:
        The list of User rows for the requested page.
    """
    page = max(page, 1)
    page_size = max(page_size, 1)
    offset = (page - 1) * page_size
    return (
        self.session.query(User)
        .order_by(User.created_at.desc())
        .limit(page_size)
        .offset(offset)
        .all()
    )
"""N+1查询问题"""
users = self.session.query(User).all()
for user in users:
posts = user.posts # 每个用户都会触发额外查询
print(f"{user.name}: {len(posts)} posts")
def good_eager_loading(self):
    """Eager loading to avoid the N+1 problem."""
    from sqlalchemy.orm import joinedload
    users = self.session.query(User)\
        .options(joinedload(User.posts))\
        .all()
    for user in users:
        posts = user.posts  # no additional query
        print(f"{user.name}: {len(posts)} posts")
def use_indexes(self):
    """Create indexes for frequent queries."""
    # Index the frequently queried columns
    Index('idx_user_email', User.email)
    Index('idx_post_created_at', Post.created_at)
    # Composite index
    Index('idx_post_user_status', Post.user_id, Post.status)
def batch_operations(self, items: List[dict]):
    """Batch insert instead of individual inserts."""
    # Inefficient: one insert/commit per item
    # for item in items:
    #     self.session.add(User(**item))
    #     self.session.commit()
    # Efficient: single bulk insert
    self.session.bulk_insert_mappings(User, items)
    self.session.commit()
def use_pagination(self, page: int = 1, page_size: int = 20):
    """Paginate large result sets."""
    # NOTE(review): the trailing `.all()` of this duplicated copy was fused
    # into the next line by extraction garbling.
    offset = (page - 1) * page_size
    return self.session.query(User)\
        .order_by(User.created_at.desc())\
        .limit(page_size)\
        .offset(offset)\
def cache_query_result(self, key: str, query_func, ttl: int = 3600):
    """Return a query result, serving it from Redis when possible.

    Args:
        key: cache key under which the JSON payload is stored.
        query_func: zero-argument callable executed on a cache miss.
        ttl: expiry in seconds for a newly cached entry (default 1 hour).

    Returns:
        The (JSON-serializable) query result.
    """
    import json  # local import: this chunk does not import json at top level

    cached = self.redis.get(key)
    # Use `is not None` rather than truthiness: a falsy cached payload would
    # otherwise be treated as a miss. Also matches cache_aside_pattern below.
    if cached is not None:
        return json.loads(cached)
    result = query_func()
    self.redis.setex(key, ttl, json.dumps(result))
    return result
def cache_aside_pattern(self, key: str, fetch_func, ttl: int = 3600):
    """Cache-aside read: serve from Redis, falling back to fetch_func.

    On a miss the freshly fetched value is written back with a TTL so
    subsequent reads hit the cache.
    """
    cached = self.redis.get(key)
    if cached is None:
        # Miss: load from the source of truth and populate the cache.
        fresh = fetch_func()
        self.redis.setex(key, ttl, json.dumps(fresh))
        return fresh
    # Hit: deserialize the cached JSON payload.
    return json.loads(cached)
"""缓存数据库查询结果"""
# 先检查缓存
cached = self.redis.get(key)
if cached:
return json.loads(cached)
# 执行查询
result = query_func()
# 缓存结果
self.redis.setex(key, ttl, json.dumps(result))
return result
def cache_aside_pattern(self, key: str, fetch_func, ttl: int = 3600):
    """Cache-aside pattern (translated duplicate of the method above)."""
    payload = self.redis.get(key)
    if payload is not None:
        # Hit: decode and return the cached JSON value.
        return json.loads(payload)
    # Miss: fetch fresh data, then populate the cache with a TTL.
    data = fetch_func()
    self.redis.setex(key, ttl, json.dumps(data))
    return data
// Debounce: postpone `func` until `wait` ms have passed without a new call.
function debounce(func, wait) {
  let timeout;
  return function executedFunction(...args) {
    // Cancel any pending invocation and reschedule with the latest args.
    clearTimeout(timeout);
    timeout = setTimeout(() => {
      clearTimeout(timeout);
      func(...args);
    }, wait);
  };
}
// Example: Debounce search input
// fetchSearchResults fires at most once per 300 ms pause in typing.
const searchInput = document.getElementById('search');
const debouncedSearch = debounce((query) => {
  // Expensive search operation
  fetchSearchResults(query);
}, 300);
searchInput.addEventListener('input', (e) => {
  debouncedSearch(e.target.value);
});
// Defer loading of <img data-src> images until they scroll into view.
const lazyLoadImages = () => {
  const observer = new IntersectionObserver((entries) => {
    for (const entry of entries) {
      if (!entry.isIntersecting) continue;
      const img = entry.target;
      // Promote the placeholder to the real source, then stop watching it.
      img.src = img.dataset.src;
      img.removeAttribute('data-src');
      observer.unobserve(img);
    }
  });
  document.querySelectorAll('img[data-src]').forEach((img) => observer.observe(img));
};
// Render only the on-screen slice of a long list.
class VirtualScroller {
  constructor(container, items, itemHeight) {
    this.container = container;
    this.items = items;
    this.itemHeight = itemHeight;
    // Rows that fit the viewport, rounded up to cover a partial last row.
    this.visibleItems = Math.ceil(container.clientHeight / itemHeight);
    this.render();
  }

  render() {
    // Derive the visible window from the current scroll position.
    const first = Math.floor(this.container.scrollTop / this.itemHeight);
    const last = first + this.visibleItems;
    const rows = this.items
      .slice(first, last)
      .map(item => `<div style="height: ${this.itemHeight}px">${item}</div>`);
    this.container.innerHTML = rows.join('');
  }
}
// Code splitting with dynamic imports: the heavy module is only fetched
// and evaluated when loadModule() is actually called.
async function loadModule() {
  (await import('./heavy-module.js')).init();
}
// Debounce for expensive operations (translated duplicate).
function debounce(func, wait) {
  let timeout;
  return function executedFunction(...args) {
    const later = () => {
      clearTimeout(timeout);
      func(...args);
    };
    clearTimeout(timeout);
    timeout = setTimeout(later, wait);
  };
}
// Example: debounce the search input
const searchInput = document.getElementById('search');
const debouncedSearch = debounce((query) => {
  // Expensive search operation
  fetchSearchResults(query);
}, 300);
searchInput.addEventListener('input', (e) => {
  debouncedSearch(e.target.value);
});
// Lazy-load images
const lazyLoadImages = () => {
  const images = document.querySelectorAll('img[data-src]');
  const imageObserver = new IntersectionObserver((entries) => {
    entries.forEach(entry => {
      if (entry.isIntersecting) {
        const img = entry.target;
        img.src = img.dataset.src;
        img.removeAttribute('data-src');
        imageObserver.unobserve(img);
      }
    });
  });
  images.forEach(img => imageObserver.observe(img));
};
// Virtual scrolling for large lists
class VirtualScroller {
  constructor(container, items, itemHeight) {
    this.container = container;
    this.items = items;
    this.itemHeight = itemHeight;
    this.visibleItems = Math.ceil(container.clientHeight / itemHeight);
    this.render();
  }
  render() {
    const scrollTop = this.container.scrollTop;
    const startIndex = Math.floor(scrollTop / this.itemHeight);
    const endIndex = startIndex + this.visibleItems;
    // Render only the visible items
    const visibleData = this.items.slice(startIndex, endIndex);
    this.container.innerHTML = visibleData
      .map(item => `<div style="height: ${this.itemHeight}px">${item}</div>`)
      .join('');
  }
}
// Code splitting via dynamic import
// NOTE(review): the closing brace of this duplicated copy was fused into
// the following line by extraction garbling.
async function loadModule() {
  const module = await import('./heavy-module.js');
  module.init();
}from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator
app = FastAPI()from fastapi import FastAPI, BackgroundTasks
from fastapi.responses import StreamingResponse
import asyncio
from typing import AsyncGenerator
async def __aenter__(self):
    """Open a pooled aiohttp session when entering the async context."""
    # Cap the pool at 100 concurrent TCP connections.
    connector = aiohttp.TCPConnector(limit=100)
    self.session = aiohttp.ClientSession(connector=connector)
    return self
async def __aexit__(self, *exc_info):
    """Close the pooled session when leaving the async context."""
    await self.session.close()
async def fetch(self, url: str):
    """GET *url* through the shared session and return the decoded JSON."""
    async with self.session.get(url) as resp:
        return await resp.json()
# NOTE(review): duplicated (translated) copy of the aiohttp client methods
# above; the `async def __aenter__(self):` line was fused into the previous
# line by extraction garbling. Left byte-identical.
self.session = aiohttp.ClientSession(
    connector=aiohttp.TCPConnector(limit=100)
)
return self
async def __aexit__(self, *args):
    await self.session.close()
async def fetch(self, url: str):
    async with self.session.get(url) as response:
        return await response.json()undefinedfrom locust import HttpUser, task, between
class PerformanceTest(HttpUser):
    """Locust load profile: 3:1 read/write mix with an authenticated client.

    Fix: the login token captured in ``on_start`` was previously stored but
    never attached to any request; it is now sent as a Bearer token.
    """
    wait_time = between(1, 3)  # simulated users pause 1-3 s between tasks

    @task(3)
    def get_users(self):
        """High frequency endpoint (weight 3)."""
        self.client.get("/api/users", headers=self._auth_headers())

    @task(1)
    def create_user(self):
        """Lower frequency endpoint (weight 1)."""
        self.client.post("/api/users", json={
            "email": "test@example.com",
            "name": "Test User"
        }, headers=self._auth_headers())

    def on_start(self):
        """Login once per simulated user and keep the session token."""
        response = self.client.post("/api/login", json={
            "username": "test",
            "password": "password"
        })
        self.token = response.json()["token"]

    def _auth_headers(self):
        # Build the auth header from the token fetched in on_start.
        return {"Authorization": f"Bearer {self.token}"}
class PerformanceTest(HttpUser):
    """Locust load test (translated duplicate of the class above)."""
    wait_time = between(1, 3)

    @task(3)
    def get_users(self):
        """High-frequency endpoint."""
        self.client.get("/api/users")

    @task(1)
    def create_user(self):
        """Low-frequency endpoint."""
        self.client.post("/api/users", json={
            "email": "test@example.com",
            "name": "Test User"
        })

    def on_start(self):
        """Log in once per simulated user."""
        # NOTE(review): the token is stored but never attached to any
        # request -- presumably it belongs in an Authorization header.
        response = self.client.post("/api/login", json={
            "username": "test",
            "password": "password"
        })
        self.token = response.json()["token"]