Loading...
Loading...
Compare original and translation side by side
CRITICAL: Plugin components (fastapi_apps, react_apps, external_views) require Airflow 3.1+. NEVER import,flask, or useflask_appbuilder/appbuilder_views— these are Airflow 2 patterns and will not work in Airflow 3. If existing code uses them, rewrite the entire registration block using FastAPI.flask_blueprintsSecurity: FastAPI plugin endpoints are not automatically protected by Airflow auth. If your endpoints need to be private, implement authentication explicitly using FastAPI's security utilities.Restart required: Changes to Python plugin files require restarting the API server. Static file changes (HTML, JS, CSS) are picked up immediately. Setduring development to load plugins at startup rather than lazily.AIRFLOW__CORE__LAZY_LOAD_PLUGINS=FalseRelative paths always: In,external_viewsmust have no leading slash. In HTML and JavaScript, use relative paths for all assets andhrefcalls. Absolute paths break behind reverse proxies.fetch()
重要提示:插件组件(fastapi_apps、react_apps、external_views)需要 Airflow 3.1+。切勿导入、flask,或使用flask_appbuilder/appbuilder_views—— 这些是Airflow 2的模式,在Airflow 3中无法工作。如果现有代码使用了这些模式,请使用FastAPI重写整个注册块。flask_blueprints安全说明:FastAPI插件端点不会自动受Airflow认证保护。如果你的端点需要私有访问,请使用FastAPI的安全工具显式实现认证。需要重启:修改Python插件文件后需要重启API服务器。静态文件(HTML、JS、CSS)的更改会立即生效。开发期间设置,以便在启动时加载插件而非延迟加载。AIRFLOW__CORE__LAZY_LOAD_PLUGINS=False始终使用相对路径:在中,external_views不得有前导斜杠。在HTML和JavaScript中,所有资源和href调用都要使用相对路径。绝对路径在反向代理环境下会失效。fetch()
fastapi_appsappbuilder_viewsfetch()asyncio.to_thread()static/assets/fastapi_appsappbuilder_viewsfetch()asyncio.to_thread()static/assets/| Component | What it does | Field |
|---|---|---|
| Custom API endpoints | FastAPI app mounted in Airflow process | |
| Nav / page link | Embeds a URL as an iframe or links out | |
| React component | Custom React app embedded in Airflow UI | |
| API middleware | Intercepts all Airflow API requests/responses | |
| Jinja macros | Reusable Python functions in DAG templates | |
| Task instance button | Extra link button in task Detail view | |
| Custom timetable | Custom scheduling logic | |
| Event hooks | Listener callbacks for Airflow events | |
| 组件 | 功能 | 字段 |
|---|---|---|
| 自定义API端点 | 挂载到Airflow进程中的FastAPI应用 | |
| 导航/页面链接 | 将URL以iframe形式嵌入或跳转至外部 | |
| React组件 | 嵌入Airflow UI的自定义React应用 | |
| API中间件 | 拦截所有Airflow API请求/响应 | |
| Jinja宏 | DAG模板中可复用的Python函数 | |
| 任务实例按钮 | 任务详情视图中的额外链接按钮 | |
| 自定义时间表 | 自定义调度逻辑 | |
| 事件钩子 | Airflow事件的监听器回调 | |
plugins/plugins/
my-plugin/
plugin.py # AirflowPlugin subclass — auto-discovered by Airflow
static/
index.html
app.js
assets/
icon.svgBASE_DIR = Path(__file__).parentplugin.pyplugins/my-plugin/StaticFilesfrom pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
BASE_DIR = Path(__file__).parent
app = FastAPI(title="My Plugin")plugins/plugins/
my-plugin/
plugin.py # AirflowPlugin子类 —— Airflow会自动发现
static/
index.html
app.js
assets/
icon.svgplugin.pyBASE_DIR = Path(__file__).parentplugins/my-plugin/StaticFilesfrom pathlib import Path
from airflow.plugins_manager import AirflowPlugin
from fastapi import FastAPI
from fastapi.staticfiles import StaticFiles
from fastapi.responses import FileResponse
BASE_DIR = Path(__file__).parent
app = FastAPI(title="My Plugin")fastapi_apps = [
{
"app": app,
"url_prefix": "/my-plugin", # plugin available at {AIRFLOW_HOST}/my-plugin/
"name": "My Plugin",
}
]
external_views = [
{
"name": "My Plugin",
"href": "my-plugin/ui", # NO leading slash — breaks on Astro and reverse proxies
"destination": "nav", # see locations table below
"category": "browse", # nav bar category (nav destination only)
"url_route": "my-plugin", # unique route name (required for React apps)
"icon": "/my-plugin/static/icon.svg" # DOES use a leading slash — served by FastAPI
}
]undefinedfastapi_apps = [
{
"app": app,
"url_prefix": "/my-plugin", # 插件可通过 {AIRFLOW_HOST}/my-plugin/ 访问
"name": "My Plugin",
}
]
external_views = [
{
"name": "My Plugin",
"href": "my-plugin/ui", # 不要加前导斜杠 —— 在Astro和反向代理环境下会失效
"destination": "nav", # 见下方位置表格
"category": "browse", # 导航栏分类(仅当destination为nav时需要)
"url_route": "my-plugin", # 唯一路由名称(React应用必填)
"icon": "/my-plugin/static/icon.svg" # 这里需要加前导斜杠 —— 由FastAPI提供服务
}
]undefined | Where it appears |
|---|---|
| Left navigation bar (also set |
| Extra tab on every Dag page |
| Extra tab on every Dag run page |
| Extra tab on every task page |
| Extra tab on every task instance page |
| 显示位置 |
|---|---|
| 左侧导航栏(需同时设置 |
| 每个DAG页面的额外标签页 |
| 每个DAG运行页面的额外标签页 |
| 每个任务页面的额外标签页 |
| 每个任务实例页面的额外标签页 |
destination: "nav"destination: "nav""category""browse""admin""category""browse""admin"href"my-plugin/ui"external_viewsfastapi_appsfrom airflow.plugins_manager import AirflowPlugin
class LearnViewPlugin(AirflowPlugin):
name = "learn_view_plugin"
external_views = [
{
"name": "Learn Airflow 3",
"href": "https://www.astronomer.io/docs/learn",
"destination": "dag", # adds a tab to every Dag page
"url_route": "learn"
}
]https://href"my-plugin/ui"external_viewsfastapi_appsfrom airflow.plugins_manager import AirflowPlugin
class LearnViewPlugin(AirflowPlugin):
name = "learn_view_plugin"
external_views = [
{
"name": "Learn Airflow 3",
"href": "https://www.astronomer.io/docs/learn",
"destination": "dag", # 为每个DAG页面添加标签页
"url_route": "learn"
}
]https://@app.get("/ui", response_class=FileResponse)
async def serve_ui():
return FileResponse(BASE_DIR / "static" / "index.html")<!-- correct -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>
<!-- breaks behind a reverse proxy -->
<script src="/my-plugin/static/app.js"></script>fetch('api/dags') // correct — relative to current page
fetch('/my-plugin/api/dags') // breaks on Astro and sub-path deploys@app.get("/ui", response_class=FileResponse)
async def serve_ui():
return FileResponse(BASE_DIR / "static" / "index.html")<!-- 正确写法 -->
<link rel="stylesheet" href="static/app.css" />
<script src="static/app.js?v=20240315"></script>
<!-- 在反向代理环境下会失效 -->
<script src="/my-plugin/static/app.js"></script>fetch('api/dags') // 正确 —— 相对于当前页面
fetch('/my-plugin/api/dags') // 在Astro和子路径部署环境下会失效Only needed if your plugin calls the Airflow REST API. Plugins that only serve static files, register, or use direct DB access do not need this step — skip to Step 5 or Step 6.external_views
仅当插件需要调用Airflow REST API时才需要此步骤。仅提供静态文件、注册或直接访问数据库的插件无需此步骤 —— 跳至步骤5或步骤6。external_views
apache-airflow-client| File found | Action |
|---|---|
| Append |
| |
| None of the above | Tell the user: "Add |
apache-airflow-clientasync defapache-airflow-client| 找到的文件 | 操作 |
|---|---|
| 添加 |
| 执行 |
| 以上都没有 | 告知用户:"在运行插件前,请将 |
apache-airflow-clientasync defReplacewith a short uppercase prefix derived from the plugin name (e.g. if the plugin is called "Trip Analyzer", useMYPLUGIN_). If no plugin name has been given yet, ask the user before writing env var names.TRIP_ANALYZER_
import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN") # Astronomer Astro: Deployment API token
_cached_token: str | None = None
_token_expires_at: float = 0.0
_token_lock = threading.Lock()
def _fetch_fresh_token() -> str:
"""Exchange username/password for a JWT via Airflow's auth endpoint."""
response = requests.post(
f"{AIRFLOW_HOST}/auth/token",
json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
timeout=10,
)
response.raise_for_status()
return response.json()["access_token"]
def _get_token() -> str:
# Astronomer Astro production: use static Deployment API token directly
if AIRFLOW_TOKEN:
return AIRFLOW_TOKEN
global _cached_token, _token_expires_at
now = time.monotonic()
# Fast path — no lock if still valid
if _cached_token and now < _token_expires_at:
return _cached_token
# Slow path — one thread refreshes, others wait
with _token_lock:
if _cached_token and now < _token_expires_at:
return _cached_token
_cached_token = _fetch_fresh_token()
_token_expires_at = now + 55 * 60 # refresh 5 min before 1-hour expiry
return _cached_token
def _make_config() -> airflow_sdk.Configuration:
config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
config.access_token = _get_token()
return configMYPLUGIN_USERNAMEMYPLUGIN_PASSWORD.envMYPLUGIN_TOKENastro deployment variable create MYPLUGIN_TOKEN=<token>MYPLUGIN_USERNAMEMYPLUGIN_PASSWORD将替换为插件名称衍生的短大写前缀(例如,如果插件名为"Trip Analyzer",则使用MYPLUGIN_)。如果尚未确定插件名称,请先询问用户再编写环境变量名。TRIP_ANALYZER_
import asyncio
import os
import threading
import time
import airflow_client.client as airflow_sdk
import requests
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")
AIRFLOW_TOKEN = os.environ.get("MYPLUGIN_TOKEN") # Astronomer Astro:部署API令牌
_cached_token: str | None = None
_token_expires_at: float = 0.0
_token_lock = threading.Lock()
def _fetch_fresh_token() -> str:
"""通过Airflow的认证端点,将用户名/密码交换为JWT令牌。"""
response = requests.post(
f"{AIRFLOW_HOST}/auth/token",
json={"username": AIRFLOW_USER, "password": AIRFLOW_PASS},
timeout=10,
)
response.raise_for_status()
return response.json()["access_token"]
def _get_token() -> str:
# Astronomer Astro生产环境:直接使用静态部署API令牌
if AIRFLOW_TOKEN:
return AIRFLOW_TOKEN
global _cached_token, _token_expires_at
now = time.monotonic()
# 快速路径 —— 令牌仍有效时无需加锁
if _cached_token and now < _token_expires_at:
return _cached_token
# 慢速路径 —— 一个线程负责刷新,其他线程等待
with _token_lock:
if _cached_token and now < _token_expires_at:
return _cached_token
_cached_token = _fetch_fresh_token()
_token_expires_at = now + 55 * 60 # 在1小时有效期前5分钟刷新
return _cached_token
def _make_config() -> airflow_sdk.Configuration:
config = airflow_sdk.Configuration(host=AIRFLOW_HOST)
config.access_token = _get_token()
return config.envMYPLUGIN_USERNAMEMYPLUGIN_PASSWORDMYPLUGIN_TOKENastro deployment variable create MYPLUGIN_TOKEN=<token>MYPLUGIN_USERNAMEMYPLUGIN_PASSWORDfrom fastapi import HTTPException
from airflow_client.client.api import DAGApi
@app.get("/api/dags")
async def list_dags():
try:
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
return DAGApi(client).get_dags(limit=100).dags
dags = await asyncio.to_thread(_fetch)
return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))API field names: Never guess response field names — verify against the REST API reference. Keyfields:DAGResponse,dag_id,dag_display_name,description,is_paused,timetable_summary,timetable_description,fileloc,owners.tags
def _fetch()await asyncio.to_thread(_fetch)from fastapi import HTTPException
from airflow_client.client.api import DAGApi
@app.get("/api/dags")
async def list_dags():
try:
def _fetch():
with airflow_sdk.ApiClient(_make_config()) as client:
return DAGApi(client).get_dags(limit=100).dags
dags = await asyncio.to_thread(_fetch)
return [{"dag_id": d.dag_id, "is_paused": d.is_paused, "timetable_summary": d.timetable_summary} for d in dags]
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))API字段名称:切勿猜测响应字段名称 —— 请对照REST API参考文档进行验证。的关键字段包括:DAGResponse、dag_id、dag_display_name、description、is_paused、timetable_summary、timetable_description、fileloc、owners。tags
def _fetch()await asyncio.to_thread(_fetch)Warning — use with caution and tell the user. The Airflow metadb is not a public interface. Direct writes or poorly-formed queries can corrupt scheduler state. Whenever you use this pattern, explicitly tell the user: "This accesses Airflow's internal database directly. The internal models are not part of the public API, can change between Airflow versions, and incorrect queries can cause issues in the metadb. Preferunless the operation is not exposed via the REST API."apache-airflow-client
asyncio.to_thread()from airflow.models import DagBag, DagModel
from airflow.utils.db import provide_session
@app.get("/api/dags/status")
async def dag_status():
def _fetch():
@provide_session
def _query(session=None):
dagbag = DagBag()
paused = sum(
1 for dag_id in dagbag.dags
if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
and m.is_paused
)
return {"total": len(dagbag.dags), "paused": paused}
return _query()
return await asyncio.to_thread(_fetch)警告 —— 谨慎使用并告知用户。Airflow元数据库并非公共接口。直接写入或格式不当的查询可能会破坏调度器状态。无论何时使用此模式,都要明确告知用户:"此方式直接访问Airflow的内部数据库。内部模型不属于公共API,在不同Airflow版本间可能会变化,错误的查询可能会导致元数据库出现问题。除非操作未通过REST API暴露,否则优先使用。"apache-airflow-client
asyncio.to_thread()from airflow.models import DagBag, DagModel
from airflow.utils.db import provide_session
@app.get("/api/dags/status")
async def dag_status():
def _fetch():
@provide_session
def _query(session=None):
dagbag = DagBag()
paused = sum(
1 for dag_id in dagbag.dags
if (m := session.query(DagModel).filter(DagModel.dag_id == dag_id).first())
and m.is_paused
)
return {"total": len(dagbag.dags), "paused": paused}
return _query()
return await asyncio.to_thread(_fetch)If you need an SDK method or field not shown in the examples below, verify it before generating code — do not guess. Either runin any environment where the SDK is installed, or search thepython3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"repo for the class definition.apache/airflow-client-python
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody
@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
result = await asyncio.to_thread(_run)
return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}
@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
await asyncio.to_thread(_run)
return {"dag_id": dag_id, "is_paused": is_paused}
@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).delete_dag(dag_id)
await asyncio.to_thread(_run)
return {"deleted": dag_id}
def normalize_state(raw) -> str:
"""Convert SDK enum objects to plain strings before sending to the frontend."""
if raw is None:
return "never_run"
return str(raw).lower()如果需要示例中未展示的SDK方法或字段,请在生成代码前进行验证 —— 切勿猜测。可在任何安装了SDK的环境中执行,或在python3 -c "from airflow_client.client.api import <Class>; print([m for m in dir(<Class>) if not m.startswith('_')])"仓库中搜索类定义。apache/airflow-client-python
from airflow_client.client.api import DAGApi, DagRunApi
from airflow_client.client.models import TriggerDAGRunPostBody, DAGPatchBody
@app.post("/api/dags/{dag_id}/trigger")
async def trigger_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
return DagRunApi(client).trigger_dag_run(dag_id, TriggerDAGRunPostBody())
result = await asyncio.to_thread(_run)
return {"run_id": result.dag_run_id, "state": normalize_state(result.state)}
@app.patch("/api/dags/{dag_id}/pause")
async def toggle_pause(dag_id: str, is_paused: bool):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).patch_dag(dag_id, DAGPatchBody(is_paused=is_paused))
await asyncio.to_thread(_run)
return {"dag_id": dag_id, "is_paused": is_paused}
@app.delete("/api/dags/{dag_id}")
async def delete_dag(dag_id: str):
def _run():
with airflow_sdk.ApiClient(_make_config()) as client:
DAGApi(client).delete_dag(dag_id)
await asyncio.to_thread(_run)
return {"deleted": dag_id}
def normalize_state(raw) -> str:
"""在发送到前端前,将SDK枚举对象转换为普通字符串。"""
if raw is None:
return "never_run"
return str(raw).lower()from airflow_client.client.api import DagRunApi, TaskInstanceApifrom airflow_client.client.api import DagRunApi, TaskInstanceApiundefinedundefinedStreamingResponseimport requests
from starlette.responses import StreamingResponse
@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
def _stream():
r = requests.get(f"https://files.example.com/{filename}", stream=True)
r.raise_for_status()
return r
response = await asyncio.to_thread(_stream)
return StreamingResponse(
response.iter_content(chunk_size=8192),
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)requests.get()asyncio.to_threadStreamingResponseimport requests
from starlette.responses import StreamingResponse
@app.get("/api/files/{filename}")
async def proxy_file(filename: str):
def _stream():
r = requests.get(f"https://files.example.com/{filename}", stream=True)
r.raise_for_status()
return r
response = await asyncio.to_thread(_stream)
return StreamingResponse(
response.iter_content(chunk_size=8192),
media_type="application/octet-stream",
headers={"Content-Disposition": f'attachment; filename="{filename}"'},
)requests.get()asyncio.to_threadfrom airflow.plugins_manager import AirflowPlugin
def format_confidence(confidence: float) -> str:
return f"{confidence * 100:.2f}%"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
macros = [format_confidence]{{ macros.my_plugin.format_confidence(0.95) }}
{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}macros.{plugin_name}.{function_name}from airflow.plugins_manager import AirflowPlugin
def format_confidence(confidence: float) -> str:
return f"{confidence * 100:.2f}%"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
macros = [format_confidence]{{ macros.my_plugin.format_confidence(0.95) }}
{{ macros.my_plugin.format_confidence(ti.xcom_pull(task_ids='score_task')['confidence']) }}macros.{plugin_name}.{function_name}from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response
class AuditMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
# runs before every request to the Airflow API server
response = await call_next(request)
return response
class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_root_middlewares = [
{"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
]from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request, Response
class AuditMiddleware(BaseHTTPMiddleware):
async def dispatch(self, request: Request, call_next) -> Response:
# 在每个Airflow API服务器请求前运行
response = await call_next(request)
return response
class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_root_middlewares = [
{"middleware": AuditMiddleware, "args": [], "kwargs": {}, "name": "Audit"}
]from airflow.sdk.bases.operatorlink import BaseOperatorLink
class MyDashboardLink(BaseOperatorLink):
name = "Open in Dashboard"
def get_link(self, operator, *, ti_key, **context) -> str:
return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
global_operator_extra_links = [MyDashboardLink()] # appears on every task
# operator_extra_links = [MyDashboardLink()] # attach to specific operator insteadfrom airflow.sdk.bases.operatorlink import BaseOperatorLink
class MyDashboardLink(BaseOperatorLink):
name = "在仪表板中打开"
def get_link(self, operator, *, ti_key, **context) -> str:
return f"https://my-dashboard.example.com/tasks/{ti_key.task_id}"
class MyPlugin(AirflowPlugin):
name = "my_plugin"
global_operator_extra_links = [MyDashboardLink()] # 显示在所有任务上
# operator_extra_links = [MyDashboardLink()] # 仅附加到特定Operator// In your bundle (e.g. my-app.js)
globalThis['My Plugin'] = MyComponent; // matches plugin name
globalThis.AirflowPlugin = MyComponent; // fallback Airflow looks forclass MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
react_apps = [
{
"name": "My Plugin",
"bundle_url": "/my-plugin/my-app.js",
"destination": "nav",
"category": "browse",
"url_route": "my-plugin",
}
]url_routereact_apps = [
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav", "url_route": "my-widget-nav"},
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag", "url_route": "my-widget-dag"},
]React app integration is experimental in Airflow 3.1. Interfaces may change in future releases.
// 在你的包中(如my-app.js)
globalThis['My Plugin'] = MyComponent; # 与插件名称匹配
globalThis.AirflowPlugin = MyComponent; # Airflow会寻找的回退变量class MyPlugin(AirflowPlugin):
name = "my_plugin"
fastapi_apps = [{"app": app, "url_prefix": "/my-plugin", "name": "My Plugin"}]
react_apps = [
{
"name": "My Plugin",
"bundle_url": "/my-plugin/my-app.js",
"destination": "nav",
"category": "browse",
"url_route": "my-plugin",
}
]url_routereact_apps = [
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "nav", "url_route": "my-widget-nav"},
{"name": "My Widget", "bundle_url": "/my-plugin/widget.js", "destination": "dag", "url_route": "my-widget-dag"},
]React应用集成在Airflow 3.1中属于实验性功能。接口在未来版本中可能会变化。
AIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")undefinedAIRFLOW_HOST = os.environ.get("MYPLUGIN_HOST", "http://localhost:8080")
AIRFLOW_USER = os.environ.get("MYPLUGIN_USERNAME", "admin")
AIRFLOW_PASS = os.environ.get("MYPLUGIN_PASSWORD", "admin")undefined
```bash
astro dev restart # required after any Python plugin change
```bash
astro dev restart # 修改Python插件后必须执行
**Production Astronomer:**
```bash
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.comAIRFLOW__CORE__LAZY_LOAD_PLUGINS=False<script src="static/app.js?v=20240315-1"></script>{AIRFLOW_HOST}/{url_prefix}/docs{AIRFLOW_HOST}/{url_prefix}/openapi.json
**生产环境Astronomer:**
```bash
astro deployment variable create --deployment-id <id> MYPLUGIN_HOST=https://airflow.example.comAIRFLOW__CORE__LAZY_LOAD_PLUGINS=False<script src="static/app.js?v=20240315-1"></script>{AIRFLOW_HOST}/{url_prefix}/docs{AIRFLOW_HOST}/{url_prefix}/openapi.json| Problem | Cause | Fix |
|---|---|---|
| Nav link goes to 404 | Leading | |
| Nav icon not showing | Missing | |
| Event loop freezes under load | Sync SDK called directly in | Wrap with |
| 401 errors after 1 hour | JWT expires with no refresh | Use the 5-minute pre-expiry refresh pattern |
| Directory missing | Create |
| Plugin not showing up | Python file changed without restart | |
| Endpoints accessible without login | FastAPI apps are not auto-authenticated | Add FastAPI security (e.g. OAuth2, API key) if endpoints must be private |
| Middleware affecting wrong routes | Middleware applies to all API traffic | Filter by |
JS | Absolute path in | Always use relative paths: |
| 问题 | 原因 | 解决方法 |
|---|---|---|
| 导航链接跳转到404 | | 使用 |
| 导航图标不显示 | | |
| 高负载下事件循环冻结 | 在 | 用 |
| 1小时后出现401错误 | JWT过期且未刷新 | 使用提前5分钟刷新的模式 |
启动时 | 目录不存在 | 在启动前创建 |
| 插件未显示 | 修改Python文件后未重启 | 执行 |
| 端点无需登录即可访问 | FastAPI应用未自动认证 | 如果端点需要私有访问,请添加FastAPI安全机制(如OAuth2、API密钥) |
| 中间件影响错误的路由 | 中间件应用于所有API流量 | 在 |
JS | | 始终使用相对路径: |