Java – Kafka streaming issues in time window aggregation

I have a question about KStreams aggregation and windowing. I want to aggregate records that share the same key into a list, as long as they fall within the same time window.
I chose SessionWindows because I need a window that moves with the session. Suppose record A arrives at 10:00:00; all other records with the same key that arrive within the 10-second window (until 10:00:10) fall into the same session. If one of them arrives at 10:00:03, the window is extended to 10:00:13 (+10 seconds).

This gives us a moving window that stays open for 10 seconds after the last record received for a given key.
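The inactivity-gap behaviour described above can be sketched without Kafka at all. The following is a simplified model for a single key, assuming records arrive in timestamp order; the class and method names (`SessionGapSketch`, `sessionize`) are hypothetical and the timestamps are plain seconds after 10:00:00, not anything produced by Kafka Streams itself.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, Kafka-free model of an inactivity-gap session for a single key.
// Names are hypothetical; timestamps are seconds after 10:00:00 for readability.
final class SessionGapSketch {

    // Groups in-order timestamps into sessions: a record joins the current session
    // if it arrives no more than gapSeconds after that session's last record.
    static List<List<Long>> sessionize(long[] timestamps, long gapSeconds) {
        List<List<Long>> sessions = new ArrayList<>();
        List<Long> current = null;
        long last = 0L;
        for (long ts : timestamps) {
            if (current == null || ts - last > gapSeconds) {
                current = new ArrayList<>(); // gap exceeded: start a new session
                sessions.add(current);
            }
            current.add(ts);
            last = ts; // every record pushes the session's end forward
        }
        return sessions;
    }

    public static void main(String[] args) {
        // :00, :03 and :12 chain into one session (each within 10 s of the previous
        // record); :30 is more than 10 s after :12 and starts a new session.
        long[] arrivals = {0L, 3L, 12L, 30L};
        System.out.println(sessionize(arrivals, 10L)); // prints [[0, 3, 12], [30]]
    }
}
```

Note that the record at :12 still joins the first session even though it is more than 10 seconds after the first record, because the gap is measured from the most recent record.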

Now the problem: I want to get only the final aggregate result. I use .suppress() to indicate that I don’t want any intermediate results, just the last result when the window closes. This doesn’t work: although no intermediate aggregate results are sent, I don’t get any result when the time window ends either. I noticed that in order to receive it, I need to publish another message to the topic, which is not possible in my case.

After reading about .suppress(), I came to the conclusion that it might not be the right way to implement what I want. So my question is: how can I force the window to close and send the latest aggregated result?

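import java.time.Duration;
import java.util.List;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.SessionWindows;
import org.apache.kafka.streams.kstream.Suppressed;
import static org.apache.kafka.streams.kstream.Suppressed.BufferConfig.unbounded;

@StreamListener(ExtractContractBinding.RECEIVE_PAGE)
@SendTo(ExtractCommunicationBinding.AGGREGATED_PAGES)
public KStream<String, List<Records>> aggregatePages(KStream<?, Record> input) {
    // The resulting stream must be returned so that @SendTo can publish it.
    return input.map(this::getRecord)
            .groupBy(keyOfElement)
            // 10-second inactivity gap, plus a 10-second grace period for late records
            .windowedBy(SessionWindows.with(Duration.ofSeconds(10L)).grace(Duration.ofSeconds(10L)))
            .aggregate(... do stuff...)
            // Hold back intermediate updates; emit only the final result per window
            .suppress(Suppressed.untilWindowCloses(unbounded()))
            .toStream()
            .map(this::createAggregatedResult);
}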

Solution

In short, this happens because in KStreams, as in most other stream processing engines that compute aggregations, time advances based on event time rather than wall-clock time.

https://kafka.apache.org/0101/documentation/streams#streams_time

In other words, the window can’t be closed until a new message arrives with a timestamp beyond the end of your time window plus the grace period (the point after which a record for that window counts as too late).
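To make this concrete, here is a minimal, Kafka-free sketch of the idea. It is a simplified model, not Kafka Streams' actual implementation: the class `StreamTimeSketch` and its methods are hypothetical, and the exact boundary Kafka Streams uses for session windows may differ. The only point it illustrates is that the tracked time moves when a record is observed, never because the wall clock has advanced.

```java
// Simplified, Kafka-free model of event-time ("stream time") progress.
// All names are hypothetical; this is not Kafka Streams' actual implementation.
final class StreamTimeSketch {
    // Highest record timestamp observed so far, in milliseconds.
    private long streamTime = Long.MIN_VALUE;

    // Observing a record is the only way time moves forward in this model.
    void observe(long recordTimestampMs) {
        streamTime = Math.max(streamTime, recordTimestampMs);
    }

    // A window may close only once stream time has passed its end plus the grace period.
    boolean canClose(long windowEndMs, long graceMs) {
        return streamTime > windowEndMs + graceMs;
    }

    public static void main(String[] args) {
        StreamTimeSketch time = new StreamTimeSketch();
        time.observe(0L);      // record at 10:00:00
        time.observe(3_000L);  // record at 10:00:03, window now runs until 10:00:13

        // However long we wait in real time, nothing changes without a new record.
        System.out.println(time.canClose(13_000L, 10_000L)); // prints false

        time.observe(30_000L); // a later record at 10:00:30 advances stream time
        System.out.println(time.canClose(13_000L, 10_000L)); // prints true
    }
}
```

This mirrors what the question describes: the suppressed result only shows up after another message has been published to the topic.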

Also, based on some unit tests I’ve written recently, I’m inclined to think that the second message needs to land in the same partition as the previous one for event time to move forward. In practice, this is less noticeable in a production environment, where you are presumably processing hundreds of messages per second.

I would also add that you can implement a custom timestamp extractor, which gives you fine-grained control over which time window a particular message falls into.
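As a rough illustration, the decision a custom timestamp extractor typically makes can be sketched as below. In Kafka Streams this logic would live in a class implementing `org.apache.kafka.streams.processor.TimestampExtractor`, whose `extract` method receives the consumer record and the partition time; the sketch leaves the Kafka types out so that it runs standalone. The class name, method name and the idea of an event time embedded in the payload are assumptions made for the example.

```java
// Kafka-free sketch of the choice a custom timestamp extractor might make.
// Names are hypothetical; the embedded event time is an assumed payload field.
final class TimestampChoiceSketch {

    // Prefer the event time carried in the payload; fall back to the time already
    // established for the partition when the embedded value is missing or negative.
    static long chooseTimestamp(Long embeddedEventTimeMs, long partitionTimeMs) {
        if (embeddedEventTimeMs != null && embeddedEventTimeMs >= 0L) {
            return embeddedEventTimeMs;
        }
        return partitionTimeMs;
    }

    public static void main(String[] args) {
        System.out.println(chooseTimestamp(1_000L, 5_000L)); // prints 1000
        System.out.println(chooseTimestamp(null, 5_000L));   // prints 5000
    }
}
```

Whichever timestamp is returned determines the session a record is assigned to, so an extractor like this changes how records are windowed but does not by itself make a window close without further input.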

“How can I force the window to close and send the latest aggregated result?”

To finally answer your question: it is not possible to force a time window to close without sending an additional message to the source topic.
