elixir-otp

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Elixir OTP Skill

Elixir OTP 技能

Essential OTP Patterns (KISS First)

核心OTP模式(优先遵循KISS原则)

Use these patterns for 80% of use cases. Start simple, optimize later.
这些模式适用于80%的使用场景。从简单方案开始,后续再优化。

GenServer: Stateful Processes

GenServer:有状态进程

When to use GenServer:
  • Need to maintain state across function calls
  • State needs to be shared across processes
  • Need to serialize access to a resource
  • Long-running processes with periodic tasks
Basic GenServer Pattern:
elixir
defmodule MyApp.Counter do
  @moduledoc """
  A GenServer that holds a single count, registered under the module name.

  The client functions take an optional `server` argument (a pid or registered
  name) that defaults to the module name, so existing zero-argument calls keep
  working.
  """
  use GenServer

  # Client API (public interface)

  @doc """
  Starts the counter and registers it as `MyApp.Counter`.

  `initial_value` may be the starting count itself, or a keyword list with an
  optional `:initial_value` key (default `0`). The keyword form is what the
  default child spec passes when the module is listed bare in a supervisor
  (`start_link([])`); without it the state would be `[]` and the first
  increment would raise.
  """
  def start_link(initial_value \\ 0)

  def start_link(opts) when is_list(opts) do
    start_link(Keyword.get(opts, :initial_value, 0))
  end

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  @doc "Returns the current count (synchronous call)."
  def get_count(server \\ __MODULE__) do
    GenServer.call(server, :get_count)
  end

  @doc "Adds one to the count without waiting for a reply; returns `:ok`."
  def increment(server \\ __MODULE__) do
    GenServer.cast(server, :increment)
  end

  @doc "Adds `amount` to the count and returns the new count."
  def increment_by(server \\ __MODULE__, amount) do
    GenServer.call(server, {:increment_by, amount})
  end

  # Server Callbacks (private implementation)

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:get_count, _from, count) do
    {:reply, count, count}
  end

  @impl true
  def handle_call({:increment_by, amount}, _from, count) do
    new_count = count + amount
    {:reply, new_count, new_count}
  end

  @impl true
  def handle_cast(:increment, count) do
    {:noreply, count + 1}
  end
end
Common GenServer Patterns:
1. Resource Pool:
elixir
defmodule MyApp.ConnectionPool do
  @moduledoc """
  A fixed-size pool of connections managed by a single named GenServer.

  State is `%{available: [conn], in_use: [{conn, owner_pid, monitor_ref}]}`.
  Each checkout monitors the caller so the connection is returned to the pool
  if the caller exits without checking it in.
  """
  use GenServer

  @doc "Starts the pool. `opts` may contain `:pool_size` (default `10`)."
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Returns `{:ok, connection}` or `{:error, :no_connections}` when the pool is empty."
  def checkout do
    GenServer.call(__MODULE__, :checkout)
  end

  @doc "Returns `connection` to the pool. Unknown connections are ignored."
  def checkin(connection) do
    GenServer.cast(__MODULE__, {:checkin, connection})
  end

  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 10)
    # The explicit step keeps `pool_size: 0` an empty range instead of counting down.
    connections = for _ <- 1..pool_size//1, do: create_connection()
    {:ok, %{available: connections, in_use: []}}
  end

  @impl true
  def handle_call(:checkout, {owner_pid, _tag}, %{available: [conn | rest]} = state) do
    # `from` is a `{pid, tag}` tuple; only the pid can be monitored.
    monitor_ref = Process.monitor(owner_pid)

    new_state = %{
      state
      | available: rest,
        in_use: [{conn, owner_pid, monitor_ref} | state.in_use]
    }

    {:reply, {:ok, conn}, new_state}
  end

  def handle_call(:checkout, _from, %{available: []} = state) do
    {:reply, {:error, :no_connections}, state}
  end

  @impl true
  def handle_cast({:checkin, conn}, state) do
    case List.keytake(state.in_use, conn, 0) do
      {{_conn, _owner_pid, monitor_ref}, remaining} ->
        # Flush so a late :DOWN for this checkout cannot arrive afterwards.
        Process.demonitor(monitor_ref, [:flush])
        {:noreply, %{state | available: [conn | state.available], in_use: remaining}}

      nil ->
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, monitor_ref, :process, _pid, _reason}, state) do
    # The owner exited while holding a connection: reclaim it.
    case List.keytake(state.in_use, monitor_ref, 2) do
      {{conn, _owner_pid, _ref}, remaining} ->
        {:noreply, %{state | available: [conn | state.available], in_use: remaining}}

      nil ->
        {:noreply, state}
    end
  end

  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Placeholder so the example compiles and runs; replace with real connection
  # setup (the original example left this function undefined).
  defp create_connection do
    make_ref()
  end
end
2. Cache with TTL:
elixir
defmodule MyApp.Cache do
  @moduledoc """
  An in-memory key/value cache with a per-entry TTL, held in a named GenServer.

  Entries are stored as `key => {value, expires_at}`. Deadlines use the
  monotonic clock, so wall-clock adjustments cannot expire entries early or
  keep them alive past their TTL.
  """
  use GenServer

  # How often expired entries are purged, in milliseconds.
  @cleanup_interval 60_000

  @doc "Starts the cache registered as `MyApp.Cache`. `_opts` is ignored."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Stores `value` under `key` for `ttl` milliseconds (default 60 000). Returns `:ok`."
  def put(key, value, ttl \\ 60_000) do
    GenServer.cast(__MODULE__, {:put, key, value, ttl})
  end

  @doc "Returns `{:ok, value}` for a live entry, or `:not_found` if missing or expired."
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @impl true
  def init(_) do
    # Schedule cleanup every minute
    :timer.send_interval(@cleanup_interval, :cleanup)
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:put, key, value, ttl}, cache) do
    {:noreply, Map.put(cache, key, {value, now() + ttl})}
  end

  @impl true
  def handle_call({:get, key}, _from, cache) do
    now = now()

    case cache do
      %{^key => {value, expires_at}} when expires_at > now -> {:reply, {:ok, value}, cache}
      _ -> {:reply, :not_found, cache}
    end
  end

  @impl true
  def handle_info(:cleanup, cache) do
    now = now()
    {:noreply, Map.reject(cache, fn {_key, {_value, expires_at}} -> expires_at <= now end)}
  end

  # Monotonic time is only comparable within this VM instance, which is fine
  # because the cache itself does not outlive the VM.
  defp now do
    System.monotonic_time(:millisecond)
  end
end
何时使用GenServer:
  • 需要在函数调用之间维护状态
  • 状态需要在多个进程之间共享
  • 需要对资源进行序列化访问
  • 带有周期性任务的长期运行进程
基础GenServer模式:
elixir
defmodule MyApp.Counter do
  @moduledoc """
  A GenServer that holds a single count, registered under the module name.

  The client functions take an optional `server` argument (a pid or registered
  name) that defaults to the module name, so existing zero-argument calls keep
  working.
  """
  use GenServer

  # Client API (public interface)

  @doc """
  Starts the counter and registers it as `MyApp.Counter`.

  `initial_value` may be the starting count itself, or a keyword list with an
  optional `:initial_value` key (default `0`). The keyword form is what the
  default child spec passes when the module is listed bare in a supervisor
  (`start_link([])`); without it the state would be `[]` and the first
  increment would raise.
  """
  def start_link(initial_value \\ 0)

  def start_link(opts) when is_list(opts) do
    start_link(Keyword.get(opts, :initial_value, 0))
  end

  def start_link(initial_value) do
    GenServer.start_link(__MODULE__, initial_value, name: __MODULE__)
  end

  @doc "Returns the current count (synchronous call)."
  def get_count(server \\ __MODULE__) do
    GenServer.call(server, :get_count)
  end

  @doc "Adds one to the count without waiting for a reply; returns `:ok`."
  def increment(server \\ __MODULE__) do
    GenServer.cast(server, :increment)
  end

  @doc "Adds `amount` to the count and returns the new count."
  def increment_by(server \\ __MODULE__, amount) do
    GenServer.call(server, {:increment_by, amount})
  end

  # Server Callbacks (private implementation)

  @impl true
  def init(initial_value) do
    {:ok, initial_value}
  end

  @impl true
  def handle_call(:get_count, _from, count) do
    {:reply, count, count}
  end

  @impl true
  def handle_call({:increment_by, amount}, _from, count) do
    new_count = count + amount
    {:reply, new_count, new_count}
  end

  @impl true
  def handle_cast(:increment, count) do
    {:noreply, count + 1}
  end
end
常见GenServer模式:
1. 资源池:
elixir
defmodule MyApp.ConnectionPool do
  @moduledoc """
  A fixed-size pool of connections managed by a single named GenServer.

  State is `%{available: [conn], in_use: [{conn, owner_pid, monitor_ref}]}`.
  Each checkout monitors the caller so the connection is returned to the pool
  if the caller exits without checking it in.
  """
  use GenServer

  @doc "Starts the pool. `opts` may contain `:pool_size` (default `10`)."
  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @doc "Returns `{:ok, connection}` or `{:error, :no_connections}` when the pool is empty."
  def checkout do
    GenServer.call(__MODULE__, :checkout)
  end

  @doc "Returns `connection` to the pool. Unknown connections are ignored."
  def checkin(connection) do
    GenServer.cast(__MODULE__, {:checkin, connection})
  end

  @impl true
  def init(opts) do
    pool_size = Keyword.get(opts, :pool_size, 10)
    # The explicit step keeps `pool_size: 0` an empty range instead of counting down.
    connections = for _ <- 1..pool_size//1, do: create_connection()
    {:ok, %{available: connections, in_use: []}}
  end

  @impl true
  def handle_call(:checkout, {owner_pid, _tag}, %{available: [conn | rest]} = state) do
    # `from` is a `{pid, tag}` tuple; only the pid can be monitored.
    monitor_ref = Process.monitor(owner_pid)

    new_state = %{
      state
      | available: rest,
        in_use: [{conn, owner_pid, monitor_ref} | state.in_use]
    }

    {:reply, {:ok, conn}, new_state}
  end

  def handle_call(:checkout, _from, %{available: []} = state) do
    {:reply, {:error, :no_connections}, state}
  end

  @impl true
  def handle_cast({:checkin, conn}, state) do
    case List.keytake(state.in_use, conn, 0) do
      {{_conn, _owner_pid, monitor_ref}, remaining} ->
        # Flush so a late :DOWN for this checkout cannot arrive afterwards.
        Process.demonitor(monitor_ref, [:flush])
        {:noreply, %{state | available: [conn | state.available], in_use: remaining}}

      nil ->
        {:noreply, state}
    end
  end

  @impl true
  def handle_info({:DOWN, monitor_ref, :process, _pid, _reason}, state) do
    # The owner exited while holding a connection: reclaim it.
    case List.keytake(state.in_use, monitor_ref, 2) do
      {{conn, _owner_pid, _ref}, remaining} ->
        {:noreply, %{state | available: [conn | state.available], in_use: remaining}}

      nil ->
        {:noreply, state}
    end
  end

  def handle_info(_message, state) do
    {:noreply, state}
  end

  # Placeholder so the example compiles and runs; replace with real connection
  # setup (the original example left this function undefined).
  defp create_connection do
    make_ref()
  end
end
2. 带TTL的缓存:
elixir
defmodule MyApp.Cache do
  @moduledoc """
  An in-memory key/value cache with a per-entry TTL, held in a named GenServer.

  Entries are stored as `key => {value, expires_at}`. Deadlines use the
  monotonic clock, so wall-clock adjustments cannot expire entries early or
  keep them alive past their TTL.
  """
  use GenServer

  # How often expired entries are purged, in milliseconds.
  @cleanup_interval 60_000

  @doc "Starts the cache registered as `MyApp.Cache`. `_opts` is ignored."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Stores `value` under `key` for `ttl` milliseconds (default 60 000). Returns `:ok`."
  def put(key, value, ttl \\ 60_000) do
    GenServer.cast(__MODULE__, {:put, key, value, ttl})
  end

  @doc "Returns `{:ok, value}` for a live entry, or `:not_found` if missing or expired."
  def get(key) do
    GenServer.call(__MODULE__, {:get, key})
  end

  @impl true
  def init(_) do
    # Schedule cleanup every minute
    :timer.send_interval(@cleanup_interval, :cleanup)
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:put, key, value, ttl}, cache) do
    {:noreply, Map.put(cache, key, {value, now() + ttl})}
  end

  @impl true
  def handle_call({:get, key}, _from, cache) do
    now = now()

    case cache do
      %{^key => {value, expires_at}} when expires_at > now -> {:reply, {:ok, value}, cache}
      _ -> {:reply, :not_found, cache}
    end
  end

  @impl true
  def handle_info(:cleanup, cache) do
    now = now()
    {:noreply, Map.reject(cache, fn {_key, {_value, expires_at}} -> expires_at <= now end)}
  end

  # Monotonic time is only comparable within this VM instance, which is fine
  # because the cache itself does not outlive the VM.
  defp now do
    System.monotonic_time(:millisecond)
  end
end

Supervisor: Process Supervision

Supervisor:进程监督

When to use Supervisor:
  • ALWAYS supervise your processes (GenServers, Tasks)
  • Need fault tolerance and automatic restart
  • Managing a group of related processes
Basic Supervision Strategies:
1.
:one_for_one
(Most Common - 90% of cases)
elixir
defmodule MyApp.Supervisor do
  @moduledoc """
  Top-level supervisor using the `:one_for_one` strategy, registered under the
  module name.
  """
  use Supervisor

  @doc "Starts the supervisor; `opts` is forwarded to `init/1`."
  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # With :one_for_one a crashed child is restarted alone; siblings keep running.
    [
      MyApp.Counter,
      MyApp.Cache,
      {MyApp.Worker, [arg1: "value"]}
    ]
    |> Supervisor.init(strategy: :one_for_one)
  end
end
2.
:rest_for_one
(When Order Matters)
elixir
defmodule MyApp.DatabaseSupervisor do
  @moduledoc """
  Supervises database-dependent processes with the `:rest_for_one` strategy.

  Children are listed in dependency order, so a crash restarts the crashed
  child and every child declared after it.
  """
  use Supervisor

  @doc """
  Starts the supervisor registered under the module name.

  Defined so the module can be listed in a parent supervision tree, whose
  default child spec calls `start_link/1`.
  """
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      MyApp.Repo,           # Database connection
      MyApp.Cache,          # Depends on DB
      MyApp.DataProcessor   # Depends on both
    ]

    # If DataProcessor crashes, restart it only
    # If Cache crashes, restart Cache AND DataProcessor
    # If Repo crashes, restart ALL children
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
3.
:one_for_all
(All or Nothing)
elixir
defmodule MyApp.ClusterSupervisor do
  @moduledoc """
  Supervises tightly coupled cluster processes with the `:one_for_all`
  strategy: when any child crashes, every child is restarted.
  """
  use Supervisor

  @doc """
  Starts the supervisor registered under the module name.

  Defined so the module can be listed in a parent supervision tree, whose
  default child spec calls `start_link/1`.
  """
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      MyApp.ClusterNode1,
      MyApp.ClusterNode2,
      MyApp.ClusterCoordinator
    ]

    # If ANY child crashes, restart ALL children
    # Use when children are tightly coupled
    Supervisor.init(children, strategy: :one_for_all)
  end
end
何时使用Supervisor:
  • 始终要监督你的进程(GenServer、Task等)
  • 需要容错能力和自动重启机制
  • 管理一组相关进程
基础监督策略:
1.
:one_for_one
(最常用 - 90%的场景)
elixir
defmodule MyApp.Supervisor do
  @moduledoc """
  Top-level supervisor using the `:one_for_one` strategy, registered under the
  module name.
  """
  use Supervisor

  @doc "Starts the supervisor; `opts` is forwarded to `init/1`."
  def start_link(opts), do: Supervisor.start_link(__MODULE__, opts, name: __MODULE__)

  @impl true
  def init(_opts) do
    # With :one_for_one a crashed child is restarted alone; siblings keep running.
    [
      MyApp.Counter,
      MyApp.Cache,
      {MyApp.Worker, [arg1: "value"]}
    ]
    |> Supervisor.init(strategy: :one_for_one)
  end
end
2.
:rest_for_one
(当启动顺序很重要时)
elixir
defmodule MyApp.DatabaseSupervisor do
  @moduledoc """
  Supervises database-dependent processes with the `:rest_for_one` strategy.

  Children are listed in dependency order, so a crash restarts the crashed
  child and every child declared after it.
  """
  use Supervisor

  @doc """
  Starts the supervisor registered under the module name.

  Defined so the module can be listed in a parent supervision tree, whose
  default child spec calls `start_link/1`.
  """
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      MyApp.Repo,           # Database connection
      MyApp.Cache,          # Depends on DB
      MyApp.DataProcessor   # Depends on both
    ]

    # If DataProcessor crashes, restart it only
    # If Cache crashes, restart Cache AND DataProcessor
    # If Repo crashes, restart ALL children
    Supervisor.init(children, strategy: :rest_for_one)
  end
end
3.
:one_for_all
(全有或全无)
elixir
defmodule MyApp.ClusterSupervisor do
  @moduledoc """
  Supervises tightly coupled cluster processes with the `:one_for_all`
  strategy: when any child crashes, every child is restarted.
  """
  use Supervisor

  @doc """
  Starts the supervisor registered under the module name.

  Defined so the module can be listed in a parent supervision tree, whose
  default child spec calls `start_link/1`.
  """
  def start_link(opts) do
    Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    children = [
      MyApp.ClusterNode1,
      MyApp.ClusterNode2,
      MyApp.ClusterCoordinator
    ]

    # If ANY child crashes, restart ALL children
    # Use when children are tightly coupled
    Supervisor.init(children, strategy: :one_for_all)
  end
end

Task: One-off Async Work

Task:一次性异步任务

When to use Task:
  • One-off async operations
  • Fire-and-forget work
  • Parallel processing without state
Basic Task Patterns:
1. Simple Async/Await:
elixir
undefined
何时使用Task:
  • 一次性异步操作
  • 无需结果的“即发即忘”任务
  • 无状态的并行处理
基础Task模式:
1. 简单异步/等待:
elixir
undefined

Fire and forget

即发即忘

Task.start(fn -> send_email(user.email, "Welcome!") end)
Task.start(fn -> send_email(user.email, "Welcome!") end)

Async with result

带结果返回的异步操作

task = Task.async(fn -> expensive_calculation(data) end)
result = Task.await(task, 10_000) # 10 second timeout

**2. Supervised Tasks:**
```elixir
defmodule MyApp.TaskSupervisor do
  @moduledoc """
  A named `Task.Supervisor` for running supervised tasks.

  `Task.Supervisor` provides no `use` macro, so the child spec is written out
  here to let the module be listed directly in a supervision tree.
  """

  @doc "Child spec that starts this module as a supervisor via `start_link/1`."
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc "Starts the task supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    Task.Supervisor.start_link(name: __MODULE__)
  end
end
task = Task.async(fn -> expensive_calculation(data) end)
result = Task.await(task, 10_000) # 10秒超时

**2. 受监督的Task:**
```elixir
defmodule MyApp.TaskSupervisor do
  @moduledoc """
  A named `Task.Supervisor` for running supervised tasks.

  `Task.Supervisor` provides no `use` macro, so the child spec is written out
  here to let the module be listed directly in a supervision tree.
  """

  @doc "Child spec that starts this module as a supervisor via `start_link/1`."
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc "Starts the task supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    Task.Supervisor.start_link(name: __MODULE__)
  end
end

In your application supervisor

在应用的Supervisor中

children = [
  MyApp.TaskSupervisor
  # other children...
]
children = [
  MyApp.TaskSupervisor
  # 其他子进程...
]

Use supervised tasks

使用受监督的Task

Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> process_large_file(file_path) end)

**3. Parallel Processing:**
```elixir
defmodule MyApp.BatchProcessor do
  @moduledoc """
  Processes a batch of items concurrently with `Task.async_stream/3`.
  """

  @doc """
  Runs `process_item/1` on every item with at most 10 concurrent tasks and a
  30-second timeout per item.

  Returns a list with one `{:ok, result}` entry per item, in input order,
  where `result` is whatever `process_item/1` returned.
  """
  def process_batch(items) do
    items
    |> Task.async_stream(&process_item/1, max_concurrency: 10, timeout: 30_000)
    |> Enum.to_list()
  end

  # Process individual item. Placeholder: returns the item unchanged (the
  # original referenced an undefined variable here and did not compile).
  defp process_item(item) do
    {:ok, item}
  end
end
Task.Supervisor.start_child(MyApp.TaskSupervisor, fn -> process_large_file(file_path) end)

**3. 并行处理:**
```elixir
defmodule MyApp.BatchProcessor do
  @moduledoc """
  Processes a batch of items concurrently with `Task.async_stream/3`.
  """

  @doc """
  Runs `process_item/1` on every item with at most 10 concurrent tasks and a
  30-second timeout per item.

  Returns a list with one `{:ok, result}` entry per item, in input order,
  where `result` is whatever `process_item/1` returned.
  """
  def process_batch(items) do
    items
    |> Task.async_stream(&process_item/1, max_concurrency: 10, timeout: 30_000)
    |> Enum.to_list()
  end

  # Process individual item. Placeholder: returns the item unchanged (the
  # original referenced an undefined variable here and did not compile).
  defp process_item(item) do
    {:ok, item}
  end
end

Agent: Simple Shared State

Agent:简单共享状态

When to use Agent:
  • Simple shared state (get/update operations)
  • No complex logic or async operations
  • No need for custom message handling
Agent vs GenServer Decision:
  • Use Agent: Simple key-value store, counters, flags
  • Use GenServer: Complex state logic, async operations, custom messages
elixir
defmodule MyApp.Settings do
  @moduledoc """
  A named Agent holding application settings as a map.
  """
  use Agent

  @doc """
  Starts the settings agent registered under the module name.

  `initial_settings` may be a map or a keyword list; it is converted with
  `Map.new/1`. This matters under a supervisor: listing the module bare calls
  `start_link([])`, and without the conversion the state would be a list that
  `get/1` and `put/2` cannot handle.
  """
  def start_link(initial_settings \\ %{}) do
    # Convert in the caller so bad input raises here rather than inside the agent.
    settings = Map.new(initial_settings)
    Agent.start_link(fn -> settings end, name: __MODULE__)
  end

  @doc "Returns the value stored under `key`, or `nil` if it is not set."
  def get(key) do
    Agent.get(__MODULE__, &Map.get(&1, key))
  end

  @doc "Stores `value` under `key`. Returns `:ok`."
  def put(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end

  @doc "Returns the whole settings map."
  def get_all do
    Agent.get(__MODULE__, & &1)
  end
end
何时使用Agent:
  • 简单的共享状态(获取/更新操作)
  • 无复杂逻辑或异步操作
  • 无需自定义消息处理
Agent与GenServer的选择:
  • 使用Agent:简单键值存储、计数器、标志位
  • 使用GenServer:复杂状态逻辑、异步操作、自定义消息
elixir
defmodule MyApp.Settings do
  @moduledoc """
  A named Agent holding application settings as a map.
  """
  use Agent

  @doc """
  Starts the settings agent registered under the module name.

  `initial_settings` may be a map or a keyword list; it is converted with
  `Map.new/1`. This matters under a supervisor: listing the module bare calls
  `start_link([])`, and without the conversion the state would be a list that
  `get/1` and `put/2` cannot handle.
  """
  def start_link(initial_settings \\ %{}) do
    # Convert in the caller so bad input raises here rather than inside the agent.
    settings = Map.new(initial_settings)
    Agent.start_link(fn -> settings end, name: __MODULE__)
  end

  @doc "Returns the value stored under `key`, or `nil` if it is not set."
  def get(key) do
    Agent.get(__MODULE__, &Map.get(&1, key))
  end

  @doc "Stores `value` under `key`. Returns `:ok`."
  def put(key, value) do
    Agent.update(__MODULE__, &Map.put(&1, key, value))
  end

  @doc "Returns the whole settings map."
  def get_all do
    Agent.get(__MODULE__, & &1)
  end
end

Process Communication

进程通信

Message Passing Patterns

消息传递模式

1. Direct Send/Receive:
elixir
defmodule MyApp.DirectMessaging do
  @moduledoc """
  Raw `send`/`receive` messaging between two processes.
  """

  @doc "Spawns an unlinked receiver process and returns its pid."
  def start_receiver, do: spawn(&message_loop/0)

  @doc """
  Sends `message` to `receiver_pid`, then blocks until a `{:reply, response}`
  message arrives in the caller's mailbox.

  Returns `response`, or `:timeout` if none arrives within 5000 ms.
  """
  def send_message(receiver_pid, message) do
    send(receiver_pid, message)
    await_reply()
  end

  # Waits in the calling process for the first `{:reply, _}` message.
  defp await_reply do
    receive do
      {:reply, response} -> response
    after
      5000 -> :timeout
    end
  end

  # Receiver loop: answers `{:hello, pid}`, exits on `:stop`, and discards
  # anything else so unmatched messages do not pile up in the mailbox.
  defp message_loop do
    receive do
      :stop ->
        :ok

      {:hello, sender_pid} ->
        send(sender_pid, {:reply, "Hello back!"})
        message_loop()

      _ ->
        message_loop()
    end
  end
end
2. GenServer Calls (Preferred):
elixir
undefined
1. 直接发送/接收:
elixir
defmodule MyApp.DirectMessaging do
  @moduledoc """
  Raw `send`/`receive` messaging between two processes.
  """

  @doc "Spawns an unlinked receiver process and returns its pid."
  def start_receiver, do: spawn(&message_loop/0)

  @doc """
  Sends `message` to `receiver_pid`, then blocks until a `{:reply, response}`
  message arrives in the caller's mailbox.

  Returns `response`, or `:timeout` if none arrives within 5000 ms.
  """
  def send_message(receiver_pid, message) do
    send(receiver_pid, message)
    await_reply()
  end

  # Waits in the calling process for the first `{:reply, _}` message.
  defp await_reply do
    receive do
      {:reply, response} -> response
    after
      5000 -> :timeout
    end
  end

  # Receiver loop: answers `{:hello, pid}`, exits on `:stop`, and discards
  # anything else so unmatched messages do not pile up in the mailbox.
  defp message_loop do
    receive do
      :stop ->
        :ok

      {:hello, sender_pid} ->
        send(sender_pid, {:reply, "Hello back!"})
        message_loop()

      _ ->
        message_loop()
    end
  end
end
2. GenServer调用(推荐):
elixir
undefined

Synchronous communication

同步通信

result = GenServer.call(MyWorker, {:process, data})
result = GenServer.call(MyWorker, {:process, data})

Asynchronous communication

异步通信

GenServer.cast(MyWorker, {:update_state, new_data})
undefined
GenServer.cast(MyWorker, {:update_state, new_data})
undefined

Process Monitoring and Linking

进程监控与链接

1. Process Monitoring:
elixir
defmodule MyApp.ProcessMonitor do
  @moduledoc """
  A named GenServer that monitors other processes and reports when they exit.

  State is a map of `monitor_ref => pid` for every process being watched.
  """
  use GenServer

  @doc "Starts the monitor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Asks the server to start monitoring `pid`. Returns `:ok` immediately."
  def monitor_process(pid) do
    GenServer.cast(__MODULE__, {:monitor, pid})
  end

  # Defined explicitly: a `use GenServer` module without init/1 compiles with a
  # warning and falls back to an injected default.
  @impl true
  def init(monitored) do
    {:ok, monitored}
  end

  @impl true
  def handle_cast({:monitor, pid}, monitored) do
    ref = Process.monitor(pid)
    {:noreply, Map.put(monitored, ref, pid)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, pid, reason}, monitored) do
    IO.puts("Process #{inspect(pid)} died with reason: #{inspect(reason)}")
    {:noreply, Map.delete(monitored, ref)}
  end
end
2. Process Linking (Use with Caution):
elixir
undefined
1. 进程监控:
elixir
defmodule MyApp.ProcessMonitor do
  @moduledoc """
  A named GenServer that monitors other processes and reports when they exit.

  State is a map of `monitor_ref => pid` for every process being watched.
  """
  use GenServer

  @doc "Starts the monitor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
  end

  @doc "Asks the server to start monitoring `pid`. Returns `:ok` immediately."
  def monitor_process(pid) do
    GenServer.cast(__MODULE__, {:monitor, pid})
  end

  # Defined explicitly: a `use GenServer` module without init/1 compiles with a
  # warning and falls back to an injected default.
  @impl true
  def init(monitored) do
    {:ok, monitored}
  end

  @impl true
  def handle_cast({:monitor, pid}, monitored) do
    ref = Process.monitor(pid)
    {:noreply, Map.put(monitored, ref, pid)}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, pid, reason}, monitored) do
    IO.puts("Process #{inspect(pid)} died with reason: #{inspect(reason)}")
    {:noreply, Map.delete(monitored, ref)}
  end
end
2. 进程链接(谨慎使用):
elixir
undefined

Bidirectional link - if one dies, both die

双向链接 - 如果一个进程终止,另一个也会终止

Process.link(other_pid)
Process.link(other_pid)

Better: Use supervision for managed restarts

更好的方式:使用监督机制来管理重启

undefined
undefined

Advanced OTP Patterns (Use When Needed)

高级OTP模式(按需使用)

These patterns solve specific scaling problems. Don't use unless you have measured the need.
这些模式用于解决特定的扩展性问题。除非已通过测量确认确有需要,否则不要使用。

DynamicSupervisor: Runtime Child Spawning

DynamicSupervisor:运行时子进程生成

When you outgrow static supervision:
  • Need to spawn processes at runtime
  • Don't know the number of processes at startup
  • Managing user sessions, connections
elixir
defmodule MyApp.SessionSupervisor do
  @moduledoc """
  A `DynamicSupervisor` that starts and stops `MyApp.UserSession` processes at
  runtime.
  """
  use DynamicSupervisor

  @doc "Starts the supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_opts), do: DynamicSupervisor.init(strategy: :one_for_one)

  @doc "Starts a `MyApp.UserSession` child for `user_id` under this supervisor."
  def start_session(user_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.UserSession, user_id})
  end

  @doc """
  Terminates the session found under `user_id` in `MyApp.SessionRegistry`.

  Returns `:ok` when no session is registered for that id.
  """
  def stop_session(user_id) do
    MyApp.SessionRegistry
    |> Registry.lookup(user_id)
    |> case do
      [] -> :ok
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
    end
  end
end
当静态监督无法满足需求时:
  • 需要在运行时生成进程
  • 启动时不知道进程数量
  • 管理用户会话、连接等
elixir
defmodule MyApp.SessionSupervisor do
  @moduledoc """
  A `DynamicSupervisor` that starts and stops `MyApp.UserSession` processes at
  runtime.
  """
  use DynamicSupervisor

  @doc "Starts the supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts), do: DynamicSupervisor.start_link(__MODULE__, [], name: __MODULE__)

  @impl true
  def init(_opts), do: DynamicSupervisor.init(strategy: :one_for_one)

  @doc "Starts a `MyApp.UserSession` child for `user_id` under this supervisor."
  def start_session(user_id) do
    DynamicSupervisor.start_child(__MODULE__, {MyApp.UserSession, user_id})
  end

  @doc """
  Terminates the session found under `user_id` in `MyApp.SessionRegistry`.

  Returns `:ok` when no session is registered for that id.
  """
  def stop_session(user_id) do
    MyApp.SessionRegistry
    |> Registry.lookup(user_id)
    |> case do
      [] -> :ok
      [{pid, _}] -> DynamicSupervisor.terminate_child(__MODULE__, pid)
    end
  end
end

PartitionSupervisor: Reducing GenServer Bottlenecks

PartitionSupervisor:缓解GenServer瓶颈

When a single GenServer becomes a bottleneck:
  • High-frequency operations
  • Contention on single process
  • Need to distribute load
elixir
defmodule MyApp.CounterSupervisor do
  @moduledoc """
  Runs several `MyApp.Counter` processes under a `PartitionSupervisor` and
  routes each increment to a partition chosen from a key.

  `PartitionSupervisor` has no `use` macro, no `init/2` and no
  `get_child_pid/2`: it is started with `PartitionSupervisor.start_link/1` and
  addressed through a `{:via, PartitionSupervisor, {name, key}}` tuple.
  """

  @doc "Child spec that starts this module as a supervisor via `start_link/1`."
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc "Starts the partition supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    PartitionSupervisor.start_link(
      name: __MODULE__,
      # Each partition runs the MyApp.Counter callbacks as an unnamed process;
      # going through a start function that registers a fixed name would make
      # every partition after the first fail to start.
      child_spec: %{
        id: MyApp.Counter,
        start: {GenServer, :start_link, [MyApp.Counter, 0]}
      }
    )
  end

  @doc "Increments the counter in the partition selected by `key`. Returns `:ok`."
  def increment(key) do
    GenServer.cast({:via, PartitionSupervisor, {__MODULE__, key}}, :increment)
  end
end
当单个GenServer成为瓶颈时:
  • 高频率操作
  • 单个进程存在竞争
  • 需要分散负载
elixir
defmodule MyApp.CounterSupervisor do
  @moduledoc """
  Runs several `MyApp.Counter` processes under a `PartitionSupervisor` and
  routes each increment to a partition chosen from a key.

  `PartitionSupervisor` has no `use` macro, no `init/2` and no
  `get_child_pid/2`: it is started with `PartitionSupervisor.start_link/1` and
  addressed through a `{:via, PartitionSupervisor, {name, key}}` tuple.
  """

  @doc "Child spec that starts this module as a supervisor via `start_link/1`."
  def child_spec(opts) do
    %{
      id: __MODULE__,
      start: {__MODULE__, :start_link, [opts]},
      type: :supervisor
    }
  end

  @doc "Starts the partition supervisor registered under the module name. `_opts` is ignored."
  def start_link(_opts) do
    PartitionSupervisor.start_link(
      name: __MODULE__,
      # Each partition runs the MyApp.Counter callbacks as an unnamed process;
      # going through a start function that registers a fixed name would make
      # every partition after the first fail to start.
      child_spec: %{
        id: MyApp.Counter,
        start: {GenServer, :start_link, [MyApp.Counter, 0]}
      }
    )
  end

  @doc "Increments the counter in the partition selected by `key`. Returns `:ok`."
  def increment(key) do
    GenServer.cast({:via, PartitionSupervisor, {__MODULE__, key}}, :increment)
  end
end

Registry: Process Discovery

Registry:进程发现

When you have many named processes:
  • Dynamic process naming
  • Process lookups by custom keys
  • Avoiding atom exhaustion
elixir
defmodule MyApp.GameRegistry do
  @moduledoc """
  A unique-key `Registry` mapping game ids to game pids.

  `Registry.register/3` always registers the *calling* process as the owner of
  the key and stores the given pid as the entry's value. Lookups here return
  that stored value, so they yield the game pid even when a different process
  performed the registration. The entry disappears when the owner (the caller
  of `register_game/2`) exits.
  """

  @doc "Starts the registry under the module name."
  def start_link do
    Registry.start_link(keys: :unique, name: __MODULE__)
  end

  @doc """
  Registers `pid` under `game_id`.

  Returns `{:ok, owner_pid}` or `{:error, {:already_registered, pid}}`.
  """
  def register_game(game_id, pid) do
    Registry.register(__MODULE__, game_id, pid)
  end

  @doc "Returns `{:ok, pid}` for the game registered under `game_id`, or `{:error, :not_found}`."
  def find_game(game_id) do
    case Registry.lookup(__MODULE__, game_id) do
      # Entries are {owner_pid, value}; the value is the pid given to register_game/2.
      [{_owner_pid, game_pid}] -> {:ok, game_pid}
      [] -> {:error, :not_found}
    end
  end

  @doc "Returns a list of `{game_id, pid}` tuples for every registered game."
  def list_games do
    # Match spec over {key, owner_pid, value}: select the key and the stored value.
    Registry.select(__MODULE__, [{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
  end
end
当你有多个命名进程时:
  • 动态进程命名
  • 通过自定义键查找进程
  • 避免原子耗尽
elixir
defmodule MyApp.GameRegistry do
  @moduledoc """
  A unique-key `Registry` mapping game ids to game pids.

  `Registry.register/3` always registers the *calling* process as the owner of
  the key and stores the given pid as the entry's value. Lookups here return
  that stored value, so they yield the game pid even when a different process
  performed the registration. The entry disappears when the owner (the caller
  of `register_game/2`) exits.
  """

  @doc "Starts the registry under the module name."
  def start_link do
    Registry.start_link(keys: :unique, name: __MODULE__)
  end

  @doc """
  Registers `pid` under `game_id`.

  Returns `{:ok, owner_pid}` or `{:error, {:already_registered, pid}}`.
  """
  def register_game(game_id, pid) do
    Registry.register(__MODULE__, game_id, pid)
  end

  @doc "Returns `{:ok, pid}` for the game registered under `game_id`, or `{:error, :not_found}`."
  def find_game(game_id) do
    case Registry.lookup(__MODULE__, game_id) do
      # Entries are {owner_pid, value}; the value is the pid given to register_game/2.
      [{_owner_pid, game_pid}] -> {:ok, game_pid}
      [] -> {:error, :not_found}
    end
  end

  @doc "Returns a list of `{game_id, pid}` tuples for every registered game."
  def list_games do
    # Match spec over {key, owner_pid, value}: select the key and the stored value.
    Registry.select(__MODULE__, [{{:"$1", :_, :"$3"}, [], [{{:"$1", :"$3"}}]}])
  end
end

OTP Design Principles (KISS Emphasis)

OTP设计原则(强调KISS)

When to Use What (KISS Guide)

工具选择指南(KISS版)

Start Simple (80% of cases):
Need state? → GenServer
Need supervision? → Supervisor (one_for_one)
Need one-off async? → Task
Need simple shared state? → Agent
Advance When Needed (specific problems only):
Spawning processes at runtime? → DynamicSupervisor
GenServer bottleneck (proven via profiling)? → PartitionSupervisor  
Many named processes? → Registry
Need supervised async tasks? → Task.Supervisor
从简单方案开始(80%的场景):
需要状态? → GenServer
需要监督? → Supervisor(one_for_one)
需要一次性异步任务? → Task
需要简单共享状态? → Agent
按需升级(仅针对特定问题):
需要在运行时生成进程? → DynamicSupervisor
GenServer出现瓶颈(经性能分析确认)? → PartitionSupervisor  
有大量命名进程? → Registry
需要受监督的异步任务? → Task.Supervisor

Red Flags (Premature Optimization)

预警信号(过早优化)

Don't do these without measuring:
  • Using PartitionSupervisor "just in case"
  • Registry when you have < 100 processes
  • Complex supervision trees without proven need
  • DynamicSupervisor when static children work fine
未经性能分析不要做这些:
  • 为了“以防万一”使用PartitionSupervisor
  • 进程数量少于100时使用Registry
  • 在没有明确需求的情况下构建复杂监督树
  • 静态子进程能正常工作时使用DynamicSupervisor

Decision Framework

决策框架

1. Start with simplest pattern that works
elixir
undefined
1. 从能解决问题的最简单模式开始
elixir
undefined

Start here

从这里开始

defmodule MyApp.SimpleCounter do
  use GenServer
  # Basic implementation
end
defmodule MyApp.SimpleCounter do
  use GenServer
  # 基础实现
end

Not here

不要直接用这个

defmodule MyApp.DistributedPartitionedCounter do
  use PartitionSupervisor
  # Complex implementation you don't need yet
end

**2. Profile to find actual bottlenecks**
```elixir
defmodule MyApp.DistributedPartitionedCounter do
  use PartitionSupervisor
  # 你目前不需要的复杂实现
end

**2. 通过性能分析找到实际瓶颈**
```elixir

Use Observer to find real problems

使用Observer查找真正的问题

:observer.start()
:observer.start()

Use telemetry for metrics

使用telemetry收集指标

:telemetry.execute([:my_app, :counter, :increment], %{value: 1})

**3. Optimize specific problem with specific solution**
```elixir
:telemetry.execute([:my_app, :counter, :increment], %{value: 1})

**3. 针对特定问题使用优化方案**
```elixir

If profiling shows GenServer is bottleneck:

如果性能分析显示GenServer是瓶颈:

THEN consider PartitionSupervisor

再考虑使用PartitionSupervisor

If you're running out of atoms for process names:

如果进程命名导致原子耗尽:

THEN consider Registry

再考虑使用Registry

If you need runtime process spawning:

如果需要在运行时生成进程:

THEN consider DynamicSupervisor

再考虑使用DynamicSupervisor


**4. Measure improvement, iterate**

**4. 衡量改进效果,持续迭代**

OTP Testing Patterns

OTP测试模式

Testing GenServers

测试GenServer

elixir
defmodule MyApp.CounterTest do
  @moduledoc """
  Tests for `MyApp.Counter`.

  Not async: the counter registers under a fixed name, so only one instance can
  run at a time.
  """
  use ExUnit.Case

  setup do
    # start_supervised!/1 stops the counter after each test, so the next test
    # can register the name again and starts from 0.
    pid = start_supervised!({MyApp.Counter, 0})
    %{counter: pid}
  end

  test "increments count" do
    assert MyApp.Counter.get_count() == 0
    MyApp.Counter.increment()
    assert MyApp.Counter.get_count() == 1
  end

  test "handles concurrent increments" do
    # Reuses the counter from setup; starting a second named instance would
    # fail with :already_started.
    tasks =
      for _ <- 1..100 do
        Task.async(fn -> MyApp.Counter.increment() end)
      end

    Task.await_many(tasks)
    assert MyApp.Counter.get_count() == 100
  end
end
elixir
defmodule MyApp.CounterTest do
  @moduledoc """
  Tests for `MyApp.Counter`.

  Not async: the counter registers under a fixed name, so only one instance can
  run at a time.
  """
  use ExUnit.Case

  setup do
    # start_supervised!/1 stops the counter after each test, so the next test
    # can register the name again and starts from 0.
    pid = start_supervised!({MyApp.Counter, 0})
    %{counter: pid}
  end

  test "increments count" do
    assert MyApp.Counter.get_count() == 0
    MyApp.Counter.increment()
    assert MyApp.Counter.get_count() == 1
  end

  test "handles concurrent increments" do
    # Reuses the counter from setup; starting a second named instance would
    # fail with :already_started.
    tasks =
      for _ <- 1..100 do
        Task.async(fn -> MyApp.Counter.increment() end)
      end

    Task.await_many(tasks)
    assert MyApp.Counter.get_count() == 100
  end
end

Testing Supervision Trees

测试监督树

elixir
defmodule MyApp.SupervisorTest do
  @moduledoc """
  Checks that `MyApp.Supervisor` restarts a killed `MyApp.Counter`.
  """
  use ExUnit.Case

  test "restarts failed children" do
    start_supervised!(MyApp.Supervisor)

    # The counter registers itself under its module name, not in a Registry.
    counter_pid = Process.whereis(MyApp.Counter)
    assert is_pid(counter_pid)

    # Kill the process and wait for confirmation instead of sleeping blindly.
    ref = Process.monitor(counter_pid)
    Process.exit(counter_pid, :kill)
    assert_receive {:DOWN, ^ref, :process, ^counter_pid, :killed}

    # Verify new process is running
    new_counter_pid = wait_for_restart(MyApp.Counter, counter_pid)
    assert new_counter_pid != counter_pid
    assert Process.alive?(new_counter_pid)
  end

  # Polls until `name` is registered to a pid other than `old_pid`, checking
  # every 10 ms and failing the test after `attempts` tries.
  defp wait_for_restart(name, old_pid, attempts \\ 50)

  defp wait_for_restart(name, _old_pid, 0) do
    flunk("#{inspect(name)} was not restarted in time")
  end

  defp wait_for_restart(name, old_pid, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(name, old_pid, attempts - 1)
    end
  end
end
elixir
defmodule MyApp.SupervisorTest do
  @moduledoc """
  Checks that `MyApp.Supervisor` restarts a killed `MyApp.Counter`.
  """
  use ExUnit.Case

  test "restarts failed children" do
    start_supervised!(MyApp.Supervisor)

    # The counter registers itself under its module name, not in a Registry.
    counter_pid = Process.whereis(MyApp.Counter)
    assert is_pid(counter_pid)

    # Kill the process and wait for confirmation instead of sleeping blindly.
    ref = Process.monitor(counter_pid)
    Process.exit(counter_pid, :kill)
    assert_receive {:DOWN, ^ref, :process, ^counter_pid, :killed}

    # Verify new process is running
    new_counter_pid = wait_for_restart(MyApp.Counter, counter_pid)
    assert new_counter_pid != counter_pid
    assert Process.alive?(new_counter_pid)
  end

  # Polls until `name` is registered to a pid other than `old_pid`, checking
  # every 10 ms and failing the test after `attempts` tries.
  defp wait_for_restart(name, old_pid, attempts \\ 50)

  defp wait_for_restart(name, _old_pid, 0) do
    flunk("#{inspect(name)} was not restarted in time")
  end

  defp wait_for_restart(name, old_pid, attempts) do
    case Process.whereis(name) do
      pid when is_pid(pid) and pid != old_pid ->
        pid

      _ ->
        Process.sleep(10)
        wait_for_restart(name, old_pid, attempts - 1)
    end
  end
end

Cross-References

交叉引用

System Architecture

系统架构

For high-level system design, context boundaries, and distributed architecture patterns, see the
elixir-architecture
skill.
有关高级系统设计、上下文边界和分布式架构模式,请查看
elixir-architecture
技能。

Phoenix Integration

Phoenix集成

For integrating OTP patterns with Phoenix contexts, LiveView, and PubSub systems, see the
elixir-phoenix-framework
skill.
有关将OTP模式与Phoenix上下文、LiveView和PubSub系统集成的内容,请查看
elixir-phoenix-framework
技能。

Performance Review

性能评估

For profiling OTP applications, identifying process bottlenecks, and optimization strategies, see the
elixir-review
skill.
有关OTP应用性能分析、进程瓶颈识别和优化策略,请查看
elixir-review
技能。

Summary

总结

Remember KISS:
  1. Start simple: GenServer + Supervisor covers most needs
  2. Profile before optimizing: Measure actual problems, not theoretical ones
  3. Advance when needed: Use complex patterns to solve specific measured problems
  4. Test concurrent behavior: OTP makes concurrency easier but testing is still important
The BEAM VM and OTP give you powerful tools for building fault-tolerant, concurrent systems. Use them wisely by starting simple and evolving complexity only when requirements demand it.
牢记KISS原则:
  1. 从简单开始:GenServer + Supervisor能满足大多数需求
  2. 先分析再优化:针对实际存在的问题进行优化,而非理论问题
  3. 按需升级:使用复杂模式解决经过验证的特定问题
  4. 测试并发行为:OTP简化了并发开发,但测试仍然很重要
BEAM虚拟机和OTP为你提供了构建容错、并发系统的强大工具。明智地使用它们:从简单方案开始,仅在需求明确时再增加复杂度。