elixir-otp
Compare original and translation side by side
🇺🇸
Original
English🇨🇳
Translation
ChineseElixir OTP Skill
Elixir OTP 技能
Essential OTP Patterns (KISS First)
核心OTP模式(优先遵循KISS原则)
Use these patterns for 80% of use cases. Start simple, optimize later.
这些模式适用于80%的使用场景。从简单方案开始,后续再优化。
GenServer: Stateful Processes
GenServer:有状态进程
When to use GenServer:
- Need to maintain state across function calls
- State needs to be shared across processes
- Need to serialize access to a resource
- Long-running processes with periodic tasks
Basic GenServer Pattern:
elixir
defmodule MyApp.Counter do
use GenServer
# Client API (public interface)
def start_link(initial_value \\ 0) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
def get_count do
GenServer.call(__MODULE__, :get_count)
end
def increment do
GenServer.cast(__MODULE__, :increment)
end
def increment_by(amount) do
GenServer.call(__MODULE__, {:increment_by, amount})
end
# Server Callbacks (private implementation)
@impl true
def init(initial_value) do
{:ok, initial_value}
end
@impl true
def handle_call(:get_count, _from, count) do
{:reply, count, count}
end
@impl true
def handle_call({:increment_by, amount}, _from, count) do
new_count = count + amount
{:reply, new_count, new_count}
end
@impl true
def handle_cast(:increment, count) do
{:noreply, count + 1}
end
endCommon GenServer Patterns:
1. Resource Pool:
elixir
defmodule MyApp.ConnectionPool do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def checkout do
GenServer.call(__MODULE__, :checkout)
end
def checkin(connection) do
GenServer.cast(__MODULE__, {:checkin, connection})
end
@impl true
def init(opts) do
pool_size = Keyword.get(opts, :pool_size, 10)
connections = Enum.map(1..pool_size, fn _ -> create_connection() end)
{:ok, %{available: connections, in_use: []}}
end
@impl true
def handle_call(:checkout, from, %{available: [conn | rest]} = state) do
monitor_ref = Process.monitor(from)
new_state = %{
state |
available: rest,
in_use: [{conn, from, monitor_ref} | state.in_use]
}
{:reply, {:ok, conn}, new_state}
end
def handle_call(:checkout, _from, %{available: []} = state) do
{:reply, {:error, :no_connections}, state}
end
end2. Cache with TTL:
elixir
defmodule MyApp.Cache do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def put(key, value, ttl \\ 60_000) do
GenServer.cast(__MODULE__, {:put, key, value, ttl})
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
@impl true
def init(_) do
# Schedule cleanup every minute
:timer.send_interval(60_000, :cleanup)
{:ok, %{}}
end
@impl true
def handle_cast({:put, key, value, ttl}, cache) do
expires_at = System.system_time(:millisecond) + ttl
{:noreply, Map.put(cache, key, {value, expires_at})}
end
@impl true
def handle_call({:get, key}, _from, cache) do
now = System.system_time(:millisecond)
case Map.get(cache, key) do
{value, expires_at} when expires_at > now -> {:reply, {:ok, value}, cache}
_ -> {:reply, :not_found, cache}
end
end
@impl true
def handle_info(:cleanup, cache) do
now = System.system_time(:millisecond)
clean_cache = cache |> Enum.reject(fn {_key, {_value, expires_at}} -> expires_at <= now end) |> Map.new()
{:noreply, clean_cache}
end
end何时使用GenServer:
- 需要在函数调用之间维护状态
- 状态需要在多个进程之间共享
- 需要对资源进行序列化访问
- 带有周期性任务的长期运行进程
基础GenServer模式:
elixir
defmodule MyApp.Counter do
use GenServer
# Client API (public interface)
def start_link(initial_value \\ 0) do
GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
end
def get_count do
GenServer.call(__MODULE__, :get_count)
end
def increment do
GenServer.cast(__MODULE__, :increment)
end
def increment_by(amount) do
GenServer.call(__MODULE__, {:increment_by, amount})
end
# Server Callbacks (private implementation)
@impl true
def init(initial_value) do
{:ok, initial_value}
end
@impl true
def handle_call(:get_count, _from, count) do
{:reply, count, count}
end
@impl true
def handle_call({:increment_by, amount}, _from, count) do
new_count = count + amount
{:reply, new_count, new_count}
end
@impl true
def handle_cast(:increment, count) do
{:noreply, count + 1}
end
end常见GenServer模式:
1. 资源池:
elixir
defmodule MyApp.ConnectionPool do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def checkout do
GenServer.call(__MODULE__, :checkout)
end
def checkin(connection) do
GenServer.cast(__MODULE__, {:checkin, connection})
end
@impl true
def init(opts) do
pool_size = Keyword.get(opts, :pool_size, 10)
connections = Enum.map(1..pool_size, fn _ -> create_connection() end)
{:ok, %{available: connections, in_use: []}}
end
@impl true
def handle_call(:checkout, from, %{available: [conn | rest]} = state) do
monitor_ref = Process.monitor(from)
new_state = %{
state |
available: rest,
in_use: [{conn, from, monitor_ref} | state.in_use]
}
{:reply, {:ok, conn}, new_state}
end
def handle_call(:checkout, _from, %{available: []} = state) do
{:reply, {:error, :no_connections}, state}
end
end2. 带TTL的缓存:
elixir
defmodule MyApp.Cache do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def put(key, value, ttl \\ 60_000) do
GenServer.cast(__MODULE__, {:put, key, value, ttl})
end
def get(key) do
GenServer.call(__MODULE__, {:get, key})
end
@impl true
def init(_) do
# Schedule cleanup every minute
:timer.send_interval(60_000, :cleanup)
{:ok, %{}}
end
@impl true
def handle_cast({:put, key, value, ttl}, cache) do
expires_at = System.system_time(:millisecond) + ttl
{:noreply, Map.put(cache, key, {value, expires_at})}
end
@impl true
def handle_call({:get, key}, _from, cache) do
now = System.system_time(:millisecond)
case Map.get(cache, key) do
{value, expires_at} when expires_at > now -> {:reply, {:ok, value}, cache}
_ -> {:reply, :not_found, cache}
end
end
@impl true
def handle_info(:cleanup, cache) do
now = System.system_time(:millisecond)
clean_cache = cache |> Enum.reject(fn {_key, {_value, expires_at}} -> expires_at <= now end) |> Map.new()
{:noreply, clean_cache}
end
endSupervisor: Process Supervision
Supervisor:进程监督
When to use Supervisor:
- ALWAYS supervise your processes (GenServers, Tasks)
- Need fault tolerance and automatic restart
- Managing a group of related processes
Basic Supervision Strategies:
1. (Most Common - 90% of cases)
:one_for_oneelixir
defmodule MyApp.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
children = [
MyApp.Counter,
MyApp.Cache,
{MyApp.Worker, [arg1: "value"]}
]
# If one child crashes, restart only that child
Supervisor.init(children, strategy: :one_for_one)
end
end2. (When Order Matters)
:rest_for_oneelixir
defmodule MyApp.DatabaseSupervisor do
use Supervisor
@impl true
def init(_opts) do
children = [
MyApp.Repo, # Database connection
MyApp.Cache, # Depends on DB
MyApp.DataProcessor # Depends on both
]
# If DataProcessor crashes, restart it only
# If Cache crashes, restart Cache AND DataProcessor
# If Repo crashes, restart ALL children
Supervisor.init(children, strategy: :rest_for_one)
end
end3. (All or Nothing)
:one_for_allelixir
defmodule MyApp.ClusterSupervisor do
use Supervisor
@impl true
def init(_opts) do
children = [
MyApp.ClusterNode1,
MyApp.ClusterNode2,
MyApp.ClusterCoordinator
]
# If ANY child crashes, restart ALL children
# Use when children are tightly coupled
Supervisor.init(children, strategy: :one_for_all)
end
end何时使用Supervisor:
- 始终要监督你的进程(GenServer、Task等)
- 需要容错能力和自动重启机制
- 管理一组相关进程
基础监督策略:
1. (最常用 - 90%的场景)
:one_for_oneelixir
defmodule MyApp.Supervisor do
use Supervisor
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
children = [
MyApp.Counter,
MyApp.Cache,
{MyApp.Worker, [arg1: "value"]}
]
# 如果一个子进程崩溃,仅重启该子进程
Supervisor.init(children, strategy: :one_for_one)
end
end2. (当启动顺序很重要时)
:rest_for_oneelixir
defmodule MyApp.DatabaseSupervisor do
use Supervisor
@impl true
def init(_opts) do
children = [
MyApp.Repo, # 数据库连接
MyApp.Cache, # 依赖数据库
MyApp.DataProcessor # 依赖前两者
]
# 如果DataProcessor崩溃,仅重启它
# 如果Cache崩溃,重启Cache和DataProcessor
# 如果Repo崩溃,重启所有子进程
Supervisor.init(children, strategy: :rest_for_one)
end
end3. (全有或全无)
:one_for_allelixir
defmodule MyApp.ClusterSupervisor do
use Supervisor
@impl true
def init(_opts) do
children = [
MyApp.ClusterNode1,
MyApp.ClusterNode2,
MyApp.ClusterCoordinator
]
# 如果任何一个子进程崩溃,重启所有子进程
# 适用于子进程紧密耦合的场景
Supervisor.init(children, strategy: :one_for_all)
end
endTask: One-off Async Work
Task:一次性异步任务
When to use Task:
- One-off async operations
- Fire-and-forget work
- Parallel processing without state
Basic Task Patterns:
1. Simple Async/Await:
elixir
undefined何时使用Task:
- 一次性异步操作
- 无需结果的“即发即忘”任务
- 无状态的并行处理
基础Task模式:
1. 简单异步/等待:
elixir
undefinedFire and forget
即发即忘
Task.start(fn ->
send_email(user.email, "Welcome!")
end)
Task.start(fn ->
send_email(user.email, "Welcome!")
end)
Async with result
带结果返回的异步操作
task = Task.async(fn ->
expensive_calculation(data)
end)
result = Task.await(task, 10_000) # 10 second timeout
**2. Supervised Tasks:**
```elixir
defmodule MyApp.TaskSupervisor do
use Task.Supervisor
def start_link(_opts) do
Task.Supervisor.start_link(name: __MODULE__)
end
endtask = Task.async(fn ->
expensive_calculation(data)
end)
result = Task.await(task, 10_000) # 10秒超时
**2. 受监督的Task:**
```elixir
defmodule MyApp.TaskSupervisor do
use Task.Supervisor
def start_link(_opts) do
Task.Supervisor.start_link(name: __MODULE__)
end
endIn your application supervisor
在应用的Supervisor中
children = [
MyApp.TaskSupervisor,
other children...
]
children = [
MyApp.TaskSupervisor,
其他子进程...
]
Use supervised tasks
使用受监督的Task
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
process_large_file(file_path)
end)
**3. Parallel Processing:**
```elixir
defmodule MyApp.BatchProcessor do
def process_batch(items) do
items
|> Task.async_stream(&process_item/1, max_concurrency: 10, timeout: 30_000)
|> Enum.to_list()
end
defp process_item(item) do
# Process individual item
{:ok, processed_item}
end
endTask.Supervisor.start_child(MyApp.TaskSupervisor, fn ->
process_large_file(file_path)
end)
**3. 并行处理:**
```elixir
defmodule MyApp.BatchProcessor do
def process_batch(items) do
items
|> Task.async_stream(&process_item/1, max_concurrency: 10, timeout: 30_000)
|> Enum.to_list()
end
defp process_item(item) do
# 处理单个条目
{:ok, processed_item}
end
endAgent: Simple Shared State
Agent:简单共享状态
When to use Agent:
- Simple shared state (get/update operations)
- No complex logic or async operations
- No need for custom message handling
Agent vs GenServer Decision:
- Use Agent: Simple key-value store, counters, flags
- Use GenServer: Complex state logic, async operations, custom messages
elixir
defmodule MyApp.Settings do
use Agent
def start_link(initial_settings \\ %{}) do
Agent.start_link(fn -> initial_settings end, name: __MODULE__)
end
def get(key) do
Agent.get(__MODULE__, &Map.get(&1, key))
end
def put(key, value) do
Agent.update(__MODULE__, &Map.put(&1, key, value))
end
def get_all do
Agent.get(__MODULE__, & &1)
end
end何时使用Agent:
- 简单的共享状态(获取/更新操作)
- 无复杂逻辑或异步操作
- 无需自定义消息处理
Agent与GenServer的选择:
- 使用Agent:简单键值存储、计数器、标志位
- 使用GenServer:复杂状态逻辑、异步操作、自定义消息
elixir
defmodule MyApp.Settings do
use Agent
def start_link(initial_settings \\ %{}) do
Agent.start_link(fn -> initial_settings end, name: __MODULE__)
end
def get(key) do
Agent.get(__MODULE__, &Map.get(&1, key))
end
def put(key, value) do
Agent.update(__MODULE__, &Map.put(&1, key, value))
end
def get_all do
Agent.get(__MODULE__, & &1)
end
endProcess Communication
进程通信
Message Passing Patterns
消息传递模式
1. Direct Send/Receive:
elixir
defmodule MyApp.DirectMessaging do
def start_receiver do
spawn(fn -> message_loop() end)
end
defp message_loop do
receive do
{:hello, sender_pid} ->
send(sender_pid, {:reply, "Hello back!"})
message_loop()
:stop ->
:ok
_ ->
message_loop()
end
end
def send_message(receiver_pid, message) do
send(receiver_pid, message)
receive do
{:reply, response} -> response
after
5000 -> :timeout
end
end
end2. GenServer Calls (Preferred):
elixir
undefined1. 直接发送/接收:
elixir
defmodule MyApp.DirectMessaging do
def start_receiver do
spawn(fn -> message_loop() end)
end
defp message_loop do
receive do
{:hello, sender_pid} ->
send(sender_pid, {:reply, "Hello back!"})
message_loop()
:stop ->
:ok
_ ->
message_loop()
end
end
def send_message(receiver_pid, message) do
send(receiver_pid, message)
receive do
{:reply, response} -> response
after
5000 -> :timeout
end
end
end2. GenServer调用(推荐):
elixir
undefinedSynchronous communication
同步通信
result = GenServer.call(MyWorker, {:process, data})
result = GenServer.call(MyWorker, {:process, data})
Asynchronous communication
异步通信
GenServer.cast(MyWorker, {:update_state, new_data})
undefinedGenServer.cast(MyWorker, {:update_state, new_data})
undefinedProcess Monitoring and Linking
进程监控与链接
1. Process Monitoring:
elixir
defmodule MyApp.ProcessMonitor do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def monitor_process(pid) do
GenServer.cast(__MODULE__, {:monitor, pid})
end
@impl true
def handle_cast({:monitor, pid}, monitored) do
ref = Process.monitor(pid)
{:noreply, Map.put(monitored, ref, pid)}
end
@impl true
def handle_info({:DOWN, ref, :process, pid, reason}, monitored) do
IO.puts("Process #{inspect(pid)} died with reason: #{inspect(reason)}")
{:noreply, Map.delete(monitored, ref)}
end
end2. Process Linking (Use with Caution):
elixir
undefined1. 进程监控:
elixir
defmodule MyApp.ProcessMonitor do
use GenServer
def start_link(_opts) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
def monitor_process(pid) do
GenServer.cast(__MODULE__, {:monitor, pid})
end
@impl true
def handle_cast({:monitor, pid}, monitored) do
ref = Process.monitor(pid)
{:noreply, Map.put(monitored, ref, pid)}
end
@impl true
def handle_info({:DOWN, ref, :process, pid, reason}, monitored) do
IO.puts("Process #{inspect(pid)} died with reason: #{inspect(reason)}")
{:noreply, Map.delete(monitored, ref)}
end
end2. 进程链接(谨慎使用):
elixir
undefinedBidirectional link - if one dies, both die
双向链接 - 如果一个进程终止,另一个也会终止
Process.link(other_pid)
Process.link(other_pid)
Better: Use supervision for managed restarts
更好的方式:使用监督机制来管理重启
undefinedundefinedAdvanced OTP Patterns (Use When Needed)
高级OTP模式(按需使用)
These patterns solve specific scaling problems. Don't use unless you have measured the need.
这些模式用于解决特定的扩展性问题。除非确实有需求,否则不要使用。
DynamicSupervisor: Runtime Child Spawning
DynamicSupervisor:运行时子进程生成
When you outgrow static supervision:
- Need to spawn processes at runtime
- Don't know number of processes at startup
- Managing user sessions, connections
elixir
defmodule MyApp.SessionSupervisor do
use DynamicSupervisor
def start_link(_opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_opts) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start_session(user_id) do
spec = {MyApp.UserSession, user_id}
DynamicSupervisor.start_child(__MODULE__, spec)
end
def stop_session(user_id) do
case Registry.lookup(MyApp.SessionRegistry, user_id) do
[{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
[] -> :ok
end
end
end当静态监督无法满足需求时:
- 需要在运行时生成进程
- 启动时不知道进程数量
- 管理用户会话、连接等
elixir
defmodule MyApp.SessionSupervisor do
use DynamicSupervisor
def start_link(_opts) do
DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_opts) do
DynamicSupervisor.init(strategy: :one_for_one)
end
def start_session(user_id) do
spec = {MyApp.UserSession, user_id}
DynamicSupervisor.start_child(__MODULE__, spec)
end
def stop_session(user_id) do
case Registry.lookup(MyApp.SessionRegistry, user_id) do
[{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
[] -> :ok
end
end
endPartitionSupervisor: Reducing GenServer Bottlenecks
PartitionSupervisor:缓解GenServer瓶颈
When single GenServer becomes bottleneck:
- High-frequency operations
- Contention on single process
- Need to distribute load
elixir
defmodule MyApp.CounterSupervisor do
use PartitionSupervisor
def start_link(_opts) do
PartitionSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_opts) do
PartitionSupervisor.init(MyApp.Counter, strategy: :one_for_one)
end
def increment(key) do
PartitionSupervisor.get_child_pid(__MODULE__, key)
|> MyApp.Counter.increment()
end
end当单个GenServer成为瓶颈时:
- 高频率操作
- 单个进程存在竞争
- 需要分散负载
elixir
defmodule MyApp.CounterSupervisor do
use PartitionSupervisor
def start_link(_opts) do
PartitionSupervisor.start_link(__MODULE__, [], name: __MODULE__)
end
@impl true
def init(_opts) do
PartitionSupervisor.init(MyApp.Counter, strategy: :one_for_one)
end
def increment(key) do
PartitionSupervisor.get_child_pid(__MODULE__, key)
|> MyApp.Counter.increment()
end
endRegistry: Process Discovery
Registry:进程发现
When you have many named processes:
- Dynamic process naming
- Process lookups by custom keys
- Avoiding atom exhaustion
elixir
defmodule MyApp.GameRegistry do
def start_link do
Registry.start_link(keys: :unique, name: __MODULE__)
end
def register_game(game_id, pid) do
Registry.register(__MODULE__, game_id, pid)
end
def find_game(game_id) do
case Registry.lookup(__MODULE__, game_id) do
[{pid, _}] -> {:ok, pid}
[] -> {:error, :not_found}
end
end
def list_games do
Registry.select(__MODULE__, [{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2"}}]}])
end
end当你有多个命名进程时:
- 动态进程命名
- 通过自定义键查找进程
- 避免原子耗尽
elixir
defmodule MyApp.GameRegistry do
def start_link do
Registry.start_link(keys: :unique, name: __MODULE__)
end
def register_game(game_id, pid) do
Registry.register(__MODULE__, game_id, pid)
end
def find_game(game_id) do
case Registry.lookup(__MODULE__, game_id) do
[{pid, _}] -> {:ok, pid}
[] -> {:error, :not_found}
end
end
def list_games do
Registry.select(__MODULE__, [{{:"$1", :"$2", :"$3"}, [], [{{:"$1", :"$2"}}]}])
end
endOTP Design Principles (KISS Emphasis)
OTP设计原则(强调KISS)
When to Use What (KISS Guide)
工具选择指南(KISS版)
Start Simple (80% of cases):
Need state? → GenServer
Need supervision? → Supervisor (one_for_one)
Need one-off async? → Task
Need simple shared state? → AgentAdvance When Needed (specific problems only):
Spawning processes at runtime? → DynamicSupervisor
GenServer bottleneck (proven via profiling)? → PartitionSupervisor
Many named processes? → Registry
Need supervised async tasks? → Task.Supervisor从简单方案开始(80%的场景):
需要状态? → GenServer
需要监督? → Supervisor(one_for_one)
需要一次性异步任务? → Task
需要简单共享状态? → Agent按需升级(仅针对特定问题):
需要在运行时生成进程? → DynamicSupervisor
GenServer出现瓶颈(经性能分析确认)? → PartitionSupervisor
有大量命名进程? → Registry
需要受监督的异步任务? → Task.SupervisorRed Flags (Premature Optimization)
预警信号(过早优化)
Don't do these without measuring:
- Using PartitionSupervisor "just in case"
- Registry when you have < 100 processes
- Complex supervision trees without proven need
- DynamicSupervisor when static children work fine
未经性能分析不要做这些:
- 为了“以防万一”使用PartitionSupervisor
- 进程数量少于100时使用Registry
- 在没有明确需求的情况下构建复杂监督树
- 静态子进程能正常工作时使用DynamicSupervisor
Decision Framework
决策框架
1. Start with simplest pattern that works
elixir
undefined1. 从能解决问题的最简单模式开始
elixir
undefinedStart here
从这里开始
defmodule MyApp.SimpleCounter do
use GenServer
Basic implementation
end
defmodule MyApp.SimpleCounter do
use GenServer
基础实现
end
Not here
不要直接用这个
defmodule MyApp.DistributedPartitionedCounter do
use PartitionSupervisor
Complex implementation you don't need yet
end
**2. Profile to find actual bottlenecks**
```elixirdefmodule MyApp.DistributedPartitionedCounter do
use PartitionSupervisor
你目前不需要的复杂实现
end
**2. 通过性能分析找到实际瓶颈**
```elixirUse Observer to find real problems
使用Observer查找真正的问题
:observer.start()
:observer.start()
Use telemetry for metrics
使用telemetry收集指标
:telemetry.execute([:my_app, :counter, :increment], %{value: 1})
**3. Optimize specific problem with specific solution**
```elixir:telemetry.execute([:my_app, :counter, :increment], %{value: 1})
**3. 针对特定问题使用优化方案**
```elixirIf profiling shows GenServer is bottleneck:
如果性能分析显示GenServer是瓶颈:
THEN consider PartitionSupervisor
再考虑使用PartitionSupervisor
If you're running out of atoms for process names:
如果进程命名导致原子耗尽:
THEN consider Registry
再考虑使用Registry
If you need runtime process spawning:
如果需要在运行时生成进程:
THEN consider DynamicSupervisor
再考虑使用DynamicSupervisor
**4. Measure improvement, iterate**
**4. 衡量改进效果,持续迭代**OTP Testing Patterns
OTP测试模式
Testing GenServers
测试GenServer
elixir
defmodule MyApp.CounterTest do
use ExUnit.Case
setup do
{:ok, pid} = MyApp.Counter.start_link(0)
%{counter: pid}
end
test "increments count", %{counter: pid} do
assert MyApp.Counter.get_count(pid) == 0
MyApp.Counter.increment(pid)
assert MyApp.Counter.get_count(pid) == 1
end
test "handles concurrent increments" do
{:ok, pid} = MyApp.Counter.start_link(0)
tasks = for _ <- 1..100 do
Task.async(fn -> MyApp.Counter.increment(pid) end)
end
Enum.each(tasks, &Task.await/1)
assert MyApp.Counter.get_count(pid) == 100
end
endelixir
defmodule MyApp.CounterTest do
use ExUnit.Case
setup do
{:ok, pid} = MyApp.Counter.start_link(0)
%{counter: pid}
end
test "increments count", %{counter: pid} do
assert MyApp.Counter.get_count(pid) == 0
MyApp.Counter.increment(pid)
assert MyApp.Counter.get_count(pid) == 1
end
test "handles concurrent increments" do
{:ok, pid} = MyApp.Counter.start_link(0)
tasks = for _ <- 1..100 do
Task.async(fn -> MyApp.Counter.increment(pid) end)
end
Enum.each(tasks, &Task.await/1)
assert MyApp.Counter.get_count(pid) == 100
end
endTesting Supervision Trees
测试监督树
elixir
defmodule MyApp.SupervisorTest do
use ExUnit.Case
test "restarts failed children" do
{:ok, supervisor_pid} = MyApp.Supervisor.start_link([])
# Find counter process
[{counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
# Kill the process
Process.exit(counter_pid, :kill)
# Wait for restart
:timer.sleep(100)
# Verify new process is running
[{new_counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
assert new_counter_pid != counter_pid
assert Process.alive?(new_counter_pid)
end
endelixir
defmodule MyApp.SupervisorTest do
use ExUnit.Case
test "restarts failed children" do
{:ok, supervisor_pid} = MyApp.Supervisor.start_link([])
# 查找计数器进程
[{counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
# 终止进程
Process.exit(counter_pid, :kill)
# 等待重启完成
:timer.sleep(100)
# 验证新进程正在运行
[{new_counter_pid, _}] = Registry.lookup(MyApp.Registry, MyApp.Counter)
assert new_counter_pid != counter_pid
assert Process.alive?(new_counter_pid)
end
endCross-References
交叉引用
System Architecture
系统架构
For high-level system design, context boundaries, and distributed architecture patterns,
see the skill.
elixir-architecture有关高级系统设计、上下文边界和分布式架构模式,请查看技能。
elixir-architecturePhoenix Integration
Phoenix集成
For integrating OTP patterns with Phoenix contexts, LiveView, and PubSub systems,
see the skill.
elixir-phoenix-framework有关将OTP模式与Phoenix上下文、LiveView和PubSub系统集成的内容,请查看技能。
elixir-phoenix-frameworkPerformance Review
性能评估
For profiling OTP applications, identifying process bottlenecks, and optimization strategies,
see the skill.
elixir-review有关OTP应用性能分析、进程瓶颈识别和优化策略,请查看技能。
elixir-reviewSummary
总结
Remember KISS:
- Start simple: GenServer + Supervisor covers most needs
- Profile before optimizing: Measure actual problems, not theoretical ones
- Advance when needed: Use complex patterns to solve specific measured problems
- Test concurrent behavior: OTP makes concurrency easier but testing is still important
The BEAM VM and OTP give you powerful tools for building fault-tolerant, concurrent systems. Use them wisely by starting simple and evolving complexity only when requirements demand it.
牢记KISS原则:
- 从简单开始:GenServer + Supervisor能满足大多数需求
- 先分析再优化:针对实际存在的问题进行优化,而非理论问题
- 按需升级:使用复杂模式解决经过验证的特定问题
- 测试并发行为:OTP简化了并发开发,但测试仍然很重要
BEAM虚拟机和OTP为你提供了构建容错、并发系统的强大工具。明智地使用它们:从简单方案开始,仅在需求明确时再增加复杂度。