Java – Beam – Session window does not emit results as expected

I’m building an Apache Beam (v2.0) pipeline to run in Google Cloud Dataflow. The expected flow is:

  • Events stream in from Pub/Sub (an unbounded data source). They are simple JSON objects with a sessionId property.
  • A custom DoFn maps each event to a KV<String, String> whose key is the sessionId and whose value is the entire JSON object.
  • Events are windowed using session windows (the gap duration is 2 seconds in development and will be approximately 30 minutes in production).
  • For now, the results emitted from each window are simply printed.

The pipeline code is as follows:

Pipeline pipeline = Pipeline.create(options);

pipeline
    // Read raw JSON strings from the Pub/Sub subscription.
    .apply(PubsubIO
        .readStrings()
        .fromSubscription(options.getSubscription()))

    // Key each event by its sessionId; the value is the full JSON string.
    .apply("AddKeyFn", ParDo.of(new DoFn<String, KV<String, String>>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            Gson gson = new Gson();
            String key = (String) gson.fromJson(c.element(), HashMap.class).get("sessionId");
            KV<String, String> kv = KV.of(key, c.element());
            c.output(kv);
        }
    }))

    // Assign session windows with a 2-second gap.
    .apply(Window.<KV<String, String>>into(Sessions.withGapDuration(Duration.standardSeconds(2))))

    // Print whatever comes out of the windowing step.
    .apply("PrintFn", ParDo.of(new DoFn<KV<String, String>, Void>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println("****");
            System.out.println(c.element());
            System.out.println(c.timestamp());
        }
    }));

return pipeline.run();

I expect the windowing step to emit one result per session (that is, per key) when the session ends. I’m using the Pub/Sub emulator for testing purposes and just sending random data.

For example, if the following data is sent to Pub/Sub:

{"sessionId": "2", "data": "data9583", "timestamp": 1507293681}
{"sessionId": "3", "data": "data5220", "timestamp": 1507293683}
{"sessionId": "6", "data": "data2998", "timestamp": 1507293684}
{"sessionId": "3", "data": "data3820", "timestamp": 1507293684}
{"sessionId": "6", "data": "data5728", "timestamp": 1507293685}
{"sessionId": "6", "data": "data7173", "timestamp": 1507293686}
{"sessionId": "4", "data": "data8800", "timestamp": 1507293687}

The windowing step should emit the following:

  • First window: contains 1 event with sessionId=2
  • Second window: contains 2 events with sessionId=3
  • Third window: contains 3 events with sessionId=6
  • Fourth window: contains 1 event with sessionId=4

The idea here is:

  • A window only fires when a session “completes”, meaning that {gapDuration} has passed since the last event with that sessionId.
  • Each window will contain events from a single session (since we pass KV<String, String> to the Window function).
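As a rough illustration of the grouping described above, here is a plain-Java sketch (not Beam code) that splits the sample events into per-key sessions. It assumes events arrive ordered by timestamp and treats two events of the same key as belonging to one session when they are less than the gap apart; the class and method names are made up for this example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, only meant to illustrate the expected grouping.
class SessionSketch {

    // Groups event timestamps (in seconds) into sessions per key.
    // Assumption: keys[i] and timestamps[i] describe one event, and
    // events are already ordered by timestamp.
    static Map<String, List<List<Long>>> sessionize(
            String[] keys, long[] timestamps, long gapSeconds) {
        Map<String, List<List<Long>>> sessions = new LinkedHashMap<>();
        for (int i = 0; i < keys.length; i++) {
            List<List<Long>> perKey =
                    sessions.computeIfAbsent(keys[i], k -> new ArrayList<>());
            List<Long> last = perKey.isEmpty() ? null : perKey.get(perKey.size() - 1);
            if (last != null && timestamps[i] - last.get(last.size() - 1) < gapSeconds) {
                // Still within the gap: extend the current session.
                last.add(timestamps[i]);
            } else {
                // Gap exceeded (or first event for this key): start a new session.
                List<Long> fresh = new ArrayList<>();
                fresh.add(timestamps[i]);
                perKey.add(fresh);
            }
        }
        return sessions;
    }

    public static void main(String[] args) {
        // The sample events from the question: sessionId and timestamp.
        String[] keys = {"2", "3", "6", "3", "6", "6", "4"};
        long[] ts = {1507293681L, 1507293683L, 1507293684L, 1507293684L,
                     1507293685L, 1507293686L, 1507293687L};
        sessionize(keys, ts, 2L).forEach((key, value) ->
                System.out.println("sessionId=" + key + " -> " + value));
    }
}
```

With the sample data and a 2-second gap, this yields one session each for sessionId 2, 3, 6 and 4, containing 1, 2, 3 and 1 events respectively, which matches the expected windows listed earlier.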

The Window transform in the pipeline code is taken directly from the Beam documentation.

What I actually see is:

  • Each event is printed as soon as it is published to Pub/Sub, so the pipeline doesn’t even wait {gapDuration} before firing the window
  • Each print statement contains one event

It’s worth noting that if I add a custom CombineFn (which just converts the JSON objects into an array of JSON objects), nothing reaches the CombineFn, and nothing reaches PrintFn either (I added print statements to the CombineFn).

I’m assuming triggers have something to do with this, but I can’t seem to find anything useful to get me moving in the right direction (Beam’s sample code is surprisingly sparse, especially for v2.1.0).

My question:

  • Is the behavior I want possible?
  • If so, what am I missing? Is this approach at least on the right track?
  • It would be great if someone could point me to a good source of sample code for various Beam pipeline use cases!

Solution

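First, windowing functions that need to merge elements across windows (as session windows do) require an aggregation operation, such as GroupByKey or Combine, to be applied after them. Without one, each element simply passes through on its own. This is discussed in the Beam Programming Guide under Windowing Basics.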
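A minimal sketch of what that could look like for the pipeline in the question, assuming the keyed PCollection produced by AddKeyFn is passed in. The class, method and DoFn names here are made up for illustration; only the Beam classes themselves are real.

```java
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
import org.apache.beam.sdk.transforms.windowing.Sessions;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

// Hypothetical wrapper class for this sketch.
class SessionGrouping {

    // Assigns session windows and then groups by key, so that the
    // per-element windows of one sessionId can actually be merged.
    static PCollection<KV<String, Iterable<String>>> groupBySession(
            PCollection<KV<String, String>> keyed) {
        return keyed
            .apply(Window.<KV<String, String>>into(
                Sessions.withGapDuration(Duration.standardSeconds(2))))
            .apply(GroupByKey.<String, String>create());
    }

    // Prints one block per (sessionId, session window) pair.
    static class PrintSessionFn extends DoFn<KV<String, Iterable<String>>, Void> {
        @ProcessElement
        public void processElement(ProcessContext c, BoundedWindow window) {
            System.out.println("**** sessionId=" + c.element().getKey() + " window=" + window);
            for (String json : c.element().getValue()) {
                System.out.println(json);
            }
        }
    }
}
```

The grouped output could then be printed with `.apply("PrintFn", ParDo.of(new SessionGrouping.PrintSessionFn()))`. With the default trigger, each group should only be emitted once the watermark passes the end of the merged session window, rather than once per incoming event.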

Second, by default, PubsubIO (as you are using it) assigns each element a timestamp based on when it was published. Since you have an explicit timestamp field, you may want to look into publishing these elements with a timestamp attribute and reading them with the withTimestampAttribute method. This will use the timestamp attribute you published as the element’s timestamp.
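A sketch of the read side, assuming the publisher attaches the event time as a message attribute. The attribute name "ts" and the class and method names are placeholders chosen for this example, not something from the question.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.values.PCollection;

// Hypothetical wrapper class for this sketch.
class TimestampedRead {

    // Reads JSON strings from Pub/Sub and takes each element's event
    // timestamp from a message attribute instead of the publish time.
    static PCollection<String> readEvents(Pipeline pipeline, String subscription) {
        return pipeline.apply(PubsubIO
            .readStrings()
            .fromSubscription(subscription)
            // "ts" is an assumed attribute name; use whatever the publisher sets.
            .withTimestampAttribute("ts"));
    }
}
```

As far as I know, PubsubIO expects the attribute value to be either milliseconds since the Unix epoch or an RFC 3339 string, so the second-resolution values in the sample data (e.g. 1507293681) would probably need to be converted when publishing.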
