migrating-airflow-2-to-3
Airflow 2 to 3 Migration
This skill helps migrate Airflow 2.x DAG code to Airflow 3.x, focusing on code changes (imports, operators, hooks, context, API usage).
Important: Before migrating to Airflow 3, we strongly recommend upgrading to Airflow 2.11 first, then to at least Airflow 3.0.11 (ideally directly to 3.1); other upgrade paths make rollback impossible. See: https://www.astronomer.io/docs/astro/airflow3/upgrade-af3#upgrade-your-airflow-2-deployment-to-airflow-3. Additionally, early 3.0 versions have many bugs; 3.1 provides a much better experience.
Migration at a Glance
- Run Ruff's Airflow migration rules to auto-fix detectable issues (AIR30/AIR301/AIR302/AIR31/AIR311/AIR312):

```bash
ruff check --preview --select AIR --fix --unsafe-fixes .
```
- Scan for remaining issues using the manual search checklist in reference/migration-checklist.md.
- Focus on: direct metadata DB access, legacy imports, scheduling/context keys, XCom pickling, datasets-to-assets, REST API/auth, plugins, and file paths.
- Hard behavior/config gotchas to explicitly review:
  - Cron scheduling semantics: consider `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True` if you need Airflow 2-style cron data intervals.
  - `.airflowignore` syntax changed from regexp to glob; set `AIRFLOW__CORE__DAG_IGNORE_FILE_SYNTAX=regexp` if you must keep regexp behavior.
  - OAuth callback URLs add an `/auth/` prefix (e.g. `/auth/oauth-authorized/google`).
- Plan changes per file and issue type:
  - Fix imports.
  - Update operators/hooks/providers.
  - Refactor metadata access to use the Airflow client instead of direct DB access.
  - Fix use of outdated context variables.
  - Fix scheduling logic.
- Implement changes incrementally, re-running Ruff and code searches after each major change.
- Explain changes to the user and caution them to test any updated logic, such as refactored metadata access, scheduling logic, and use of the Airflow context.
Architecture & Metadata DB Access
Airflow 3 changes how components talk to the metadata database:
- Workers no longer connect directly to the metadata DB.
- Task code runs via the Task Execution API exposed by the API server.
- The DAG processor runs as an independent process separate from the scheduler.
- The Triggerer uses the task execution mechanism via an in-process API server.
Trigger implementation gotcha: If a trigger calls hooks synchronously inside the asyncio event loop, it may fail or block. Prefer calling hooks via `sync_to_async(...)` (or otherwise ensure hook calls are async-safe).

Key code impact: Task code can still import ORM sessions/models, but any attempt to use them to talk to the metadata DB will fail with:

```text
RuntimeError: Direct database access via the ORM is not allowed in Airflow 3.x
```
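The trigger gotcha above can be sketched without Airflow at all. This minimal illustration uses the stdlib `asyncio.to_thread` to offload a blocking call, which is the same idea as `sync_to_async(...)`; `fetch_rows` is a hypothetical stand-in for a synchronous hook call such as `hook.get_records(...)`:

```python
import asyncio
import time


def fetch_rows():
    # Hypothetical stand-in for a blocking, synchronous hook call.
    time.sleep(0.01)
    return [("row1",), ("row2",)]


async def run():
    # Offloading keeps the trigger's event loop responsive; calling
    # fetch_rows() directly here would block every other trigger.
    rows = await asyncio.to_thread(fetch_rows)
    return rows


print(asyncio.run(run()))  # → [('row1',), ('row2',)]
```

In a real trigger, the `await sync_to_async(hook.method)(...)` call would sit inside the trigger's async `run()` method.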
Patterns to search for
When scanning DAGs, custom operators, and `@task` functions, look for:
- Session helpers: `provide_session`, `create_session`, `@provide_session`
- Sessions from settings: `from airflow.settings import Session`
- Engine access: `from airflow.settings import engine`
- ORM usage with models: `session.query(DagModel)...`, `session.query(DagRun)...`
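The checklist above can be automated with a small scan script. This is a sketch: the pattern list mirrors the bullets and is not exhaustive, and the folder you pass in is up to you:

```python
import re
from pathlib import Path

# Legacy metadata-DB access patterns from the list above (not exhaustive).
LEGACY = re.compile(
    r"provide_session|create_session"
    r"|from airflow\.settings import (?:Session|engine)"
    r"|session\.query\("
)


def scan(root: str) -> list[tuple[str, int, str]]:
    """Return (file, line number, line) for every legacy-pattern hit."""
    hits = []
    for path in sorted(Path(root).rglob("*.py")):
        for lineno, line in enumerate(path.read_text().splitlines(), start=1):
            if LEGACY.search(line):
                hits.append((str(path), lineno, line.strip()))
    return hits
```

Run it from the repo root, e.g. `print(scan("dags/"))`, and triage each hit against the replacements below.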
Replacement: Airflow Python client
Preferred for rich metadata access patterns. Add to `requirements.txt`:

```text
apache-airflow-client==<your-airflow-runtime-version>
```

Example usage:
python
import os
from airflow.sdk import BaseOperator
import airflow_client.client
from airflow_client.client.api.dag_api import DAGApi
_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")
class ListDagsOperator(BaseOperator):
def execute(self, context):
config = airflow_client.client.Configuration(host=_HOST, access_token=_TOKEN)
with airflow_client.client.ApiClient(config) as api_client:
dag_api = DAGApi(api_client)
dags = dag_api.get_dags(limit=10)
self.log.info("Found %d DAGs", len(dags.dags))适用于复杂的元数据访问场景。添加至:
Replacement: Direct REST API calls
For simple cases, call the REST API directly using `requests`:

```python
import os

import requests

from airflow.sdk import task

_HOST = os.getenv("AIRFLOW__API__BASE_URL", "https://<your-org>.astronomer.run/<deployment>/")
_TOKEN = os.getenv("DEPLOYMENT_API_TOKEN")


@task
def list_dags_via_api() -> None:
    response = requests.get(
        f"{_HOST}/api/v2/dags",
        headers={"Accept": "application/json", "Authorization": f"Bearer {_TOKEN}"},
        params={"limit": 10},
    )
    response.raise_for_status()
    print(response.json())
```
Ruff Airflow Migration Rules
Use Ruff's Airflow rules to detect and fix many breaking changes automatically.
- AIR30 / AIR301 / AIR302: Removed code and imports in Airflow 3 - must be fixed.
- AIR31 / AIR311 / AIR312: Deprecated code and imports - still work but will be removed in future versions; should be fixed.
Commands to run (via `uv`) against the project root:

```bash
# Auto-fix all detectable Airflow issues (safe + unsafe)
ruff check --preview --select AIR --fix --unsafe-fixes .

# Check remaining Airflow issues without fixing
ruff check --preview --select AIR .
```

---

Reference Files
For detailed code examples and migration patterns, see:
- `reference/migration-patterns.md` - Detailed code examples for:
  - Removed modules and import reorganizations
  - Task SDK and Param usage
  - SubDAGs, SLAs, and removed features
  - Scheduling and context changes
  - XCom pickling removal
  - Datasets to Assets migration
  - DAG bundles and file paths
- `reference/migration-checklist.md` - Manual search checklist with:
  - Search patterns for each issue type
  - Recommended fixes
  - FAB plugin warnings
  - Callback and behavior changes
Quick Reference Tables
Key Import Changes
Representative examples (not exhaustive):

| Airflow 2.x | Airflow 3 |
|---|---|
| `from airflow.decorators import dag, task` | `from airflow.sdk import dag, task` |
| `from airflow.models import DAG` | `from airflow.sdk import DAG` |
| `from airflow.models import Variable` | `from airflow.sdk import Variable` |
| `from airflow.datasets import Dataset` | `from airflow.sdk import Asset` |
| `from airflow.operators.python import PythonOperator` | `from airflow.providers.standard.operators.python import PythonOperator` |
| `from airflow.operators.bash import BashOperator` | `from airflow.providers.standard.operators.bash import BashOperator` |
Context Key Changes
| Removed Key | Replacement |
|---|---|
| `execution_date` | `logical_date` |
| `next_ds`, `next_execution_date` | Use `data_interval_end` |
| `prev_ds`, `prev_execution_date` | Use `data_interval_start` |
| `tomorrow_ds`, `yesterday_ds` | Use `logical_date` with date arithmetic |
| `prev_execution_date_success` | `prev_data_interval_start_success` |
Asset-triggered runs: `logical_date` may be `None`; use `context["dag_run"].logical_date` defensively.

Cannot trigger with future `logical_date`: use `logical_date=None` and rely on `run_id` instead.

Cron note: for scheduled runs using cron, `logical_date` semantics differ under `CronTriggerTimetable` (`logical_date` aligns with `run_after`). If you need Airflow 2-style cron data intervals, consider `AIRFLOW__SCHEDULER__CREATE_CRON_DATA_INTERVAL=True`.
Default Behavior Changes
| Setting | Airflow 2 Default | Airflow 3 Default |
|---|---|---|
| `catchup_by_default` | `True` | `False` |
| `create_cron_data_interval` | `True` | `False` |
Callback Behavior Changes
- `on_success_callback` no longer runs on skip; use `on_skipped_callback` if needed.
- `@teardown` with `TriggerRule.ALWAYS` is not allowed; teardowns now execute even if the DAG run terminated early.
Resources
Related Skills
- testing-dags: For testing DAGs after migration
- debugging-dags: For troubleshooting migration issues