mongodb-atlas-stream-processing

MongoDB Atlas Streams

Build, operate, and debug Atlas Stream Processing (ASP) pipelines using four MCP tools from the MongoDB MCP Server.

Prerequisites

This skill requires the MongoDB MCP Server connected with:
  • Atlas API credentials (apiClientId and apiClientSecret)
The 4 tools: atlas-streams-discover, atlas-streams-build, atlas-streams-manage, atlas-streams-teardown.
All operations require an Atlas project ID. If unknown, call atlas-list-projects first to find your project ID.

If MCP tools are unavailable

If the MongoDB MCP Server is not connected or the streams tools are missing, see references/mcp-troubleshooting.md for diagnostic steps and fallback options.

Tool Selection Matrix

atlas-streams-discover — ALL read operations

| Action | Use when |
| --- | --- |
| list-workspaces | See all workspaces in a project |
| inspect-workspace | Review workspace config, state, region |
| list-connections | See all connections in a workspace |
| inspect-connection | Check connection state, config, health |
| list-processors | See all processors in a workspace |
| inspect-processor | Check processor state, pipeline, config |
| diagnose-processor | Full health report: state, stats, errors |
| get-networking | PrivateLink and VPC peering details. Optional: cloudProvider + region to get Atlas account details for PrivateLink setup |
Pagination (all list actions): limit (1-100, default 20), pageNum (default 1).
Response format: responseFormat is "concise" (default for list actions) or "detailed" (default for inspect/diagnose).
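The pagination and response-format rules above can be sketched as a small argument builder. This is a minimal illustration, not the tool's actual schema: the helper name `discover_args` and the flat key layout (`projectId`, `action`, `workspaceName`) are assumptions; only `limit`, `pageNum`, `responseFormat` and their ranges come from the text.

```python
from typing import Any, Dict, Optional

# Actions the text describes as list actions (these accept pagination).
LIST_ACTIONS = {"list-workspaces", "list-connections", "list-processors"}


def discover_args(
    project_id: str,
    action: str,
    workspace_name: Optional[str] = None,
    limit: int = 20,
    page_num: int = 1,
    response_format: Optional[str] = None,
) -> Dict[str, Any]:
    """Assemble arguments for an atlas-streams-discover call (key layout is assumed)."""
    args: Dict[str, Any] = {"projectId": project_id, "action": action}
    if workspace_name is not None:
        args["workspaceName"] = workspace_name
    if action in LIST_ACTIONS:
        # limit is documented as 1-100 with a default of 20.
        if not 1 <= limit <= 100:
            raise ValueError("limit must be between 1 and 100")
        args["limit"] = limit
        args["pageNum"] = page_num
    if response_format is not None:
        if response_format not in ("concise", "detailed"):
            raise ValueError("responseFormat must be 'concise' or 'detailed'")
        args["responseFormat"] = response_format
    return args
```

Leaving `response_format` unset lets the tool apply its documented default for the chosen action.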

atlas-streams-build — ALL create operations

| Resource | Key parameters |
| --- | --- |
| workspace | cloudProvider, region, tier (default SP10), includeSampleData |
| connection | connectionName, connectionType (Kafka/Cluster/S3/Https/Kinesis/Lambda/SchemaRegistry/Sample), connectionConfig |
| processor | processorName, pipeline (must start with $source, end with $merge/$emit), dlq, autoStart |
| privatelink | privateLinkConfig (project-level, not tied to a specific workspace) |
Field mapping (fill only the fields for the selected resource type):
  • resource = "workspace": Fill projectId, workspaceName, cloudProvider, region, tier, includeSampleData. Leave empty: all connection and processor fields.
  • resource = "connection": Fill projectId, workspaceName, connectionName, connectionType, connectionConfig. Leave empty: all workspace and processor fields. (See references/connection-configs.md for type-specific schemas.)
  • resource = "processor": Fill projectId, workspaceName, processorName, pipeline, dlq (recommended), autoStart (optional). Leave empty: all workspace and connection fields. (See references/pipeline-patterns.md for pipeline examples.)
  • resource = "privatelink": Fill projectId, privateLinkConfig. PrivateLink is project-level, not workspace-level, so workspaceName is not required; omit it. Leave empty: all connection and processor fields.
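One way to enforce the field mapping above is to reject any field that belongs to a different resource type before calling the tool. The sketch below assumes the arguments are passed as a flat dictionary; `build_args` and `ALLOWED_FIELDS` are hypothetical names, while the field names themselves are taken from the mapping.

```python
from typing import Any, Dict

# Fields each resource type may carry, per the field mapping in the text.
ALLOWED_FIELDS = {
    "workspace": {"projectId", "workspaceName", "cloudProvider", "region",
                  "tier", "includeSampleData"},
    "connection": {"projectId", "workspaceName", "connectionName",
                   "connectionType", "connectionConfig"},
    "processor": {"projectId", "workspaceName", "processorName", "pipeline",
                  "dlq", "autoStart"},
    "privatelink": {"projectId", "privateLinkConfig"},
}


def build_args(resource: str, **fields: Any) -> Dict[str, Any]:
    """Return atlas-streams-build arguments, refusing fields of other resource types."""
    allowed = ALLOWED_FIELDS.get(resource)
    if allowed is None:
        raise ValueError(f"unknown resource type: {resource}")
    extra = set(fields) - allowed
    if extra:
        raise ValueError(f"fields not valid for {resource}: {sorted(extra)}")
    return {"resource": resource, **fields}
```

For example, `build_args("privatelink", projectId="p", workspaceName="ws")` raises, which mirrors the rule that PrivateLink is project-level.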

atlas-streams-manage — ALL update/state operations

| Action | Notes |
| --- | --- |
| start-processor | Begins billing. Optional tier override, resumeFromCheckpoint |
| stop-processor | Stops billing. Retains state 45 days |
| modify-processor | Processor must be stopped first. Change pipeline, DLQ, or name |
| update-workspace | Change tier or region |
| update-connection | Update config (networking is immutable; must delete and recreate) |
| accept-peering / reject-peering | VPC peering management |
Field mapping: always fill projectId, workspaceName, then by action:
  • "start-processor" → resourceName. Optional: tier, resumeFromCheckpoint, startAtOperationTime (ISO 8601 timestamp to resume from a specific point)
  • "stop-processor" → resourceName
  • "modify-processor" → resourceName. At least one of: pipeline, dlq, newName
  • "update-workspace" → newRegion or newTier
  • "update-connection" → resourceName, connectionConfig. Exception: networking config (e.g., PrivateLink) cannot be modified after creation; delete and recreate.
  • "accept-peering" → peeringId, requesterAccountId, requesterVpcId
  • "reject-peering" → peeringId
State pre-checks:
  • start-processor → errors if processor is already STARTED
  • stop-processor → no-ops if already STOPPED or CREATED (not an error)
  • modify-processor → errors if processor is STARTED (must stop first)
Processor states: CREATED → STARTED (via start) → STOPPED (via stop). Can also enter FAILED on runtime errors. Modify requires STOPPED or CREATED state.
Teardown safety checks:
  • Processor deletion → auto-stops before deleting (no need to stop manually first)
  • Connection deletion → blocks if any running processor references it. Stop/delete referencing processors first.
  • Workspace deletion → inspect the workspace and get user confirmation first; see the workflow under atlas-streams-teardown below.
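The state pre-checks can be read as a small transition table. The sketch below models only the transitions stated in the text; it deliberately leaves FAILED out, since the text does not say which actions are accepted from that state. `TRANSITIONS` and `next_state` are hypothetical names.

```python
# (current state, action) -> resulting state, for the transitions the text describes.
TRANSITIONS = {
    ("CREATED", "start-processor"): "STARTED",
    ("STOPPED", "start-processor"): "STARTED",
    ("STARTED", "stop-processor"): "STOPPED",
    ("STOPPED", "stop-processor"): "STOPPED",    # documented no-op
    ("CREATED", "stop-processor"): "CREATED",    # documented no-op
    ("STOPPED", "modify-processor"): "STOPPED",  # modify does not change state
    ("CREATED", "modify-processor"): "CREATED",
}


def next_state(state: str, action: str) -> str:
    """Return the state after an action, or raise if the pre-check would fail."""
    try:
        return TRANSITIONS[(state, action)]
    except KeyError:
        raise ValueError(f"{action} is not allowed while processor is {state}") from None
```

Starting an already STARTED processor and modifying a STARTED processor both raise, matching the two error cases listed above.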

atlas-streams-teardown — ALL delete operations

| Resource | Safety behavior |
| --- | --- |
| processor | Auto-stops before deleting |
| connection | Blocks if referenced by running processor |
| workspace | Cascading delete of all connections and processors |
| privatelink / peering | Remove networking resources |
Field mapping: always fill projectId, resource, then:
  • resource: "workspace" → workspaceName
  • resource: "connection" or "processor" → workspaceName, resourceName
  • resource: "privatelink" or "peering" → resourceName (the ID). These are project-level resources, not tied to a specific workspace.
Before deleting a workspace, inspect it first:
  1. atlas-streams-discover → inspect-workspace: get connection/processor counts
  2. Present to user: "Workspace X contains N connections and M processors. Deleting permanently removes all. Proceed?"
  3. Wait for confirmation before calling atlas-streams-teardown
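The inspect-then-confirm workflow above can be sketched as two helpers: one that renders the confirmation prompt from the inspected counts, and one that only yields teardown arguments once the user has agreed. Both function names are hypothetical, and the counts are assumed to have been read from the inspect-workspace response.

```python
from typing import Any, Dict, Optional


def workspace_delete_prompt(name: str, connections: int, processors: int) -> str:
    """Build the confirmation message shown to the user before a workspace delete."""
    return (
        f"Workspace {name} contains {connections} connections and "
        f"{processors} processors. Deleting permanently removes all. Proceed?"
    )


def teardown_args_if_confirmed(
    project_id: str, name: str, confirmed: bool
) -> Optional[Dict[str, Any]]:
    """Return atlas-streams-teardown arguments only after explicit confirmation."""
    if not confirmed:
        return None  # never call teardown without a yes from the user
    return {"projectId": project_id, "resource": "workspace", "workspaceName": name}
```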

CRITICAL: Validate Before Creating Processors

You MUST call search-knowledge before composing any processor pipeline. This is not optional.
  • Field validation: Query with the sink/source type, e.g. "Atlas Stream Processing $emit S3 fields" or "Atlas Stream Processing Kafka $source configuration". This catches errors like prefix vs path for S3 $emit.
  • Pattern examples: Query with dataSources: [{"name": "devcenter"}] for working pipelines, e.g. "Atlas Stream Processing tumbling window example".
Also fetch examples from the official ASP examples repo when building non-trivial processors: https://github.com/mongodb/ASP_example (quickstarts, example processors, Terraform examples). Start with example_processors/README.md for the full pattern catalog.
Key quickstarts:
| Quickstart | Pattern |
| --- | --- |
| 00_hello_world.json | Inline $source.documents with $match (zero infra, ephemeral) |
| 01_changestream_basic.json | Change stream → tumbling window → $merge to Atlas |
| 03_kafka_to_mongo.json | Kafka source → tumbling window rollup → $merge to Atlas |
| 04_mongo_to_mongo.json | Chained processors: rollup → archive to separate collection |
| 05_kafka_tail.json | Real-time Kafka topic monitoring (sinkless, like tail -f) |

Pipeline Rules & Warnings

Invalid constructs (these are NOT valid in streaming pipelines):
  • $$NOW, $$ROOT, $$CURRENT: NOT available in stream processing. NEVER use these. Use the document's own timestamp field or _stream_meta metadata for event time instead of $$NOW.
  • HTTPS connections as $source: HTTPS is for $https enrichment or sink only, NOT as a data source
  • Kafka $source without topic: the topic field is required
  • Pipelines without a sink: a terminal stage ($merge, $emit, $https, or $externalFunction async) is required for deployed processors (sinkless only works via sp.process())
  • Lambda as $emit target: Lambda uses $externalFunction (mid-pipeline enrichment), not $emit
  • $validate with validationAction: "error" crashes the processor; use "dlq" instead
Required fields by stage:
  • $source (change stream): include fullDocument: "updateLookup" to get the full document content
  • $source (Kinesis): use stream (NOT streamName or topic)
  • $emit (Kinesis): MUST include partitionKey
  • $emit (S3): use path (NOT prefix)
  • $https: must include connectionName, path, method, as, onError: "dlq"
  • $externalFunction: must include connectionName, functionName, execution, as, onError: "dlq"
  • $validate: must include validator with $jsonSchema and validationAction: "dlq"
  • $lookup: include parallelism setting (e.g., parallelism: 2) for concurrent I/O
  • AWS connections (S3, Kinesis, Lambda): IAM role ARN must be registered via Atlas Cloud Provider Access first. Always confirm this with user. See references/connection-configs.md for details.
See references/pipeline-patterns.md for stage field examples with JSON syntax.
SchemaRegistry connection: connectionType must be "SchemaRegistry" (not "Kafka"). Schema type values are case-sensitive (use lowercase avro, not AVRO). See references/connection-configs.md for required fields and auth types.
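Several of the rules above are mechanical enough to check before a pipeline is submitted. The following is a rough pre-flight linter, assuming the pipeline is held as a list of single-key stage dictionaries; `lint_pipeline` is a hypothetical helper and covers only a subset of the rules (it cannot see connection types, so Kafka, Kinesis, and S3 field checks are left out).

```python
import json
from typing import Any, Dict, List

SINK_STAGES = {"$merge", "$emit", "$https", "$externalFunction"}
FORBIDDEN_VARS = ("$$NOW", "$$ROOT", "$$CURRENT")


def stage_name(stage: Dict[str, Any]) -> str:
    """Each stage is assumed to be a dict with exactly one key, the stage operator."""
    return next(iter(stage))


def lint_pipeline(pipeline: List[Dict[str, Any]]) -> List[str]:
    """Return a list of rule violations; an empty list means no known problem."""
    problems: List[str] = []
    if not pipeline or stage_name(pipeline[0]) != "$source":
        problems.append("pipeline must start with $source")
    if not pipeline or stage_name(pipeline[-1]) not in SINK_STAGES:
        problems.append("deployed processors need a terminal sink stage")
    # Serialize once so system variables are found at any nesting depth.
    text = json.dumps(pipeline)
    for var in FORBIDDEN_VARS:
        if var in text:
            problems.append(f"{var} is not available in stream processing")
    for index, stage in enumerate(pipeline):
        name = stage_name(stage)
        body = stage[name]
        if not isinstance(body, dict):
            continue
        if name == "$validate" and body.get("validationAction") == "error":
            problems.append('$validate should use validationAction "dlq"')
        if name in ("$https", "$externalFunction") and body.get("onError") != "dlq":
            problems.append(f'{name} should set onError to "dlq"')
        is_last = index == len(pipeline) - 1
        if name == "$externalFunction" and is_last and body.get("execution") != "async":
            problems.append('$externalFunction as a sink requires execution "async"')
    return problems
```

A linter like this complements, and does not replace, the search-knowledge validation step.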

MCP Tool Behaviors

Elicitation: When creating connections, the build tool auto-collects missing sensitive fields (passwords, bootstrap servers) via MCP elicitation. Do NOT ask the user for these; let the tool collect them.
Auto-normalization:
  • bootstrapServers array → auto-converted to comma-separated string
  • schemaRegistryUrls string → auto-wrapped in array
  • dbRoleToExecute → defaults to {role: "readWriteAnyDatabase", type: "BUILT_IN"} for Cluster connections
Workspace creation: includeSampleData defaults to true, which auto-creates the sample_stream_solar connection.
Region naming: The region field uses Atlas-specific names that differ by cloud provider. Using the wrong format returns a cryptic dataProcessRegion error.
| Provider | Cloud Region | Streams region Value |
| --- | --- | --- |
| AWS | us-east-1 | VIRGINIA_USA |
| AWS | us-east-2 | OHIO_USA |
| AWS | eu-west-1 | DUBLIN_IRL |
| GCP | us-central1 | US_CENTRAL1 |
| GCP | europe-west1 | EUROPE_WEST1 |
| Azure | eastus | eastus |
| Azure | westeurope | westeurope |
See references/connection-configs.md for the full region mapping table. If unsure, inspect an existing workspace with atlas-streams-discover → inspect-workspace and check dataProcessRegion.region.
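The region table lends itself to a lookup that fails loudly instead of passing a cloud-native name through to the API. This sketch encodes only the seven rows shown above; `streams_region` is a hypothetical helper, and any region outside the table should be resolved from references/connection-configs.md rather than guessed.

```python
# (provider, cloud region) -> Streams region value, copied from the table in the text.
STREAMS_REGIONS = {
    ("AWS", "us-east-1"): "VIRGINIA_USA",
    ("AWS", "us-east-2"): "OHIO_USA",
    ("AWS", "eu-west-1"): "DUBLIN_IRL",
    ("GCP", "us-central1"): "US_CENTRAL1",
    ("GCP", "europe-west1"): "EUROPE_WEST1",
    ("AZURE", "eastus"): "eastus",
    ("AZURE", "westeurope"): "westeurope",
}


def streams_region(provider: str, cloud_region: str) -> str:
    """Translate a cloud-native region name into the value the region field expects."""
    key = (provider.upper(), cloud_region.lower())
    if key not in STREAMS_REGIONS:
        raise KeyError(f"no known Streams region for {provider} {cloud_region}")
    return STREAMS_REGIONS[key]
```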

Connection Capabilities — Source/Sink Reference

Know what each connection type can do before creating pipelines:
| Connection Type | As Source ($source) | As Sink ($merge / $emit) | Mid-Pipeline | Notes |
| --- | --- | --- | --- | --- |
| Cluster | ✅ Change streams | ✅ $merge to collections | ✅ $lookup | Change streams monitor insert/update/delete/replace operations |
| Kafka | ✅ Topic consumer | ✅ $emit to topics | | Source MUST include topic field |
| Sample Stream | ✅ Sample data | ❌ Not valid | | Testing/demo only |
| S3 | ❌ Not valid | ✅ $emit to buckets | | Sink only; use path, format, compression. Supports AWS PrivateLink. |
| Https | ❌ Not valid | ✅ $https as sink | ✅ $https enrichment | Can be used mid-pipeline for enrichment OR as final sink stage |
| AWSLambda | ❌ Not valid | ✅ $externalFunction (async only) | ✅ $externalFunction (sync or async) | Sink: execution: "async" required. Mid-pipeline: execution: "sync" or "async" |
| AWS Kinesis | ✅ Stream consumer | ✅ $emit to streams | | Similar to Kafka pattern |
| SchemaRegistry | ❌ Not valid | ❌ Not valid | ✅ Schema resolution | Metadata only; used by Kafka connections for Avro schemas |
Common connection usage mistakes to avoid:
  • ❌ Using $externalFunction as sink with execution: "sync" → Must use execution: "async" for sink stage
  • ❌ Forgetting change streams exist → Atlas Cluster is a powerful source, not just a sink
  • ❌ Using $merge with Kafka → Use $emit for Kafka sinks
See references/connection-configs.md for detailed connection configuration schemas by type.
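The source and sink columns of the capability table can be encoded as data so a pipeline plan is checked before any resource is created. The sketch below keys the table by the connectionType values listed for atlas-streams-build (for example "Lambda" rather than "AWSLambda"), which is an assumption about how the two naming schemes line up; the helper names are hypothetical.

```python
from typing import Dict, Optional, Tuple

# connection type -> (usable as $source, sink stage or None), per the table in the text.
CAPABILITIES: Dict[str, Tuple[bool, Optional[str]]] = {
    "Cluster": (True, "$merge"),
    "Kafka": (True, "$emit"),
    "Sample": (True, None),
    "S3": (False, "$emit"),
    "Https": (False, "$https"),
    "Lambda": (False, "$externalFunction"),  # sink use requires execution: "async"
    "Kinesis": (True, "$emit"),
    "SchemaRegistry": (False, None),
}


def can_be_source(connection_type: str) -> bool:
    """True if the connection type may appear in a $source stage."""
    return CAPABILITIES[connection_type][0]


def sink_stage_for(connection_type: str) -> Optional[str]:
    """Return the sink stage to use for this connection type, or None if it cannot be a sink."""
    return CAPABILITIES[connection_type][1]
```

For instance, `sink_stage_for("Kafka")` returns `"$emit"`, which guards against the `$merge`-with-Kafka mistake listed above.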

Core Workflows

Setup from scratch

  1. atlas-streams-discover → list-workspaces (check existing)
  2. atlas-streams-build → resource: "workspace" (region near data, SP10 for dev)
  3. atlas-streams-build → resource: "connection" (for each source/sink/enrichment)
  4. Validate connections: atlas-streams-discover → list-connections + inspect-connection for each. Verify names match targets and present a summary to the user.
  5. Call search-knowledge to validate field names. Fetch relevant examples from https://github.com/mongodb/ASP_example
  6. atlas-streams-build → resource: "processor" (with DLQ configured)
  7. atlas-streams-manage → start-processor (warn about billing)

Workflow Patterns

Incremental pipeline development (recommended): See references/development-workflow.md for the full 5-phase lifecycle.
  1. Start with a basic $source → $merge pipeline (validate connectivity)
  2. Add $match stages (validate filtering)
  3. Add $addFields / $project transforms (validate reshaping)
  4. Add windowing or enrichment (validate aggregation logic)
  5. Add error handling / DLQ configuration
Modify a processor pipeline:
  1. atlas-streams-manage → action: "stop-processor" (processor MUST be stopped first)
  2. atlas-streams-manage → action: "modify-processor" (provide new pipeline)
  3. atlas-streams-manage → action: "start-processor" (restart)
Debug a failing processor:
  1. atlas-streams-discover → diagnose-processor: one-shot health report. Always call this first.
  2. Commit to a specific root cause. Match symptoms to diagnostic patterns:
    • Error 419 + "no partitions found" → Kafka topic doesn't exist or is misspelled
    • State: FAILED + multiple restarts → connection-level error (bypasses DLQ), check connection config
    • State: STARTED + zero output + windowed pipeline → likely idle Kafka partitions blocking window closure; add partitionIdleTimeout to Kafka $source (e.g., {"size": 30, "unit": "second"})
    • State: STARTED + zero output + non-windowed → check if source has data; inspect Kafka offset lag
    • High memoryUsageBytes approaching tier limit → OOM risk; recommend higher tier
    • DLQ count increasing → per-document errors; use MongoDB find on DLQ collection
    See references/output-diagnostics.md for the full pattern table.
  3. Classify processor type before interpreting output volume (alert vs transformation vs filter).
  4. Provide concrete, ordered fix steps specific to the diagnosed root cause. Do NOT present a list of hypothetical scenarios.
  5. If detailed logs are needed, direct the user to the Atlas UI: Atlas → Stream Processing → Workspace → Processor → Logs tab.
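The symptom-to-cause patterns in step 2 can be sketched as an ordered matcher that returns exactly one root cause. The shape of the diagnose-processor report is not specified here, so every key below except memoryUsageBytes (`state`, `errorCode`, `errorMessage`, `restartCount`, `outputCount`, `windowed`, `tierMemoryLimitBytes`, `dlqCountDelta`) is a hypothetical stand-in, and the 80% memory threshold is an arbitrary assumption for "approaching tier limit".

```python
from typing import Any, Dict


def match_symptom(report: Dict[str, Any]) -> str:
    """Map a (hypothetically shaped) health report to a single likely root cause."""
    message = report.get("errorMessage", "")
    if report.get("errorCode") == 419 and "no partitions found" in message:
        return "Kafka topic does not exist or is misspelled"
    if report.get("state") == "FAILED" and report.get("restartCount", 0) > 1:
        return "connection-level error; check connection config"
    if report.get("state") == "STARTED" and report.get("outputCount", 0) == 0:
        if report.get("windowed"):
            return "idle Kafka partitions may block window closure; add partitionIdleTimeout"
        return "check whether the source has data; inspect Kafka offset lag"
    limit = report.get("tierMemoryLimitBytes")
    # Assumed threshold: treat 80% of the tier limit as "approaching".
    if limit and report.get("memoryUsageBytes", 0) >= 0.8 * limit:
        return "OOM risk; recommend a higher tier"
    if report.get("dlqCountDelta", 0) > 0:
        return "per-document errors; query the DLQ collection with find"
    return "no known pattern matched; see references/output-diagnostics.md"
```

Returning the first match, rather than every possible match, follows the instruction to commit to one root cause.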

Chained processors (multi-sink pattern)

CRITICAL: A single pipeline can only have ONE terminal sink ($merge or $emit). When users request multiple output destinations (e.g., "write to Atlas AND emit to Kafka"), you MUST acknowledge the single-sink constraint and propose chained processors using an intermediate destination. See references/pipeline-patterns.md for the full pattern with examples.
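A rough sketch of the chained pattern for "write to Atlas AND emit to Kafka": the first processor lands data in an intermediate collection, and the second reads that collection and emits to Kafka. All connection, database, collection, and topic names are placeholders, and the stage field shapes are assumptions to be confirmed with search-knowledge and references/pipeline-patterns.md before use.

```python
from typing import Any, Dict, List

Pipeline = List[Dict[str, Any]]

# Processor A: source collection -> intermediate Atlas collection (single $merge sink).
processor_a: Pipeline = [
    {"$source": {"connectionName": "orders_cluster", "db": "shop", "coll": "orders",
                 "config": {"fullDocument": "updateLookup"}}},
    {"$merge": {"into": {"connectionName": "orders_cluster", "db": "shop",
                         "coll": "orders_staged"}}},
]

# Processor B: intermediate collection -> Kafka topic (single $emit sink).
processor_b: Pipeline = [
    {"$source": {"connectionName": "orders_cluster", "db": "shop", "coll": "orders_staged",
                 "config": {"fullDocument": "updateLookup"}}},
    {"$emit": {"connectionName": "kafka_out", "topic": "orders-events"}},
]


def sink_count(pipeline: Pipeline) -> int:
    """Count $merge/$emit stages; a valid pipeline has exactly one, in last position."""
    return sum(1 for stage in pipeline if next(iter(stage)) in ("$merge", "$emit"))
```

Each pipeline keeps exactly one sink, so the pair satisfies the constraint while still reaching both destinations.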

Pre-Deploy & Post-Deploy Checklists

See references/development-workflow.md for the complete pre-deploy quality checklist (connection validation, pipeline validation) and post-deploy verification workflow.

Tier Sizing & Performance

See references/sizing-and-parallelism.md for tier specifications, parallelism formulas, complexity scoring, and performance optimization strategies.

Troubleshooting

See references/development-workflow.md for the complete troubleshooting table covering processor failures, API errors, configuration issues, and performance problems.

Billing & Cost

Atlas Stream Processing has no free tier. All deployed processors incur continuous charges while running.
  • Charges are per-hour, calculated per-second, only while the processor is running
  • stop-processor stops billing; stopped processors retain state for 45 days at no charge
  • For prototyping without billing: Use sp.process() in mongosh, which runs pipelines ephemerally without deploying a processor
  • See references/sizing-and-parallelism.md for tier pricing and cost optimization strategies
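Because charges are quoted per hour but calculated per second, the cost of a run scales linearly with running time. The helper below is only an arithmetic illustration: `hourly_rate` is a placeholder supplied by the caller, not a real tier price (see references/sizing-and-parallelism.md for actual pricing).

```python
def running_cost(hourly_rate: float, seconds_running: float) -> float:
    """Cost of a processor that ran for the given seconds at a per-hour rate."""
    if hourly_rate < 0 or seconds_running < 0:
        raise ValueError("rate and duration must be non-negative")
    # Per-second metering: pay only for the fraction of the hour actually used.
    return hourly_rate * seconds_running / 3600
```

With a placeholder rate of 0.50 per hour, a processor that runs for 90 minutes (5400 seconds) costs 0.75, and a stopped processor accrues nothing.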

Safety Rules

  • atlas-streams-teardown and atlas-streams-manage require user confirmation; do not bypass
  • BEFORE calling atlas-streams-teardown for a workspace, you MUST first inspect the workspace with atlas-streams-discover to count connections and processors, then present this information to the user before requesting confirmation
  • BEFORE creating any processor, you MUST validate all connections per the "Pre-Deployment Validation" section in references/development-workflow.md
  • Deleting a workspace removes ALL connections and processors permanently
  • After stopping a processor, state is preserved for 45 days; after that, checkpoints are discarded
  • resumeFromCheckpoint: false drops all window state; warn the user first
  • Moving processors between workspaces is not supported (must recreate)
  • Dry-run / simulation is not supported; explain what you would do and ask for confirmation
  • Always warn users about billing before starting processors
  • Store API authentication credentials in connection settings, never hardcode in processor pipelines

Reference Files

| File | Read when... |
| --- | --- |
| references/pipeline-patterns.md | Building or modifying processor pipelines |
| references/connection-configs.md | Creating connections (type-specific schemas) |
| references/development-workflow.md | Following lifecycle management or debugging decision trees |
| references/output-diagnostics.md | Processor output is unexpected (zero, low, or wrong) |
| references/sizing-and-parallelism.md | Choosing tiers, tuning parallelism, or optimizing cost |