Loading...
Loading...
Explains core Apache Beam programming model concepts including PCollections, PTransforms, Pipelines, and Runners. Use when learning Beam fundamentals or explaining pipeline concepts.
npx skill4agent add apache/beam beam-concepts// Java
Pipeline p = Pipeline.create(options);
p.apply(...)
.apply(...)
.apply(...);
p.run().waitUntilFinish();# Python
with beam.Pipeline(options=options) as p:
(p | 'Read' >> beam.io.ReadFromText('input.txt')
| 'Transform' >> beam.Map(process)
| 'Write' >> beam.io.WriteToText('output'))// Java
PCollection<String> output = input.apply(MyTransform.create());# Python
output = input | 'Name' >> beam.ParDo(MyDoFn())// Java
input.apply(ParDo.of(new DoFn<String, Integer>() {
@ProcessElement
public void processElement(@Element String element, OutputReceiver<Integer> out) {
out.output(element.length());
}
}));# Python
class LengthFn(beam.DoFn):
def process(self, element):
yield len(element)
input | beam.ParDo(LengthFn())
# Or simpler:
input | beam.Map(len)PCollection<KV<String, Integer>> input = ...;
PCollection<KV<String, Iterable<Integer>>> grouped = input.apply(GroupByKey.create());// Global combine
input.apply(Combine.globally(Sum.ofIntegers()));
// Per-key combine
input.apply(Combine.perKey(Sum.ofIntegers()));PCollectionList<String> collections = PCollectionList.of(pc1).and(pc2).and(pc3);
PCollection<String> merged = collections.apply(Flatten.pCollections());input.apply(Window.into(FixedWindows.of(Duration.standardMinutes(5))));input | beam.WindowInto(beam.window.FixedWindows(300))input.apply(Window.<T>into(FixedWindows.of(Duration.standardMinutes(5)))
.triggering(AfterWatermark.pastEndOfWindow()
.withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
.plusDelayOf(Duration.standardMinutes(1))))
.withAllowedLateness(Duration.standardHours(1))
.accumulatingFiredPanes());PCollectionView<Map<String, String>> sideInput =
lookupTable.apply(View.asMap());
mainInput.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Map<String, String> lookup = c.sideInput(sideInput);
// Use lookup...
}
}).withSideInputs(sideInput));public interface MyOptions extends PipelineOptions {
@Description("Input file")
@Required
String getInput();
void setInput(String value);
}
MyOptions options = PipelineOptionsFactory.fromArgs(args).as(MyOptions.class);@DefaultSchema(AutoValueSchema.class)
@AutoValue
public abstract class User {
public abstract String getName();
public abstract int getAge();
}
PCollection<User> users = ...;
PCollection<Row> rows = users.apply(Convert.toRows());TupleTag<String> successTag = new TupleTag<>() {};
TupleTag<String> failureTag = new TupleTag<>() {};
PCollectionTuple results = input.apply(ParDo.of(new DoFn<String, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
c.output(failureTag, c.element());
}
}
}).withOutputTags(successTag, TupleTagList.of(failureTag)));
results.get(successTag).apply(WriteToSuccess());
results.get(failureTag).apply(WriteToDeadLetter());# Use Java Kafka connector from Python
from apache_beam.io.kafka import ReadFromKafka
result = pipeline | ReadFromKafka(
consumer_config={'bootstrap.servers': 'localhost:9092'},
topics=['my-topic']
)