Java – Getting notified when TextIO finishes writing to a file

I’m working on a scenario that uses two Google Cloud Dataflow pipelines:

Pipeline A runs in streaming mode, continuously creating files in Google Cloud Storage based on hourly windows and a fixed number of shards, as follows:

data.apply(TextIO.write().to(resource.getCurrentDirectory())
            .withFilenamePolicy(new PerWindowFiles(prefix))
            .withWindowedWrites()
            .withNumShards(42));

Pipeline B runs in batch mode and periodically (for example, once an hour) loads these files for further processing.

The question is: which files can pipeline B safely load from GCS?

  • All of them -> probably not a good idea, because A may not have finished writing some of them and we would read corrupted files.

  • Time-based (e.g. only load files that are at least 2 hours old) -> this can also cause problems if A is running late.

  • Find a way to create a “complete” flag in A that tells B which files are finished.

  • Somehow get notified when the final pane of a window has been processed -> I haven’t found a way to do this yet.

I would prefer the third approach, but I can’t find a way to determine when TextIO has actually finished writing a file without waiting for the pipeline to finish.

The write transform of TextIO does not return another PCollection. One option would be to override the finalize method of FileBasedSink.WriteOperation. However, that object is created somewhere inside TextIO, so this would require copying the entire class and eventually building a custom sink. In my opinion, that is overkill.

Does anyone have an idea for, or experience with, a simpler way to achieve this?

Solution

TextIO.write() writes data to temporary files and then atomically renames each successfully written temporary file to its final location. You can safely use any file that matches your “prefix” in pipeline B: temporary files are named in a way that does not match the prefix (we explicitly considered your use case when deciding how to name temporary files), so every file that pipeline B sees will be complete.
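To make the idea concrete, here is a minimal sketch of the same write-to-temporary-then-rename pattern on a local filesystem, using only the Java standard library. It is not Beam's actual implementation; the class name AtomicPublishSketch, the ".temp-" naming, and the "hour-" prefix are assumptions chosen for illustration.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration class; not part of Beam.
class AtomicPublishSketch {

    // Writer side: write all lines to a temp file whose name cannot match the
    // final prefix, then move it to its final name in a single atomic step.
    static Path publish(Path dir, String finalName, List<String> lines) throws IOException {
        Path temp = Files.createTempFile(dir, ".temp-", ".part");
        Files.write(temp, lines, StandardCharsets.UTF_8);
        return Files.move(temp, dir.resolve(finalName), StandardCopyOption.ATOMIC_MOVE);
    }

    // Reader side: list only file names that start with the agreed prefix.
    // Temp files never match, so every listed file is fully written.
    // Assumes the prefix contains no glob metacharacters such as * ? [ {.
    static List<String> listComplete(Path dir, String prefix) throws IOException {
        List<String> names = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir, prefix + "*")) {
            for (Path p : stream) {
                names.add(p.getFileName().toString());
            }
        }
        Collections.sort(names);
        return names;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("publish-demo");
        publish(dir, "hour-00-shard-0.txt", Arrays.asList("line 1", "line 2"));
        // Simulate a writer that is still in progress: a temp file left in place.
        Files.createTempFile(dir, ".temp-", ".part");
        System.out.println(listComplete(dir, "hour-"));
    }
}
```

The reader needs no "complete" flag: being visible under the final name is the signal that the file is complete.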

Alternatively, we are about to add (link to pull request) a version of TextIO.read() that continuously ingests new files in streaming mode; once it is available, you can use it in pipeline B. See also http://s.apache.org/textio-sdf and the linked JIRA.
