elixir-ecto

Compare original and translation side by side

🇺🇸

Original

English
🇨🇳

Translation

Chinese

Elixir Ecto Skill

Elixir Ecto 技能指南

Schema Design

Schema设计

Basic Schema Definition

基础Schema定义

Schema Structure:
elixir
defmodule MyApp.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset
  
  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  
  schema "users" do
    field :email, :string
    field :name, :string
    field :age, :integer
    field :is_active, :boolean, default: true
    field :password, :string, virtual: true
    field :password_hash, :string
    field :profile_data, :map
    
    has_many :posts, MyApp.Blog.Post
    has_one :profile, MyApp.Accounts.Profile
    many_to_many :roles, MyApp.Accounts.Role, join_through: "user_roles"
    
    timestamps()
  end
  
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :name, :age, :password])
    |> validate_required([:email, :name])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/, message: "must have the @ sign and no spaces")
    |> validate_length(:name, min: 2, max: 100)
    |> validate_number(:age, greater_than: 0, less_than: 150)
    |> unique_constraint(:email)
    |> put_password_hash()
  end
  
  defp put_password_hash(%{valid?: true, changes: %{password: password}} = changeset) do
    put_change(changeset, :password_hash, Argon2.hash_pwd_salt(password))
  end
  defp put_password_hash(changeset), do: changeset
end
Schema结构:
elixir
defmodule MyApp.Accounts.User do
  use Ecto.Schema
  import Ecto.Changeset
  
  @primary_key {:id, :binary_id, autogenerate: true}
  @foreign_key_type :binary_id
  
  schema "users" do
    field :email, :string
    field :name, :string
    field :age, :integer
    field :is_active, :boolean, default: true
    field :password, :string, virtual: true
    field :password_hash, :string
    field :profile_data, :map
    
    has_many :posts, MyApp.Blog.Post
    has_one :profile, MyApp.Accounts.Profile
    many_to_many :roles, MyApp.Accounts.Role, join_through: "user_roles"
    
    timestamps()
  end
  
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :name, :age, :password])
    |> validate_required([:email, :name])
    |> validate_format(:email, ~r/^[^\s]+@[^\s]+$/, message: "must have the @ sign and no spaces")
    |> validate_length(:name, min: 2, max: 100)
    |> validate_number(:age, greater_than: 0, less_than: 150)
    |> unique_constraint(:email)
    |> put_password_hash()
  end
  
  defp put_password_hash(%{valid?: true, changes: %{password: password}} = changeset) do
    put_change(changeset, :password_hash, Argon2.hash_pwd_salt(password))
  end
  defp put_password_hash(changeset), do: changeset
end

Virtual Fields and Computed Values

虚拟字段与计算值

Virtual Fields for Input Processing:
elixir
defmodule MyApp.Accounts.Profile do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "profiles" do
    field :first_name, :string
    field :last_name, :string
    field :full_name, :string, virtual: true
    field :birth_date, :date
    field :age, :integer, virtual: true
    
    belongs_to :user, MyApp.Accounts.User
    
    timestamps()
  end
  
  def changeset(profile, attrs) do
    profile
    |> cast(attrs, [:first_name, :last_name, :birth_date, :full_name])
    |> validate_required([:first_name, :last_name])
    |> put_full_name()
    |> put_age()
  end
  
  defp put_full_name(%{changes: %{first_name: first, last_name: last}} = changeset) do
    put_change(changeset, :full_name, "#{first} #{last}")
  end
  defp put_full_name(changeset), do: changeset
  
  defp put_age(%{changes: %{birth_date: birth_date}} = changeset) do
    age = Date.diff(Date.utc_today(), birth_date) |> div(365)
    put_change(changeset, :age, age)
  end
  defp put_age(changeset), do: changeset
end
用于输入处理的虚拟字段:
elixir
defmodule MyApp.Accounts.Profile do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "profiles" do
    field :first_name, :string
    field :last_name, :string
    field :full_name, :string, virtual: true
    field :birth_date, :date
    field :age, :integer, virtual: true
    
    belongs_to :user, MyApp.Accounts.User
    
    timestamps()
  end
  
  def changeset(profile, attrs) do
    profile
    |> cast(attrs, [:first_name, :last_name, :birth_date, :full_name])
    |> validate_required([:first_name, :last_name])
    |> put_full_name()
    |> put_age()
  end
  
  defp put_full_name(%{changes: %{first_name: first, last_name: last}} = changeset) do
    put_change(changeset, :full_name, "#{first} #{last}")
  end
  defp put_full_name(changeset), do: changeset
  
  defp put_age(%{changes: %{birth_date: birth_date}} = changeset) do
    age = Date.diff(Date.utc_today(), birth_date) |> div(365)
    put_change(changeset, :age, age)
  end
  defp put_age(changeset), do: changeset
end

Embedded Schemas

嵌入式Schema

For JSON/Map Fields:
elixir
defmodule MyApp.Accounts.Address do
  use Ecto.Schema
  import Ecto.Changeset
  
  @primary_key false
  embedded_schema do
    field :street, :string
    field :city, :string
    field :state, :string
    field :zip_code, :string
    field :country, :string, default: "US"
  end
  
  def changeset(address, attrs) do
    address
    |> cast(attrs, [:street, :city, :state, :zip_code, :country])
    |> validate_required([:street, :city, :state, :zip_code])
    |> validate_length(:zip_code, is: 5)
  end
end
适用于JSON/Map字段:
elixir
defmodule MyApp.Accounts.Address do
  use Ecto.Schema
  import Ecto.Changeset
  
  @primary_key false
  embedded_schema do
    field :street, :string
    field :city, :string
    field :state, :string
    field :zip_code, :string
    field :country, :string, default: "US"
  end
  
  def changeset(address, attrs) do
    address
    |> cast(attrs, [:street, :city, :state, :zip_code, :country])
    |> validate_required([:street, :city, :state, :zip_code])
    |> validate_length(:zip_code, is: 5)
  end
end

Usage in parent schema

在父Schema中使用

defmodule MyApp.Accounts.User do use Ecto.Schema import Ecto.Changeset
schema "users" do field :name, :string embeds_one :address, MyApp.Accounts.Address embeds_many :previous_addresses, MyApp.Accounts.Address
timestamps()
end
def changeset(user, attrs) do user |> cast(attrs, [:name]) |> cast_embed(:address) |> cast_embed(:previous_addresses) end end
undefined
defmodule MyApp.Accounts.User do use Ecto.Schema import Ecto.Changeset
schema "users" do field :name, :string embeds_one :address, MyApp.Accounts.Address embeds_many :previous_addresses, MyApp.Accounts.Address
timestamps()
end
def changeset(user, attrs) do user |> cast(attrs, [:name]) |> cast_embed(:address) |> cast_embed(:previous_addresses) end end
undefined

Associations

关联关系

Association Patterns:
elixir
undefined
关联模式:
elixir
undefined

One-to-many with dependent deletion

一对多关联及依赖删除

defmodule MyApp.Blog.Post do use Ecto.Schema
schema "posts" do field :title, :string field :body, :text field :published_at, :utc_datetime
belongs_to :author, MyApp.Accounts.User
has_many :comments, MyApp.Blog.Comment, on_delete: :delete_all
has_many :post_tags, MyApp.Blog.PostTag, on_delete: :delete_all
has_many :tags, through: [:post_tags, :tag]

timestamps()
end end
defmodule MyApp.Blog.Post do use Ecto.Schema
schema "posts" do field :title, :string field :body, :text field :published_at, :utc_datetime
belongs_to :author, MyApp.Accounts.User
has_many :comments, MyApp.Blog.Comment, on_delete: :delete_all
has_many :post_tags, MyApp.Blog.PostTag, on_delete: :delete_all
has_many :tags, through: [:post_tags, :tag]

timestamps()
end end

Many-to-many with join schema

多对多关联及连接Schema

defmodule MyApp.Blog.PostTag do use Ecto.Schema
schema "post_tags" do belongs_to :post, MyApp.Blog.Post belongs_to :tag, MyApp.Blog.Tag
field :created_by_id, :id

timestamps()
end end
defmodule MyApp.Blog.PostTag do use Ecto.Schema
schema "post_tags" do belongs_to :post, MyApp.Blog.Post belongs_to :tag, MyApp.Blog.Tag
field :created_by_id, :id

timestamps()
end end

Polymorphic associations

多态关联

defmodule MyApp.Comments.Comment do use Ecto.Schema
schema "comments" do field :body, :text field :commentable_type, :string field :commentable_id, :id
belongs_to :user, MyApp.Accounts.User

timestamps()
end
def for_post(query \ MODULE) do from c in query, where: c.commentable_type == "post" end
def for_user(query \ MODULE) do from c in query, where: c.commentable_type == "user" end end
undefined
defmodule MyApp.Comments.Comment do use Ecto.Schema
schema "comments" do field :body, :text field :commentable_type, :string field :commentable_id, :id
belongs_to :user, MyApp.Accounts.User

timestamps()
end
def for_post(query \ MODULE) do from c in query, where: c.commentable_type == "post" end
def for_user(query \ MODULE) do from c in query, where: c.commentable_type == "user" end end
undefined

Changesets & Validation

变更集与验证

Advanced Changeset Patterns

高级变更集模式

Conditional Validations:
elixir
defmodule MyApp.Accounts.User do
  import Ecto.Changeset
  
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :role, :company_id, :freelance_rate])
    |> validate_required([:email, :role])
    |> validate_role_specific_fields()
    |> validate_business_rules()
  end
  
  defp validate_role_specific_fields(changeset) do
    case get_field(changeset, :role) do
      :employee -> 
        changeset
        |> validate_required([:company_id])
        |> validate_exclusion(:freelance_rate, [nil], message: "must be empty for employees")
      
      :freelancer -> 
        changeset
        |> validate_required([:freelance_rate])
        |> validate_number(:freelance_rate, greater_than: 0)
        |> validate_exclusion(:company_id, [nil], message: "must be empty for freelancers")
      
      _ -> changeset
    end
  end
  
  defp validate_business_rules(changeset) do
    changeset
    |> validate_email_domain()
    |> validate_unique_email_per_company()
  end
  
  defp validate_email_domain(changeset) do
    email = get_change(changeset, :email)
    
    if email && String.ends_with?(email, "@competitor.com") do
      add_error(changeset, :email, "competitor email addresses not allowed")
    else
      changeset
    end
  end
end
Custom Validators:
elixir
defmodule MyApp.Validators do
  import Ecto.Changeset
  
  def validate_password_strength(changeset, field) do
    validate_change changeset, field, fn _, password ->
      cond do
        String.length(password) < 8 ->
          [{field, "must be at least 8 characters"}]
        
        not Regex.match?(~r/[A-Z]/, password) ->
          [{field, "must contain uppercase letter"}]
        
        not Regex.match?(~r/[a-z]/, password) ->
          [{field, "must contain lowercase letter"}]
        
        not Regex.match?(~r/[0-9]/, password) ->
          [{field, "must contain number"}]
        
        true -> []
      end
    end
  end
  
  def validate_phone_number(changeset, field) do
    validate_change changeset, field, fn _, phone ->
      # Remove all non-digits
      digits = Regex.replace(~r/\D/, phone, "")
      
      case String.length(digits) do
        10 -> []
        11 when String.starts_with?(digits, "1") -> []
        _ -> [{field, "must be a valid phone number"}]
      end
    end
  end
  
  def validate_future_date(changeset, field) do
    validate_change changeset, field, fn _, date ->
      if Date.compare(date, Date.utc_today()) == :gt do
        []
      else
        [{field, "must be in the future"}]
      end
    end
  end
end
条件验证:
elixir
defmodule MyApp.Accounts.User do
  import Ecto.Changeset
  
  def changeset(user, attrs) do
    user
    |> cast(attrs, [:email, :role, :company_id, :freelance_rate])
    |> validate_required([:email, :role])
    |> validate_role_specific_fields()
    |> validate_business_rules()
  end
  
  defp validate_role_specific_fields(changeset) do
    case get_field(changeset, :role) do
      :employee -> 
        changeset
        |> validate_required([:company_id])
        |> validate_exclusion(:freelance_rate, [nil], message: "must be empty for employees")
      
      :freelancer -> 
        changeset
        |> validate_required([:freelance_rate])
        |> validate_number(:freelance_rate, greater_than: 0)
        |> validate_exclusion(:company_id, [nil], message: "must be empty for freelancers")
      
      _ -> changeset
    end
  end
  
  defp validate_business_rules(changeset) do
    changeset
    |> validate_email_domain()
    |> validate_unique_email_per_company()
  end
  
  defp validate_email_domain(changeset) do
    email = get_change(changeset, :email)
    
    if email && String.ends_with?(email, "@competitor.com") do
      add_error(changeset, :email, "competitor email addresses not allowed")
    else
      changeset
    end
  end
end
自定义验证器:
elixir
defmodule MyApp.Validators do
  import Ecto.Changeset
  
  def validate_password_strength(changeset, field) do
    validate_change changeset, field, fn _, password ->
      cond do
        String.length(password) < 8 ->
          [{field, "must be at least 8 characters"}]
        
        not Regex.match?(~r/[A-Z]/, password) ->
          [{field, "must contain uppercase letter"}]
        
        not Regex.match?(~r/[a-z]/, password) ->
          [{field, "must contain lowercase letter"}]
        
        not Regex.match?(~r/[0-9]/, password) ->
          [{field, "must contain number"}]
        
        true -> []
      end
    end
  end
  
  def validate_phone_number(changeset, field) do
    validate_change changeset, field, fn _, phone ->
      # 移除所有非数字字符
      digits = Regex.replace(~r/\D/, phone, "")
      
      case String.length(digits) do
        10 -> []
        11 when String.starts_with?(digits, "1") -> []
        _ -> [{field, "must be a valid phone number"}]
      end
    end
  end
  
  def validate_future_date(changeset, field) do
    validate_change changeset, field, fn _, date ->
      if Date.compare(date, Date.utc_today()) == :gt do
        []
      else
        [{field, "must be in the future"}]
      end
    end
  end
end

Nested Changesets

嵌套变更集

Complex Form Handling:
elixir
defmodule MyApp.Orders.Order do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "orders" do
    field :total_amount, :decimal
    field :status, :string, default: "pending"
    
    belongs_to :customer, MyApp.Accounts.User
    has_many :line_items, MyApp.Orders.LineItem, on_delete: :delete_all
    
    timestamps()
  end
  
  def changeset(order, attrs) do
    order
    |> cast(attrs, [:customer_id])
    |> cast_assoc(:line_items, required: true)
    |> validate_required([:customer_id])
    |> calculate_total()
    |> validate_number(:total_amount, greater_than: 0)
  end
  
  defp calculate_total(changeset) do
    case get_change(changeset, :line_items) do
      nil -> changeset
      line_items_changesets ->
        total = 
          line_items_changesets
          |> Enum.map(&get_field(&1, :amount))
          |> Enum.sum()
        
        put_change(changeset, :total_amount, total)
    end
  end
end

defmodule MyApp.Orders.LineItem do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "line_items" do
    field :quantity, :integer
    field :unit_price, :decimal
    field :amount, :decimal
    
    belongs_to :order, MyApp.Orders.Order
    belongs_to :product, MyApp.Catalog.Product
    
    timestamps()
  end
  
  def changeset(line_item, attrs) do
    line_item
    |> cast(attrs, [:product_id, :quantity, :unit_price])
    |> validate_required([:product_id, :quantity, :unit_price])
    |> validate_number(:quantity, greater_than: 0)
    |> validate_number(:unit_price, greater_than: 0)
    |> calculate_amount()
  end
  
  defp calculate_amount(changeset) do
    case {get_field(changeset, :quantity), get_field(changeset, :unit_price)} do
      {qty, price} when is_integer(qty) and not is_nil(price) ->
        amount = Decimal.mult(price, qty)
        put_change(changeset, :amount, amount)
      
      _ -> changeset
    end
  end
end
复杂表单处理:
elixir
defmodule MyApp.Orders.Order do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "orders" do
    field :total_amount, :decimal
    field :status, :string, default: "pending"
    
    belongs_to :customer, MyApp.Accounts.User
    has_many :line_items, MyApp.Orders.LineItem, on_delete: :delete_all
    
    timestamps()
  end
  
  def changeset(order, attrs) do
    order
    |> cast(attrs, [:customer_id])
    |> cast_assoc(:line_items, required: true)
    |> validate_required([:customer_id])
    |> calculate_total()
    |> validate_number(:total_amount, greater_than: 0)
  end
  
  defp calculate_total(changeset) do
    case get_change(changeset, :line_items) do
      nil -> changeset
      line_items_changesets ->
        total = 
          line_items_changesets
          |> Enum.map(&get_field(&1, :amount))
          |> Enum.sum()
        
        put_change(changeset, :total_amount, total)
    end
  end
end

defmodule MyApp.Orders.LineItem do
  use Ecto.Schema
  import Ecto.Changeset
  
  schema "line_items" do
    field :quantity, :integer
    field :unit_price, :decimal
    field :amount, :decimal
    
    belongs_to :order, MyApp.Orders.Order
    belongs_to :product, MyApp.Catalog.Product
    
    timestamps()
  end
  
  def changeset(line_item, attrs) do
    line_item
    |> cast(attrs, [:product_id, :quantity, :unit_price])
    |> validate_required([:product_id, :quantity, :unit_price])
    |> validate_number(:quantity, greater_than: 0)
    |> validate_number(:unit_price, greater_than: 0)
    |> calculate_amount()
  end
  
  defp calculate_amount(changeset) do
    case {get_field(changeset, :quantity), get_field(changeset, :unit_price)} do
      {qty, price} when is_integer(qty) and not is_nil(price) ->
        amount = Decimal.mult(price, qty)
        put_change(changeset, :amount, amount)
      
      _ -> changeset
    end
  end
end

Querying Patterns

查询模式

Basic Query Composition

基础查询组合

Query Building:
elixir
defmodule MyApp.Blog do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Blog.Post
  
  def list_posts(opts \\ []) do
    Post
    |> filter_by_status(opts[:status])
    |> filter_by_author(opts[:author_id])
    |> search_by_title(opts[:search])
    |> order_by_date(opts[:order])
    |> paginate(opts[:page], opts[:per_page])
    |> Repo.all()
    |> preload_associations()
  end
  
  defp filter_by_status(query, nil), do: query
  defp filter_by_status(query, status) do
    from p in query, where: p.status == ^status
  end
  
  defp filter_by_author(query, nil), do: query
  defp filter_by_author(query, author_id) do
    from p in query, where: p.author_id == ^author_id
  end
  
  defp search_by_title(query, nil), do: query
  defp search_by_title(query, search_term) do
    like_pattern = "%#{search_term}%"
    from p in query, where: ilike(p.title, ^like_pattern)
  end
  
  defp order_by_date(query, :asc) do
    from p in query, order_by: [asc: p.inserted_at]
  end
  defp order_by_date(query, _) do
    from p in query, order_by: [desc: p.inserted_at]
  end
  
  defp paginate(query, nil, _), do: query
  defp paginate(query, page, per_page) do
    offset = (page - 1) * per_page
    from p in query, limit: ^per_page, offset: ^offset
  end
  
  defp preload_associations(posts) do
    Repo.preload(posts, [:author, :tags, comments: :user])
  end
end
查询构建:
elixir
defmodule MyApp.Blog do
  import Ecto.Query
  alias MyApp.Repo
  alias MyApp.Blog.Post
  
  def list_posts(opts \\ []) do
    Post
    |> filter_by_status(opts[:status])
    |> filter_by_author(opts[:author_id])
    |> search_by_title(opts[:search])
    |> order_by_date(opts[:order])
    |> paginate(opts[:page], opts[:per_page])
    |> Repo.all()
    |> preload_associations()
  end
  
  defp filter_by_status(query, nil), do: query
  defp filter_by_status(query, status) do
    from p in query, where: p.status == ^status
  end
  
  defp filter_by_author(query, nil), do: query
  defp filter_by_author(query, author_id) do
    from p in query, where: p.author_id == ^author_id
  end
  
  defp search_by_title(query, nil), do: query
  defp search_by_title(query, search_term) do
    like_pattern = "%#{search_term}%"
    from p in query, where: ilike(p.title, ^like_pattern)
  end
  
  defp order_by_date(query, :asc) do
    from p in query, order_by: [asc: p.inserted_at]
  end
  defp order_by_date(query, _) do
    from p in query, order_by: [desc: p.inserted_at]
  end
  
  defp paginate(query, nil, _), do: query
  defp paginate(query, page, per_page) do
    offset = (page - 1) * per_page
    from p in query, limit: ^per_page, offset: ^offset
  end
  
  defp preload_associations(posts) do
    Repo.preload(posts, [:author, :tags, comments: :user])
  end
end

Advanced Query Patterns

高级查询模式

Subqueries and Aggregations:
elixir
defmodule MyApp.Analytics do
  import Ecto.Query
  alias MyApp.Repo
  
  def popular_posts_with_stats do
    comment_counts = from c in Comment,
                     group_by: c.post_id,
                     select: %{post_id: c.post_id, count: count(c.id)}
    
    from p in Post,
      left_join: cc in subquery(comment_counts), on: p.id == cc.post_id,
      select: %{
        id: p.id,
        title: p.title,
        comment_count: coalesce(cc.count, 0),
        view_count: p.view_count
      },
      where: p.status == "published",
      order_by: [desc: coalesce(cc.count, 0)]
  end
  
  def user_activity_summary(user_id) do
    posts_query = from p in Post,
                   where: p.author_id == ^user_id,
                   select: count(p.id)
    
    comments_query = from c in Comment,
                      where: c.user_id == ^user_id,
                      select: count(c.id)
    
    %{
      posts_count: Repo.one(posts_query),
      comments_count: Repo.one(comments_query),
      last_activity: last_activity(user_id)
    }
  end
  
  defp last_activity(user_id) do
    last_post = from p in Post,
                where: p.author_id == ^user_id,
                select: p.inserted_at,
                order_by: [desc: p.inserted_at],
                limit: 1
    
    last_comment = from c in Comment,
                   where: c.user_id == ^user_id,
                   select: c.inserted_at,
                   order_by: [desc: c.inserted_at],
                   limit: 1
    
    [Repo.one(last_post), Repo.one(last_comment)]
    |> Enum.reject(&is_nil/1)
    |> Enum.max(Date, fn -> nil end)
  end
end
子查询与聚合:
elixir
defmodule MyApp.Analytics do
  import Ecto.Query
  alias MyApp.Repo
  
  def popular_posts_with_stats do
    comment_counts = from c in Comment,
                     group_by: c.post_id,
                     select: %{post_id: c.post_id, count: count(c.id)}
    
    from p in Post,
      left_join: cc in subquery(comment_counts), on: p.id == cc.post_id,
      select: %{
        id: p.id,
        title: p.title,
        comment_count: coalesce(cc.count, 0),
        view_count: p.view_count
      },
      where: p.status == "published",
      order_by: [desc: coalesce(cc.count, 0)]
  end
  
  def user_activity_summary(user_id) do
    posts_query = from p in Post,
                   where: p.author_id == ^user_id,
                   select: count(p.id)
    
    comments_query = from c in Comment,
                      where: c.user_id == ^user_id,
                      select: count(c.id)
    
    %{
      posts_count: Repo.one(posts_query),
      comments_count: Repo.one(comments_query),
      last_activity: last_activity(user_id)
    }
  end
  
  defp last_activity(user_id) do
    last_post = from p in Post,
                where: p.author_id == ^user_id,
                select: p.inserted_at,
                order_by: [desc: p.inserted_at],
                limit: 1
    
    last_comment = from c in Comment,
                   where: c.user_id == ^user_id,
                   select: c.inserted_at,
                   order_by: [desc: c.inserted_at],
                   limit: 1
    
    [Repo.one(last_post), Repo.one(last_comment)]
    |> Enum.reject(&is_nil/1)
    |> Enum.max(Date, fn -> nil end)
  end
end

Preloading Strategies

预加载策略

Efficient Data Loading:
elixir
defmodule MyApp.Blog do
  # Basic preload
  def get_post_with_author(id) do
    Post
    |> Repo.get!(id)
    |> Repo.preload(:author)
  end
  
  # Selective preload based on query
  def get_post_with_comments(id, include_author: include_author) do
    preloads = if include_author do
      [author: [], comments: :user]
    else
      [comments: :user]
    end
    
    Post
    |> Repo.get!(id)
    |> Repo.preload(preloads)
  end
  
  # Custom preload query
  def get_post_with_recent_comments(id) do
    recent_comments = from c in Comment,
                      where: c.inserted_at > ago(7, "day"),
                      order_by: [desc: c.inserted_at],
                      preload: :user
    
    Post
    |> Repo.get!(id)
    |> Repo.preload([author: [], comments: recent_comments])
  end
  
  # Prevent N+1 with join preload for aggregation
  def list_posts_with_comment_counts do
    from p in Post,
      left_join: c in assoc(p, :comments),
      group_by: p.id,
      preload: [author: []],
      select: %{post: p, comment_count: count(c.id)}
  end
end
高效数据加载:
elixir
defmodule MyApp.Blog do
  # 基础预加载
  def get_post_with_author(id) do
    Post
    |> Repo.get!(id)
    |> Repo.preload(:author)
  end
  
  # 根据查询选择性预加载
  def get_post_with_comments(id, include_author: include_author) do
    preloads = if include_author do
      [author: [], comments: :user]
    else
      [comments: :user]
    end
    
    Post
    |> Repo.get!(id)
    |> Repo.preload(preloads)
  end
  
  # 自定义预加载查询
  def get_post_with_recent_comments(id) do
    recent_comments = from c in Comment,
                      where: c.inserted_at > ago(7, "day"),
                      order_by: [desc: c.inserted_at],
                      preload: :user
    
    Post
    |> Repo.get!(id)
    |> Repo.preload([author: [], comments: recent_comments])
  end
  
  # 使用连接预加载避免N+1查询并聚合
  def list_posts_with_comment_counts do
    from p in Post,
      left_join: c in assoc(p, :comments),
      group_by: p.id,
      preload: [author: []],
      select: %{post: p, comment_count: count(c.id)}
  end
end

Dynamic Queries

动态查询

Runtime Query Building:
elixir
defmodule MyApp.Search do
  import Ecto.Query
  
  def build_search_query(base_query, filters) do
    Enum.reduce(filters, base_query, &apply_filter/2)
  end
  
  defp apply_filter({:status, status}, query) when status != "" do
    from q in query, where: q.status == ^status
  end
  
  defp apply_filter({:date_range, %{start: start_date, end: end_date}}, query) do
    from q in query,
      where: q.inserted_at >= ^start_date and q.inserted_at <= ^end_date
  end
  
  defp apply_filter({:tags, tag_ids}, query) when is_list(tag_ids) and tag_ids != [] do
    from q in query,
      join: t in assoc(q, :tags),
      where: t.id in ^tag_ids,
      distinct: true
  end
  
  defp apply_filter({:search, term}, query) when is_binary(term) and term != "" do
    like_term = "%#{term}%"
    from q in query,
      where: ilike(q.title, ^like_term) or ilike(q.body, ^like_term)
  end
  
  defp apply_filter({:author_name, name}, query) when is_binary(name) and name != "" do
    from q in query,
      join: a in assoc(q, :author),
      where: ilike(a.name, ^"%#{name}%")
  end
  
  # Ignore unknown or empty filters
  defp apply_filter(_, query), do: query
end
运行时查询构建:
elixir
defmodule MyApp.Search do
  import Ecto.Query
  
  def build_search_query(base_query, filters) do
    Enum.reduce(filters, base_query, &apply_filter/2)
  end
  
  defp apply_filter({:status, status}, query) when status != "" do
    from q in query, where: q.status == ^status
  end
  
  defp apply_filter({:date_range, %{start: start_date, end: end_date}}, query) do
    from q in query,
      where: q.inserted_at >= ^start_date and q.inserted_at <= ^end_date
  end
  
  defp apply_filter({:tags, tag_ids}, query) when is_list(tag_ids) and tag_ids != [] do
    from q in query,
      join: t in assoc(q, :tags),
      where: t.id in ^tag_ids,
      distinct: true
  end
  
  defp apply_filter({:search, term}, query) when is_binary(term) and term != "" do
    like_term = "%#{term}%"
    from q in query,
      where: ilike(q.title, ^like_term) or ilike(q.body, ^like_term)
  end
  
  defp apply_filter({:author_name, name}, query) when is_binary(name) and name != "" do
    from q in query,
      join: a in assoc(q, :author),
      where: ilike(a.name, ^"%#{name}%")
  end
  
  # 忽略未知或空过滤器
  defp apply_filter(_, query), do: query
end

Migrations & Schema Evolution

迁移与Schema演进

Migration Patterns

迁移模式

Safe Migration Strategies:
elixir
defmodule MyApp.Repo.Migrations.CreateUsersTable do
  use Ecto.Migration
  
  def change do
    create table(:users, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :email, :string, null: false
      add :name, :string, null: false
      add :age, :integer
      add :is_active, :boolean, default: true, null: false
      
      timestamps()
    end
    
    create unique_index(:users, [:email])
    create index(:users, [:is_active])
    
    # Add constraint for data integrity
    create constraint(:users, :age_must_be_positive, check: "age > 0")
  end
end
安全迁移策略:
elixir
defmodule MyApp.Repo.Migrations.CreateUsersTable do
  use Ecto.Migration
  
  def change do
    create table(:users, primary_key: false) do
      add :id, :binary_id, primary_key: true
      add :email, :string, null: false
      add :name, :string, null: false
      add :age, :integer
      add :is_active, :boolean, default: true, null: false
      
      timestamps()
    end
    
    create unique_index(:users, [:email])
    create index(:users, [:is_active])
    
    # 添加数据完整性约束
    create constraint(:users, :age_must_be_positive, check: "age > 0")
  end
end

Adding columns safely

安全添加列

defmodule MyApp.Repo.Migrations.AddUserProfile do use Ecto.Migration
def change do alter table(:users) do add :profile_data, :map, default: "{}" add :last_login_at, :utc_datetime add :login_count, :integer, default: 0 end
create index(:users, [:last_login_at])
end end
defmodule MyApp.Repo.Migrations.AddUserProfile do use Ecto.Migration
def change do alter table(:users) do add :profile_data, :map, default: "{}" add :last_login_at, :utc_datetime add :login_count, :integer, default: 0 end
create index(:users, [:last_login_at])
end end

Data migration

数据迁移

defmodule MyApp.Repo.Migrations.MigrateUserNames do use Ecto.Migration import Ecto.Query
def up do # First add new columns alter table(:users) do add :first_name, :string add :last_name, :string end
flush()

# Migrate data
from(u in "users", select: [:id, :name])
|> MyApp.Repo.all()
|> Enum.each(fn user ->
  names = String.split(user.name, " ", parts: 2)
  first_name = Enum.at(names, 0)
  last_name = Enum.at(names, 1, "")
  
  from(u in "users", where: u.id == ^user.id)
  |> MyApp.Repo.update_all(set: [first_name: first_name, last_name: last_name])
end)
end
def down do alter table(:users) do remove :first_name remove :last_name end end end
undefined
defmodule MyApp.Repo.Migrations.MigrateUserNames do use Ecto.Migration import Ecto.Query
def up do # 先添加新列 alter table(:users) do add :first_name, :string add :last_name, :string end
flush()

# 迁移数据
from(u in "users", select: [:id, :name])
|> MyApp.Repo.all()
|> Enum.each(fn user ->
  names = String.split(user.name, " ", parts: 2)
  first_name = Enum.at(names, 0)
  last_name = Enum.at(names, 1, "")
  
  from(u in "users", where: u.id == ^user.id)
  |> MyApp.Repo.update_all(set: [first_name: first_name, last_name: last_name])
end)
end
def down do alter table(:users) do remove :first_name remove :last_name end end end
undefined

Zero-Downtime Migrations

零停机迁移

Safe Schema Changes:
elixir
undefined
安全Schema变更:
elixir
undefined

Step 1: Add new nullable column

步骤1:添加新的可空列

defmodule MyApp.Repo.Migrations.AddNewEmailColumn do use Ecto.Migration
def change do alter table(:users) do add :new_email, :string end
create index(:users, [:new_email])
end end
defmodule MyApp.Repo.Migrations.AddNewEmailColumn do use Ecto.Migration
def change do alter table(:users) do add :new_email, :string end
create index(:users, [:new_email])
end end

Step 2: Deploy code that writes to both columns

步骤2:部署同时写入新旧列的代码

Step 3: Backfill data

步骤3:回填数据

defmodule MyApp.Repo.Migrations.BackfillNewEmail do use Ecto.Migration import Ecto.Query
def up do from(u in "users", where: is_nil(u.new_email)) |> MyApp.Repo.update_all(set: [new_email: fragment("email")]) end
def down, do: :ok end
defmodule MyApp.Repo.Migrations.BackfillNewEmail do use Ecto.Migration import Ecto.Query
def up do from(u in "users", where: is_nil(u.new_email)) |> MyApp.Repo.update_all(set: [new_email: fragment("email")]) end
def down, do: :ok end

Step 4: Add not null constraint

步骤4:添加非空约束

defmodule MyApp.Repo.Migrations.MakeNewEmailRequired do use Ecto.Migration
def change do alter table(:users) do modify :new_email, :string, null: false end
create unique_index(:users, [:new_email])
end end
defmodule MyApp.Repo.Migrations.MakeNewEmailRequired do use Ecto.Migration
def change do alter table(:users) do modify :new_email, :string, null: false end
create unique_index(:users, [:new_email])
end end

Step 5: Deploy code that only uses new column

步骤5:部署仅使用新列的代码

Step 6: Remove old column

步骤6:删除旧列

defmodule MyApp.Repo.Migrations.RemoveOldEmailColumn do use Ecto.Migration
def change do drop index(:users, [:email])
alter table(:users) do
  remove :email
end

rename table(:users), :new_email, to: :email
end end
undefined
defmodule MyApp.Repo.Migrations.RemoveOldEmailColumn do use Ecto.Migration
def change do drop index(:users, [:email])
alter table(:users) do
  remove :email
end

rename table(:users), :new_email, to: :email
end end
undefined

Performance & Optimization

性能与优化

Query Optimization

查询优化

N+1 Query Prevention:
elixir
undefined
避免N+1查询:
elixir
undefined

Bad: N+1 queries

错误:N+1查询

def list_posts_bad do posts = Repo.all(Post)
Enum.map(posts, fn post -> # This generates one query per post! author = Repo.get(User, post.author_id) %{post | author: author} end) end
def list_posts_bad do posts = Repo.all(Post)
Enum.map(posts, fn post -> # 这会为每篇文章生成一个查询! author = Repo.get(User, post.author_id) %{post | author: author} end) end

Good: Use preload

正确:使用预加载

def list_posts_good do Post |> Repo.all() |> Repo.preload(:author) end
def list_posts_good do Post |> Repo.all() |> Repo.preload(:author) end

Good: Use join when you don't need associated data

正确:当不需要完整关联数据时使用连接

def list_post_summaries do from p in Post, join: u in User, on: p.author_id == u.id, select: %{id: p.id, title: p.title, author_name: u.name} end

**Batch Loading:**
```elixir
defmodule MyApp.Accounts do
  def get_users_with_post_counts(user_ids) do
    # Get users
    users = from(u in User, where: u.id in ^user_ids) |> Repo.all()
    
    # Get post counts in single query
    post_counts = 
      from(p in Post,
        where: p.author_id in ^user_ids,
        group_by: p.author_id,
        select: {p.author_id, count(p.id)})
      |> Repo.all()
      |> Map.new()
    
    # Combine results
    Enum.map(users, fn user ->
      post_count = Map.get(post_counts, user.id, 0)
      %{user | post_count: post_count}
    end)
  end
end
Database Indexes:
elixir
defmodule MyApp.Repo.Migrations.AddPerformanceIndexes do
  use Ecto.Migration
  
  def change do
    # Composite index for common query pattern
    create index(:posts, [:author_id, :status, :inserted_at])
    
    # Partial index for active records only
    create index(:users, [:email], where: "is_active = true")
    
    # Functional index for case-insensitive searches
    create index(:users, ["lower(email)"])
    
    # GIN index for JSON column searches
    create index(:posts, [:metadata], using: "GIN")
  end
end
def list_post_summaries do from p in Post, join: u in User, on: p.author_id == u.id, select: %{id: p.id, title: p.title, author_name: u.name} end

**批量加载:**
```elixir
defmodule MyApp.Accounts do
  def get_users_with_post_counts(user_ids) do
    # 获取用户
    users = from(u in User, where: u.id in ^user_ids) |> Repo.all()
    
    # 单次查询获取文章数量
    post_counts = 
      from(p in Post,
        where: p.author_id in ^user_ids,
        group_by: p.author_id,
        select: {p.author_id, count(p.id)})
      |> Repo.all()
      |> Map.new()
    
    # 合并结果
    Enum.map(users, fn user ->
      post_count = Map.get(post_counts, user.id, 0)
      %{user | post_count: post_count}
    end)
  end
end
数据库索引:
elixir
defmodule MyApp.Repo.Migrations.AddPerformanceIndexes do
  use Ecto.Migration
  
  def change do
    # 针对常见查询模式的复合索引
    create index(:posts, [:author_id, :status, :inserted_at])
    
    # 仅针对活跃记录的部分索引
    create index(:users, [:email], where: "is_active = true")
    
    # 用于不区分大小写搜索的函数索引
    create index(:users, ["lower(email)"])
    
    # 用于JSON列搜索的GIN索引
    create index(:posts, [:metadata], using: "GIN")
  end
end

Connection Pool Management

连接池管理

Pool Configuration:
elixir
undefined
池配置:
elixir
undefined

config/config.exs

config/config.exs

config :my_app, MyApp.Repo, pool_size: String.to_integer(System.get_env("DB_POOL_SIZE") || "10"), queue_target: 5000, queue_interval: 5000, timeout: 15_000, pool_timeout: 5_000
config :my_app, MyApp.Repo, pool_size: String.to_integer(System.get_env("DB_POOL_SIZE") || "10"), queue_target: 5000, queue_interval: 5000, timeout: 15_000, pool_timeout: 5_000

For read replicas

针对只读副本

config :my_app, MyApp.ReadRepo, pool_size: 5, priv: "priv/repo" # Share migration directory

**Transaction Management:**
```elixir
defmodule MyApp.Orders do
  alias MyApp.Repo
  alias Ecto.Multi
  
  def create_order_with_payment(order_attrs, payment_attrs) do
    Multi.new()
    |> Multi.insert(:order, Order.changeset(%Order{}, order_attrs))
    |> Multi.run(:payment, fn repo, %{order: order} ->
      payment_attrs = Map.put(payment_attrs, :order_id, order.id)
      Payment.changeset(%Payment{}, payment_attrs)
      |> repo.insert()
    end)
    |> Multi.run(:inventory, fn _repo, %{order: order} ->
      # Complex inventory update logic
      MyApp.Inventory.reserve_items(order.line_items)
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{order: order, payment: payment}} -> {:ok, order}
      {:error, :order, changeset, _} -> {:error, changeset}
      {:error, :payment, changeset, _} -> {:error, changeset}
      {:error, :inventory, reason, _} -> {:error, reason}
    end
  end
end
config :my_app, MyApp.ReadRepo, pool_size: 5, priv: "priv/repo" # 共享迁移目录

**事务管理:**
```elixir
defmodule MyApp.Orders do
  alias MyApp.Repo
  alias Ecto.Multi
  
  def create_order_with_payment(order_attrs, payment_attrs) do
    Multi.new()
    |> Multi.insert(:order, Order.changeset(%Order{}, order_attrs))
    |> Multi.run(:payment, fn repo, %{order: order} ->
      payment_attrs = Map.put(payment_attrs, :order_id, order.id)
      Payment.changeset(%Payment{}, payment_attrs)
      |> repo.insert()
    end)
    |> Multi.run(:inventory, fn _repo, %{order: order} ->
      # 复杂库存更新逻辑
      MyApp.Inventory.reserve_items(order.line_items)
    end)
    |> Repo.transaction()
    |> case do
      {:ok, %{order: order, payment: payment}} -> {:ok, order}
      {:error, :order, changeset, _} -> {:error, changeset}
      {:error, :payment, changeset, _} -> {:error, changeset}
      {:error, :inventory, reason, _} -> {:error, reason}
    end
  end
end

Cross-References

交叉引用

Phoenix Integration

Phoenix集成

For Phoenix-specific Ecto patterns, schema generation with generators, and web-layer database integration, see the
elixir-phoenix-framework
skill.
针对Phoenix特有的Ecto模式、使用生成器生成Schema以及Web层数据库集成,请参考
elixir-phoenix-framework
技能。

System Architecture

系统架构

For designing data layer architecture, context boundaries with Ecto schemas, and database scaling patterns, see the
elixir-architecture
skill.
针对数据层架构设计、Ecto Schema的上下文边界以及数据库扩展模式,请参考
elixir-architecture
技能。

Performance Review

性能评审

For database query optimization, N+1 detection, and Ecto performance monitoring, see the
elixir-review
skill.
针对数据库查询优化、N+1查询检测以及Ecto性能监控,请参考
elixir-review
技能。

Best Practices Summary

最佳实践总结

Schema Design

Schema设计

  1. Use UUIDs for distributed systems - avoid integer ID conflicts
  2. Validate at the database level - use constraints and indexes
  3. Keep schemas focused - single responsibility principle
  4. Use virtual fields judiciously - for computed values and form handling
  1. 分布式系统使用UUID - 避免整数ID冲突
  2. 在数据库层面验证 - 使用约束和索引
  3. 保持Schema聚焦 - 单一职责原则
  4. 谨慎使用虚拟字段 - 用于计算值和表单处理

Changeset Patterns

变更集模式

  1. Validate early and often - fail fast with clear messages
  2. Separate update types - different changesets for different operations
  3. Use custom validators - for complex business rules
  4. Handle associations properly - use cast_assoc and cast_embed
  1. 尽早并频繁验证 - 快速失败并给出清晰提示
  2. 区分更新类型 - 不同操作使用不同变更集
  3. 使用自定义验证器 - 处理复杂业务规则
  4. 正确处理关联关系 - 使用cast_assoc和cast_embed

Query Optimization

查询优化

  1. Prevent N+1 queries - use preload and joins appropriately
  2. Use indexes strategically - for common query patterns
  3. Batch operations - reduce round trips to database
  4. Profile queries - measure before optimizing
  1. 避免N+1查询 - 合理使用预加载和连接
  2. 策略性使用索引 - 针对常见查询模式
  3. 批量操作 - 减少与数据库的往返次数
  4. 分析查询性能 - 优化前先测量

Migration Safety

迁移安全

  1. Add before remove - maintain compatibility during deployment
  2. Use constraints - enforce data integrity
  3. Test rollbacks - ensure migrations are reversible
  4. Batch large changes - avoid blocking operations
This skill provides comprehensive guidance for building efficient, maintainable database layers with Ecto in Elixir applications.
  1. 先添加再移除 - 部署期间保持兼容性
  2. 使用约束 - 强制数据完整性
  3. 测试回滚 - 确保迁移可回退
  4. 分批处理大型变更 - 避免阻塞操作
本技能指南为在Elixir应用中使用Ecto构建高效、可维护的数据库层提供了全面指导。