Beam – session window does not emit results as expected
I’m building an Apache Beam (v2.0) pipeline to run on Google Cloud Dataflow. The intended flow is:
- Events stream in from Pub/Sub (an unbounded data source). They are simple JSON objects with a sessionId property.
- A custom DoFn maps each event to KV<String, String>, where the key is the sessionId and the value is the entire JSON object.
- Events are windowed into session windows (with a gap duration of 2 seconds in development and roughly 30 minutes in production).
- For now, the results emitted from each window are simply printed.
The pipeline code is as follows:
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(PubsubIO
.readStrings()
.fromSubscription(options.getSubscription()))
.apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() {
@ProcessElement
public void processElement(ProcessContext c) {
Gson gson = new Gson();
String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId");
KV<String, String> kv = KV.of(key, c.element());
c.output(kv);
}
}))
.apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(2))))
.apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() {
@ProcessElement
public void processElement(ProcessContext c) {
System.out.println("****");
System.out.println(c.element());
System.out.println(c.timestamp());
}
}));
return pipeline.run();
I want the windowing to emit one result per session (i.e. per key) at the end of each session. I’m using the Pub/Sub emulator for testing and just sending data at random.
For example, if I send the following data to Pub/Sub:
{"sessionId": "2", "data": "data9583", "timestamp": 1507293681}
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683}
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684}
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684}
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685}
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686}
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687}
The windowing should emit the following:
- First window: contains 1 event with sessionId=2
- Second window: contains 2 events with sessionId=3
- Third window: contains 3 events with sessionId=6
- Fourth window: contains 1 event with sessionId=4
The idea here is:
- A window only fires when a session “completes”, meaning that gapDuration has passed since the last event with that sessionId.
- Each window contains events from a single session (since we passed KV<String, String> into the windowing).
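To make the intended session semantics concrete, here is a small plain-Java sketch, independent of Beam (class and method names are my own invention): events are grouped per key, and a new session opens whenever the gap since the previous event with the same key reaches the gap duration:

```java
import java.util.*;

public class SessionSketch {
    // Groups per-key event timestamps into sessions: a new session starts
    // whenever the gap since the previous event with the same key is at
    // least gapSeconds. Events are assumed to arrive sorted by timestamp.
    public static Map<String, List<List<Long>>> sessions(List<String[]> events, long gapSeconds) {
        Map<String, List<List<Long>>> out = new HashMap<>();
        for (String[] e : events) {           // e = {sessionId, timestamp}
            String key = e[0];
            long ts = Long.parseLong(e[1]);
            List<List<Long>> perKey = out.computeIfAbsent(key, k -> new ArrayList<>());
            if (perKey.isEmpty() || ts - lastTimestamp(perKey) >= gapSeconds) {
                perKey.add(new ArrayList<>()); // gap reached: open a new session
            }
            perKey.get(perKey.size() - 1).add(ts);
        }
        return out;
    }

    public static long lastTimestamp(List<List<Long>> perKey) {
        List<Long> current = perKey.get(perKey.size() - 1);
        return current.get(current.size() - 1);
    }

    public static void main(String[] args) {
        // The sample events from the question, with a 2-second gap.
        List<String[]> events = Arrays.asList(
            new String[]{"2", "1507293681"},
            new String[]{"3", "1507293683"},
            new String[]{"6", "1507293684"},
            new String[]{"3", "1507293684"},
            new String[]{"6", "1507293685"},
            new String[]{"6", "1507293686"},
            new String[]{"4", "1507293687"});
        Map<String, List<List<Long>>> result = sessions(events, 2);
        // Key "6" ends up with a single session containing three events.
        System.out.println(result.get("6"));
    }
}
```

Running this over the sample data yields exactly the four expected sessions (one each for keys 2 and 4, a two-event session for key 3, and a three-event session for key 6), which is the behavior I expect the pipeline to reproduce.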
The windowing code above is pulled directly from the Beam documentation.
What I actually see is:
- Each event is printed immediately as it is published, so the pipeline doesn’t even wait for gapDuration before firing the window.
- Each print statement contains only one event.
It’s worth noting that if I add a custom CombineFn (which simply converts the JSON objects into an array of JSON objects), nothing ever reaches the CombineFn, nor PrintFn (I added print statements to the CombineFn).
I’m assuming triggers have something to do with this, but I can’t seem to find anything useful to get me moving in the right direction (Beam’s sample code is surprisingly sparse, especially for v2.1.0).
My question:
- Is the behavior I want possible?
- If so, what am I missing? Is this approach at least on the right track?
- It would be great if someone could point me to a good source of sample code for various Beam pipeline use cases!
Resources I’ve searched without success:
- The World Beyond Batch: Streaming 101 & 102
- The “complete” examples in the Beam GitHub repository
- The Beam JavaDoc
Solution
First, window functions that merge elements across windows only take effect when an aggregating operation, such as GroupByKey or Combine, is applied. This is discussed in the Beam Programming Guide under Windowing Basics.
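As a sketch of the missing step (assuming the same pipeline as in the question; this fragment requires the Beam SDK on the classpath), inserting a GroupByKey between the Window transform and the print step makes the session windows actually merge, so each fired pane carries all events of one session:

```java
.apply(Window.<KV<String, String>>into(
        Sessions.withGapDuration(Duration.standardSeconds(2))))
// The aggregation is what consumes the windowing: without a GroupByKey
// or Combine, elements pass straight through one at a time.
.apply(GroupByKey.<String, String>create())
.apply("PrintFn", ParDo.of(new DoFn<KV<String, Iterable<String>>, Void>() {
    @ProcessElement
    public void processElement(ProcessContext c) {
        // One element per session: the key plus all events in that session.
        System.out.println(c.element().getKey() + " -> " + c.element().getValue());
    }
}));
```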
Second, by default, PubsubIO (as you are using it) assigns each element a timestamp based on when it was published. Since you have an explicit timestamp field, you may want to investigate publishing these elements with a timestamp attribute and reading them using the withTimestampAttribute method. This will use the timestamp you published as the element timestamp.
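The reading side would then look something like this (a sketch; the attribute name "ts" is arbitrary and must match whatever attribute you set when publishing):

```java
pipeline.apply(PubsubIO
        .readStrings()
        .fromSubscription(options.getSubscription())
        // Use the published "ts" attribute as the element timestamp
        // instead of the Pub/Sub publish time.
        .withTimestampAttribute("ts"))
```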