- Updated: January 7, 2026
Unified Apache Beam Pipeline Brings Seamless Batch and Stream Processing with Event‑Time Windowing
A unified Apache Beam pipeline that runs on the DirectRunner can process both batch and streaming data using event‑time windowing, delivering identical aggregation semantics whether the source is bounded or unbounded.
Why This Apache Beam Tutorial Matters
Data engineers and developers constantly wrestle with two separate codebases: one for batch jobs and another for real‑time streams. Apache Beam’s unified programming model removes that friction by letting you write a single pipeline that works in both worlds. This walkthrough expands on the original tutorial with practical insights, code snippets, and real‑world use cases.
Apache Beam’s Unified Model Explained
Apache Beam defines a portable API that abstracts away the underlying execution engine (runner). Whether you run on Google Cloud Dataflow, Apache Flink, Apache Spark, or the local DirectRunner, the same pipeline code applies. This “write once, run anywhere” philosophy is central to Beam’s design: it reduces maintenance overhead and makes experimentation cheap.
- Unified PCollection abstraction for bounded and unbounded data.
- Transformations (ParDo, GroupByKey, Combine) work identically across modes.
- Windowing and triggers give fine‑grained control over when results are emitted.
By leveraging this model, teams can focus on business logic rather than plumbing. For SaaS products built on the Enterprise AI platform by UBOS, this means faster feature rollouts and consistent analytics pipelines.
Batch Processing vs. Stream Processing
Batch processing works on a fixed dataset that already exists. It is ideal for nightly reports, ETL jobs, and large‑scale data migrations. In contrast, stream processing ingests data continuously, handling events as they arrive, which is essential for fraud detection, real‑time dashboards, and IoT telemetry.
The streaming flag on Beam’s StandardOptions (a view of PipelineOptions) switches execution between batch and streaming mode. The tutorial pairs that flag with a change of input source, from a static beam.Create (batch) to a TestStream (stream), while reusing the same windowing logic.
For organizations using UBOS solutions for SMBs, the ability to prototype a streaming feature on a local DirectRunner and later migrate to a production runner saves both time and cost.
Event‑Time Windowing and Triggers in Depth
Event‑time windowing groups records by the timestamp embedded in the data, not by when the system processes them. This matters whenever events can arrive out of order or be delayed. Beam supports several window types (fixed, sliding, session, and the default global window); the tutorial uses 60‑second FixedWindows.
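To make the window arithmetic concrete, here is a plain-Python sketch (not Beam code) of how a fixed window is chosen from an event timestamp. It assumes the default offset of 0, so windows are aligned to multiples of the window size counted from the Unix epoch; `assign_fixed_window` is a hypothetical helper used only for illustration.

```python
def assign_fixed_window(event_time: int, size: int = 60, offset: int = 0) -> tuple[int, int]:
    """Return the [start, end) bounds of the fixed window containing event_time.

    Plain-Python model of fixed windowing (an illustration, not Beam's API):
    windows are aligned to multiples of `size`, shifted by `offset`,
    counted from the Unix epoch.
    """
    start = event_time - (event_time - offset) % size
    return start, start + size


# Events 5 s and 59 s past a minute boundary share a window; 60 s starts the next one.
print(assign_fixed_window(120 + 5))   # (120, 180)
print(assign_fixed_window(120 + 59))  # (120, 180)
print(assign_fixed_window(120 + 60))  # (180, 240)
```

Because windows are epoch-aligned, timestamps written as offsets from some start time t0 only land in predictable windows when t0 itself sits on a window boundary.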
Triggers decide when a window’s results are emitted. The example combines:
- AfterWatermark – fires when the watermark passes the end of the window.
- AfterProcessingTime – early and late firings after a short delay (10 seconds in the demo).
- AccumulationMode.ACCUMULATING – retains state so late data can update prior results.
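The difference between accumulating and discarding panes is easiest to see with numbers. The plain-Python sketch below is a simplified model, not Beam's trigger machinery: for a single user in a single window, it takes the amounts that arrive before each firing and shows what each pane would report. `emit_panes` is a hypothetical helper.

```python
from itertools import accumulate


def emit_panes(batches: list[list[float]], accumulating: bool = True) -> list[float]:
    """Return the sum reported by each pane of one window for one key.

    batches[i] holds the amounts that arrived between firing i-1 and firing i.
    ACCUMULATING panes report everything seen so far; DISCARDING panes report
    only the data that arrived since the previous firing.
    """
    per_pane = [sum(batch) for batch in batches]
    return list(accumulate(per_pane)) if accumulating else per_pane


# Early pane sees 20, the on-time pane adds 15, a late pane adds 3.
arrivals = [[20.0], [15.0], [3.0]]
print(emit_panes(arrivals, accumulating=True))   # [20.0, 35.0, 38.0]
print(emit_panes(arrivals, accumulating=False))  # [20.0, 15.0, 3.0]
```

With ACCUMULATING mode the last pane for a window always carries the complete total, so downstream consumers should overwrite earlier results for that window rather than add panes together.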
By setting allowed_lateness to 120 seconds, the pipeline tolerates events that arrive up to two minutes late, a common scenario in distributed systems.
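The effect of allowed lateness can be modeled with a little arithmetic. In this plain-Python sketch (a simplification of Beam's behavior using whole seconds and epoch-aligned windows), an element is on time while the watermark has not yet reached the end of its window, late but still accepted until the watermark reaches the window end plus the allowed lateness, and dropped after that. `classify_arrival` is a hypothetical name.

```python
def classify_arrival(event_time: int, watermark: int,
                     window_size: int = 60, allowed_lateness: int = 120) -> str:
    """Classify an element by comparing the watermark with its window's end."""
    window_end = event_time - event_time % window_size + window_size
    if watermark < window_end:
        return "on_time"
    if watermark < window_end + allowed_lateness:
        return "late"      # still added to the window; can trigger a late pane
    return "dropped"       # the window has expired, so the element is discarded


print(classify_arrival(50, watermark=30))   # on_time
print(classify_arrival(50, watermark=90))   # late (window [0, 60) ended at 60)
print(classify_arrival(50, watermark=200))  # dropped (60 + 120 = 180 <= 200)
```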
The same windowing strategy can be reused in a production environment powered by the Workflow automation studio, enabling real‑time KPI updates without rewriting logic.
Step‑by‑Step Code Walkthrough
1. Setting Up Pipeline Options
```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

MODE = 'batch'  # set to 'stream' to replay the events through a TestStream

options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DirectRunner'
# Toggle streaming mode
options.view_as(StandardOptions).streaming = (MODE == 'stream')
```
The StandardOptions view tells Beam to use the DirectRunner, which runs locally and is well suited to development and testing. Changing MODE from 'batch' to 'stream' is the only edit needed to flip the pipeline’s behavior.
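If you would rather not edit MODE by hand, one option is to read it from the command line. The sketch below uses only argparse; the --mode flag and the `parse_mode` helper are hypothetical, and unrecognized arguments are returned so they could still be forwarded to PipelineOptions. (Beam's own --streaming flag sets the same StandardOptions.streaming value shown above.)

```python
import argparse


def parse_mode(argv: list[str]) -> tuple[str, list[str]]:
    """Split argv into a hypothetical --mode flag and the remaining arguments.

    Leftover arguments (for example --runner=DirectRunner) are returned
    unchanged so they can be passed on to PipelineOptions.
    """
    parser = argparse.ArgumentParser(add_help=False)
    parser.add_argument("--mode", choices=["batch", "stream"], default="batch")
    known, rest = parser.parse_known_args(argv)
    return known.mode, rest


mode, beam_args = parse_mode(["--mode", "stream", "--runner=DirectRunner"])
print(mode, beam_args)  # stream ['--runner=DirectRunner']
```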
2. Generating Event‑Time‑Aware Synthetic Data
```python
import datetime

def make_event(user_id, event_type, amount, ts):
    """Build one synthetic event; event_time is in epoch seconds (UTC)."""
    return {
        "user_id": user_id,
        "event_type": event_type,
        "amount": float(amount),
        "event_time": int(ts),
    }

# Align t0 to a minute boundary so the offsets below fall into predictable
# 60-second windows (FixedWindows are aligned to the Unix epoch).
base = datetime.datetime.now(datetime.timezone.utc).replace(second=0, microsecond=0)
t0 = int(base.timestamp())

BATCH_EVENTS = [
    make_event("u1", "purchase", 20, t0 + 5),
    make_event("u1", "purchase", 15, t0 + 20),
    make_event("u2", "purchase", 8, t0 + 35),
    # Second window, then one out-of-order event that belongs to the first window
    make_event("u1", "refund", -5, t0 + 62),
    make_event("u2", "purchase", 12, t0 + 70),
    make_event("u3", "purchase", 9, t0 + 75),
    make_event("u2", "purchase", 3, t0 + 50),
]
```
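Each event carries an event_time field (epoch seconds), which makes windowing deterministic. The dataset deliberately includes an out‑of‑order event: the last entry (t0 + 50) belongs to the first window but is listed after events from the second. In streaming mode, such an event becomes late data if the watermark has already passed the end of its window.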
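Before running Beam at all, it can help to work out by hand what the final panes should contain. This plain-Python cross-check is not part of the pipeline: it rebuilds the same seven events as offsets from a minute-aligned start time (the concrete T0 value is an assumption chosen only because it is divisible by 60) and totals count and amount per user and window.

```python
from collections import defaultdict

WINDOW = 60
T0 = 1_700_000_040  # assumed minute-aligned start time (divisible by 60)

# (user_id, amount, seconds after T0), in the same order as BATCH_EVENTS
EVENTS = [("u1", 20, 5), ("u1", 15, 20), ("u2", 8, 35),
          ("u1", -5, 62), ("u2", 12, 70), ("u3", 9, 75), ("u2", 3, 50)]


def expected_final_panes(events):
    """Return {(window_start_offset, user_id): (count, sum_amount)}."""
    totals = defaultdict(lambda: [0, 0.0])
    for user, amount, offset in events:
        ts = T0 + offset
        window_start = ts - ts % WINDOW  # epoch-aligned fixed window
        key = (window_start - T0, user)
        totals[key][0] += 1
        totals[key][1] += float(amount)
    return {key: tuple(value) for key, value in sorted(totals.items())}


for key, value in expected_final_panes(EVENTS).items():
    print(key, value)
# (0, 'u1') (2, 35.0)
# (0, 'u2') (2, 11.0)
# (60, 'u1') (1, -5.0)
# (60, 'u2') (1, 12.0)
# (60, 'u3') (1, 9.0)
```

Whichever source feeds the pipeline, the last pane emitted for each window should match these totals, provided no event is dropped as too late.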
3. Reusable Windowed Aggregation Transform
```python
import apache_beam as beam
from apache_beam.transforms.combiners import TupleCombineFn
from apache_beam.transforms.trigger import AccumulationMode, AfterProcessingTime, AfterWatermark
from apache_beam.transforms.window import FixedWindows, TimestampedValue

WINDOW_SIZE_SECS = 60        # 60-second fixed windows
ALLOWED_LATENESS_SECS = 120  # accept data up to two minutes late

class WindowedUserAgg(beam.PTransform):
    def expand(self, pcoll):
        # Use the event's own timestamp (event time), not the processing time.
        stamped = pcoll | "Stamp" >> beam.Map(
            lambda e: TimestampedValue(e, e["event_time"])
        )
        windowed = stamped | "Window" >> beam.WindowInto(
            FixedWindows(WINDOW_SIZE_SECS),
            allowed_lateness=ALLOWED_LATENESS_SECS,
            trigger=AfterWatermark(
                early=AfterProcessingTime(10),  # early panes ~10 s after data arrives
                late=AfterProcessingTime(10),   # late panes ~10 s after late data
            ),
            accumulation_mode=AccumulationMode.ACCUMULATING,
        )
        # Compute count and sum in a single combine so each pane emits one
        # consistent record; joining two separately triggered PCollections with
        # CoGroupByKey can pair values that come from different panes.
        return (
            windowed
            | "KeyByUser" >> beam.Map(lambda e: (e["user_id"], (1, e["amount"])))
            | "CountAndSum" >> beam.CombinePerKey(TupleCombineFn(sum, sum))
            | "ToDict" >> beam.Map(lambda kv: {
                "user_id": kv[0],
                "count": int(kv[1][0]),
                "sum_amount": float(kv[1][1]),
            })
        )
```
This PTransform encapsulates all windowing, triggering, and aggregation logic. Because it operates on a PCollection of generic dictionaries, it can be attached to either a beam.Create (batch) or a TestStream (stream) source.
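Count and sum are both combinable: partial results computed on separate bundles can be merged without revisiting the raw events, which is what allows runners to pre-aggregate before shuffling data. The plain-Python class below mirrors the four methods of Beam's CombineFn interface (create_accumulator, add_input, merge_accumulators, extract_output) to show that property. `CountAndSum` and `combine` are hypothetical names, and the code does not import Beam.

```python
class CountAndSum:
    """Plain-Python mirror of a CombineFn that tracks (count, sum) per key."""

    def create_accumulator(self):
        return (0, 0.0)

    def add_input(self, acc, amount):
        count, total = acc
        return count + 1, total + amount

    def merge_accumulators(self, accs):
        count, total = 0, 0.0
        for c, t in accs:
            count += c
            total += t
        return count, total

    def extract_output(self, acc):
        count, total = acc
        return {"count": count, "sum_amount": total}


def combine(fn, values):
    """Fold values into a fresh accumulator, as one worker would for one bundle."""
    acc = fn.create_accumulator()
    for v in values:
        acc = fn.add_input(acc, v)
    return acc


fn = CountAndSum()
# Two "bundles" combined separately and then merged give the same answer as one pass.
left = combine(fn, [20.0, 15.0])
right = combine(fn, [-5.0])
print(fn.extract_output(fn.merge_accumulators([left, right])))  # {'count': 3, 'sum_amount': 30.0}
print(fn.extract_output(combine(fn, [20.0, 15.0, -5.0])))       # {'count': 3, 'sum_amount': 30.0}
```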
4. Adding Human‑Readable Window Information
```python
class AddWindowInfo(beam.DoFn):
    """Attach readable window bounds and pane metadata to each aggregate."""

    def process(self, element, window=beam.DoFn.WindowParam, pane_info=beam.DoFn.PaneInfoParam):
        ws = float(window.start)  # window.start / window.end are Beam Timestamps
        we = float(window.end)
        yield {
            **element,
            "window_start_utc": datetime.datetime.fromtimestamp(ws, tz=datetime.timezone.utc).strftime("%H:%M:%S"),
            "window_end_utc": datetime.datetime.fromtimestamp(we, tz=datetime.timezone.utc).strftime("%H:%M:%S"),
            # Numeric PaneInfoTiming code: 0 = EARLY, 1 = ON_TIME, 2 = LATE, 3 = UNKNOWN
            "pane_timing": str(pane_info.timing),
            "pane_is_first": pane_info.is_first,
            "pane_is_last": pane_info.is_last,
        }
```
By exposing window boundaries and pane metadata, developers can debug timing issues and verify that late data updates are applied as expected.
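The timestamp formatting inside AddWindowInfo can be checked in isolation. This stdlib-only sketch reproduces the same strftime("%H:%M:%S") conversion for a [start, end) pair given in epoch seconds; `format_window` is a hypothetical helper. Like the DoFn, it drops the date, so windows on different days print identically.

```python
import datetime


def format_window(start: float, end: float) -> tuple[str, str]:
    """Render window bounds (epoch seconds) as UTC HH:MM:SS strings."""
    def to_utc(ts: float) -> str:
        return datetime.datetime.fromtimestamp(ts, tz=datetime.timezone.utc).strftime("%H:%M:%S")
    return to_utc(start), to_utc(end)


print(format_window(3720, 3780))  # ('01:02:00', '01:03:00')
```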
5. Executing the Pipeline in Both Modes
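```python
from apache_beam.testing.test_stream import TestStream

def build_test_stream():
    # Not shown in the original walkthrough; this version is an assumption. It holds
    # back the out-of-order t0 + 50 event until its window has closed, so it arrives late.
    stamp = beam.window.TimestampedValue
    on_time = [stamp(e, e["event_time"]) for e in BATCH_EVENTS if e["event_time"] != t0 + 50]
    late = [stamp(e, e["event_time"]) for e in BATCH_EVENTS if e["event_time"] == t0 + 50]
    return (TestStream()
            .advance_watermark_to(t0)
            .add_elements(on_time)
            .advance_processing_time(15)                      # early panes can fire
            .advance_watermark_to(t0 + 2 * WINDOW_SIZE_SECS)  # on-time panes fire
            .add_elements(late)                               # late, within allowed lateness
            .advance_processing_time(15)                      # late pane can fire
            .advance_watermark_to_infinity())

def run_batch():
    with beam.Pipeline(options=PipelineOptions()) as p:
        (p
         | "CreateBatch" >> beam.Create(BATCH_EVENTS)
         | "Aggregate" >> WindowedUserAgg()
         | "AddInfo" >> beam.ParDo(AddWindowInfo())
         | "Print" >> beam.Map(print))

def run_stream():
    opts = PipelineOptions()
    opts.view_as(StandardOptions).streaming = True
    with beam.Pipeline(options=opts) as p:
        (p
         | "TestStream" >> build_test_stream()
         | "Aggregate" >> WindowedUserAgg()
         | "AddInfo" >> beam.ParDo(AddWindowInfo())
         | "Print" >> beam.Map(print))

run_stream() if MODE == "stream" else run_batch()
```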
The only differences are the source (beam.Create for batch, TestStream for streaming) and the streaming flag. All downstream transforms remain untouched, which illustrates the point of Beam’s unified model.
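The “swap the source, keep the transforms” idea can also be shown with a plain-Python analogy. The hypothetical `aggregate` function below depends only on receiving an iterable of events, so a fixed list (bounded, like beam.Create) and a generator that yields events one at a time (standing in for an unbounded source, like TestStream) produce the same totals. This deliberately ignores watermarks and triggers; it only illustrates keeping the source separate from the logic.

```python
from collections import Counter, defaultdict
from typing import Iterable, Iterator


def aggregate(events: Iterable[dict], window_size: int = 60) -> dict:
    """Per (window_start, user_id) count and sum, independent of the source."""
    counts: Counter = Counter()
    sums: defaultdict = defaultdict(float)
    for e in events:
        key = (e["event_time"] - e["event_time"] % window_size, e["user_id"])
        counts[key] += 1
        sums[key] += e["amount"]
    return {k: (counts[k], sums[k]) for k in sorted(counts)}


SAMPLE_EVENTS = [
    {"user_id": "u1", "amount": 20.0, "event_time": 5},
    {"user_id": "u1", "amount": 15.0, "event_time": 20},
    {"user_id": "u2", "amount": 12.0, "event_time": 70},
    {"user_id": "u2", "amount": 3.0, "event_time": 50},
]


def replay(events: list[dict]) -> Iterator[dict]:
    """Yield events one by one, as a stand-in for an unbounded source."""
    for e in events:
        yield e


print(aggregate(SAMPLE_EVENTS) == aggregate(replay(SAMPLE_EVENTS)))  # True
print(aggregate(SAMPLE_EVENTS))  # {(0, 'u1'): (2, 35.0), (0, 'u2'): (1, 3.0), (60, 'u2'): (1, 12.0)}
```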
Benefits and Real‑World Use‑Cases
Deploying a single Beam pipeline across batch and streaming workloads yields tangible advantages:
- Reduced Code Duplication: One codebase, two execution modes.
- Consistent Business Logic: Guarantees that nightly reports and live dashboards compute identical metrics.
- Lower Operational Costs: Developers spend less time maintaining parallel pipelines.
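- Scalable to Production: Start locally with the DirectRunner, then move to Dataflow, Flink, or Spark without changing the transform code; only runner options change, and test sources such as TestStream are replaced with production sources.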
Typical Scenarios
- E‑commerce Analytics: Real‑time purchase funnels combined with nightly revenue reconciliation.
- IoT Telemetry: Stream sensor data for anomaly detection while also generating daily health reports.
- Financial Services: Process trade events in real time for risk monitoring and produce end‑of‑day compliance reports.
Companies building AI‑enhanced products can embed these pipelines into the AI marketing agents offered by UBOS, enabling marketers to react instantly to campaign performance while preserving historical analysis.
For startups, the UBOS for startups program provides pre‑configured Beam templates (e.g., AI SEO Analyzer) that can be extended with custom windowing logic, accelerating time‑to‑value.
Next Steps: Bring This Pipeline Into Your Stack
Ready to experiment? Start with the UBOS homepage to spin up a free development environment. Use the Web app editor on UBOS to edit the Python code directly in the browser, then run it with the built‑in DirectRunner.
When you’re comfortable, explore the UBOS pricing plans to select a runner that matches your production needs—whether it’s a managed Dataflow service or a self‑hosted Flink cluster.
Need inspiration? Browse the UBOS portfolio examples for real‑world implementations of streaming analytics, and check out the UBOS templates for quick start such as the AI Article Copywriter or the AI Video Generator to see how Beam can feed downstream AI services.
Finally, consider joining the UBOS partner program to get dedicated support, co‑marketing opportunities, and early access to new connectors such as the ChatGPT and Telegram integration or the Chroma DB integration.
Dive in today, and let a single Apache Beam pipeline power both your batch workloads and real‑time analytics—without rewriting a line of code.