# Apache Beam Core Concepts
## The Beam Model

Evolved from Google's MapReduce, FlumeJava, and MillWheel projects; originally called the "Dataflow Model."
## Key Abstractions

### Pipeline
A Pipeline encapsulates the entire data processing task, including reading, transforming, and writing data.
```java
// Java
Pipeline p = Pipeline.create(options);
p.apply(...)
    .apply(...)
    .apply(...);
p.run().waitUntilFinish();
```
```python
# Python
with beam.Pipeline(options=options) as p:
    (p | 'Read' >> beam.io.ReadFromText('input.txt')
       | 'Transform' >> beam.Map(process)
       | 'Write' >> beam.io.WriteToText('output'))
```
### PCollection

A distributed dataset that can be bounded (batch) or unbounded (streaming).
#### Properties

- Immutable - Once created, cannot be modified
- Distributed - Elements processed in parallel
- May be bounded or unbounded
- Timestamped - Each element has an event timestamp
- Windowed - Elements assigned to windows
### PTransform

A data processing operation that transforms PCollections.

```java
// Java
PCollection<String> output = input.apply(MyTransform.create());
```
```python
# Python
output = input | 'Name' >> beam.ParDo(MyDoFn())
```
## Core Transforms

### ParDo
General-purpose parallel processing.
```java
// Java
input.apply(ParDo.of(new DoFn<String, Integer>() {
    @ProcessElement
    public void processElement(@Element String element, OutputReceiver<Integer> out) {
        out.output(element.length());
    }
}));
```
```python
# Python
class LengthFn(beam.DoFn):
    def process(self, element):
        yield len(element)

input | beam.ParDo(LengthFn())
```
Or simpler:
```python
input | beam.Map(len)
```

### GroupByKey
Groups elements by key.
```java
PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());
```

### CoGroupByKey
Joins multiple PCollections by key.
### Combine
Combines elements (sum, mean, etc.).
```java
// Global combine
input.apply(Combine.globally(Sum.ofIntegers()));

// Per-key combine
input.apply(Combine.perKey(Sum.ofIntegers()));
```
### Flatten
Merges multiple PCollections.
```java
PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());
```

### Partition
Splits a PCollection into a fixed number of PCollections, using a function that maps each element to a partition index.
## Windowing

### Types
- Fixed Windows - Regular, non-overlapping intervals
- Sliding Windows - Overlapping intervals
- Session Windows - Gaps of inactivity define boundaries
- Global Window - All elements in one window (default)

```java
input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));
```

```python
input | beam.WindowInto(beam.window.FixedWindows(300))  # 300 s = 5 min
```

### Triggers
Control when results are emitted.
```java
input.apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
    .triggering(AfterWatermark.pastEndOfWindow()
        .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
            .plusDelayOf(Duration.standardMinutes(1))))
    .withAllowedLateness(Duration.standardHours(1))
    .accumulatingFiredPanes());
```
## Side Inputs
Additional inputs to ParDo.
```java
PCollectionView<Map<String, String>> sideInput =
    lookupTable.apply(View.asMap());

mainInput.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        Map<String, String> lookup = c.sideInput(sideInput);
        // Use lookup...
    }
}).withSideInputs(sideInput));
```
## Pipeline Options
Configure pipeline execution.
```java
public interface MyOptions extends PipelineOptions {
    @Description("Input file")
    @Validation.Required
    String getInput();
    void setInput(String value);
}

MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);
```

## Schema
Strongly-typed access to structured data.
```java
@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class User {
    public abstract String getName();
    public abstract int getAge();
}

PCollection<User> users = ...;
PCollection<Row> rows = users.apply(Convert.toRows());
```
## Error Handling

### Dead Letter Queue Pattern

Failed elements are routed to a separate output for later inspection instead of failing the whole pipeline.
```java
TupleTag<String> successTag = new TupleTag<String>() {};
TupleTag<String> failureTag = new TupleTag<String>() {};

PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        try {
            c.output(process(c.element()));
        } catch (Exception e) {
            c.output(failureTag, c.element());
        }
    }
}).withOutputTags(successTag, TupleTagList.of(failureTag)));

results.get(successTag).apply(WriteToSuccess());
results.get(failureTag).apply(WriteToDeadLetter());
```

## Cross-Language Pipelines
Use transforms from other SDKs.
```python
# Use the Java Kafka connector from Python
from apache_beam.io.kafka import ReadFromKafka

result = pipeline | ReadFromKafka(
    consumer_config={'bootstrap.servers': 'localhost:9092'},
    topics=['my-topic']
)
```

## Best Practices
- Prefer built-in transforms over custom DoFns
- Use schemas for type-safe operations
- Minimize side inputs for performance
- Handle late data explicitly
- Test with DirectRunner before deploying
- Use TestPipeline for unit tests