Java – After processing, pass elements back into the input stream in Flink?

Scenario:

I have a stream of events from a sensor. Each event is either type T or type J.

  • T-type events carry the timestamp at which the event occurred.
  • J-type events carry start and end timestamps.

Based on the start and end timestamps of a J-type event, aggregation logic is applied to all T-type events in that time range, and the results are written to a DB.

To do this, I created a custom trigger that fires when a J-type event is received. In my custom ProcessWindowFunction, I perform the aggregation logic and the time-range check.

However, a T-type event may fall outside the time range of the current J-type event.
In this case, the T-type event should be carried over to the next window before the current window is purged.

[Figure: Stream Window]

Solution ideas:

  1. Push unhandled T-type events back into the Kinesis stream (the source) from the custom window function. (Worst-case solution.)

  2. Use FIRE instead of FIRE_AND_PURGE to keep the window state for the whole runtime, and use the elements iterator to delete processed elements. (Not recommended, since it means keeping an unbounded window.)

Is there any way to push unhandled events directly back into the input stream, without going through Kinesis (that is, to requeue them)?

Or

Is there any way to maintain state in the keyBy context, so that the unprocessed data can be included in the calculation either before or together with the window elements?

Solution

There are two solutions here. Their basic behavior is more or less the same, but you may find one of them easier to understand, maintain, or test.

As for your question: no, there is no way to loop back (requeue) unused events without pushing them back to Kinesis. But you can simply hold on to them until you need them.

Solution 1: Use the RichFlatMapFunction

When a T-type event arrives, append it to a ListState object. When a J-type event arrives, emit all matching T-type events from the list to the output, and update the list so that it keeps only those T-type events that will belong to later J-type events.
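The buffering logic described above can be sketched in plain Java, without any Flink dependency, so that it can be tried in isolation. This is only a sketch under assumptions: `TEvent`, `JEvent`, and `PendingTEvents` are hypothetical names, the time range is treated as inclusive on both ends, and T-type events older than the J-type event's start are dropped. In an actual job, the list would be kept in keyed ListState and `onT`/`onJ` would be the two branches of the flatMap method.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical T-type event: one reading with the time it occurred. */
final class TEvent {
    final long timestamp;
    final double value;
    TEvent(long timestamp, double value) {
        this.timestamp = timestamp;
        this.value = value;
    }
}

/** Hypothetical J-type event: a time range [start, end], inclusive (assumption). */
final class JEvent {
    final long start;
    final long end;
    JEvent(long start, long end) {
        this.start = start;
        this.end = end;
    }
}

/** Per-key buffer; in a Flink job this list would live in keyed ListState. */
final class PendingTEvents {
    private List<TEvent> pending = new ArrayList<>();

    /** Called for every T-type event: just remember it. */
    void onT(TEvent t) {
        pending.add(t);
    }

    /** Called for every J-type event: return the matching T events, keep later ones. */
    List<TEvent> onJ(JEvent j) {
        List<TEvent> matched = new ArrayList<>();
        List<TEvent> later = new ArrayList<>();
        for (TEvent t : pending) {
            if (t.timestamp >= j.start && t.timestamp <= j.end) {
                matched.add(t);          // inside the J range: emit
            } else if (t.timestamp > j.end) {
                later.add(t);            // belongs to a future J-type event
            }
            // Assumption: events older than j.start are no longer needed and are dropped.
        }
        pending = later;                 // corresponds to updating the list state
        return matched;
    }

    int pendingCount() {
        return pending.size();
    }
}
```

With T-type events at timestamps 5, 15, and 25, a J-type event covering 0 to 20 returns the first two and leaves the third buffered for the next J-type event.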

Solution 2: Use GlobalWindows with a custom trigger and evictor

In addition to what you’ve already done, implement an Evictor that, after the window has FIREd, removes only the J-type event and all of its matching T-type events from the window.
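As a rough illustration of what such an evictor would do after the window fires, here is the removal rule modeled on a plain Java list rather than on Flink's Evictor interface. `WindowElement` and `EvictSketch` are hypothetical names, and the inclusive range check is an assumption; a real implementation would apply the same rule to the elements iterable it is handed after the window function has run.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical window element: either a T-type or a J-type event. */
final class WindowElement {
    final boolean isJ;
    final long start;   // for a T-type event, start == end == its timestamp
    final long end;

    private WindowElement(boolean isJ, long start, long end) {
        this.isJ = isJ;
        this.start = start;
        this.end = end;
    }

    static WindowElement t(long timestamp) {
        return new WindowElement(false, timestamp, timestamp);
    }

    static WindowElement j(long start, long end) {
        return new WindowElement(true, start, end);
    }
}

final class EvictSketch {
    /** Removes every J-type element and every T-type element covered by one of them. */
    static void evictAfterFire(List<WindowElement> window) {
        // Collect the J-type ranges first so the removal pass can test against them.
        List<WindowElement> ranges = new ArrayList<>();
        for (WindowElement e : window) {
            if (e.isJ) {
                ranges.add(e);
            }
        }
        window.removeIf(e -> e.isJ || coveredByAny(e, ranges));
    }

    private static boolean coveredByAny(WindowElement t, List<WindowElement> ranges) {
        for (WindowElement j : ranges) {
            if (t.start >= j.start && t.start <= j.end) {
                return true;
            }
        }
        return false;
    }
}
```

T-type events outside every J range stay in the window, so they are still there when the next J-type event triggers it.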

Update: Clearing the state of stale keys/failed sensors

For Solution 1, you can use state TTL to schedule the removal of any inactive state belonging to dead keys. Or you can use a KeyedProcessFunction instead of a RichFlatMapFunction and use a timer to do the same thing.
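The timer-based variant boils down to an idle-timeout check: remember when each key was last seen, and when a timer fires, clear the key's state only if nothing has arrived since. The sketch below models that check in plain Java, with no Flink dependency. `IdleKeyCleaner` and its methods are hypothetical names; in a KeyedProcessFunction the map would be replaced by per-key state and `onTimer` would be driven by registered timers.

```java
import java.util.HashMap;
import java.util.Map;

/** Models the idle-timeout check a per-key timer callback might perform. */
final class IdleKeyCleaner {
    private final long timeoutMs;
    // Stands in for per-key state holding the last-seen timestamp.
    private final Map<String, Long> lastSeen = new HashMap<>();

    IdleKeyCleaner(long timeoutMs) {
        this.timeoutMs = timeoutMs;
    }

    /** Called for every event of a key: record the latest timestamp seen. */
    void onEvent(String key, long timestamp) {
        lastSeen.merge(key, timestamp, Long::max);
    }

    /** Called when a timer fires for a key; returns true if its state was cleared. */
    boolean onTimer(String key, long now) {
        Long seen = lastSeen.get(key);
        if (seen != null && now - seen >= timeoutMs) {
            lastSeen.remove(key);   // key has been idle long enough: drop its state
            return true;
        }
        return false;               // newer events arrived, so the state is kept
    }

    boolean hasState(String key) {
        return lastSeen.containsKey(key);
    }
}
```

Checking the last-seen timestamp inside the callback means a timer registered by an older event does nothing if the sensor has reported again in the meantime.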

Using the Window API to manage the state of stale keys may not be straightforward, but for Solution 2, I believe you can extend the custom trigger to include a timeout that will clear the window. If you use global state in ProcessWindowFunction, you need to rely on state TTL to clean it up.
