Java – Reactively removing objects already in the database from a stream

I’m scanning a directory for files and then processing the results. Before further processing, I want to remove any files that already exist in the data store from the scan results.

I’m trying to do this reactively, using reactive MongoDB. I’m just not sure how to implement a filter that depends on the result of a database query.

@Override
public Flux<File> findFiles(Directory directory) {

    // Only get these file types, as we can't process anything else
    final Predicate<Path> extensions = path ->
            path.toString().endsWith(".txt") ||
                    path.toString().endsWith(".doc") ||
                    path.toString().endsWith(".pdf");

    final Set<File> files = fileService.findAll(Paths.get(directory.getPath()), extensions);

    final Stream<File> fileStream = files
            .stream()
            .map(this::convertFileToDocument)
            // This is wrong (it doesn't compile, for a start), but how do I do something of this nature?
            .filter(file -> fileRepository.findById(file.getId()));

    return Flux.fromStream(fileStream);
}

convertFileToDocument just maps files to POJOs; nothing interesting happens there.

How can I add a filter based on the result of findById, or is there a better way to achieve this?

Solution

If fileRepository.findById returns a Mono, I suggest converting the stream to a Flux first and then filtering with filterWhen, checking whether the Mono has an element. Something like:

final Stream<File> fileStream = files
        .stream()
        .map(this::convertFileToDocument);

// Keep a file only when findById completes empty, i.e. it is not in the database yet
return Flux.fromStream(fileStream)
        .filterWhen(file -> fileRepository.findById(file.getId())
                .hasElement()
                .map(exists -> !exists));

This filters out every file for which findById returns a non-empty Mono, that is, every file that already exists in the database. Please let me know if I have misunderstood something.
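If you want to try the keep-if-absent logic without Reactor or MongoDB on the classpath, here is a minimal, dependency-free sketch. It is not the Reactor API: CompletableFuture<Optional<String>> stands in for Mono, and an in-memory Map stands in for the repository. The class name, the findById stand-in, and keepUnstored are all hypothetical names made up for this illustration.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

// Dependency-free model of the filterWhen idea (assumption: plain string ids,
// a Map instead of a real repository, CompletableFuture instead of Mono).
class ExistenceFilterSketch {

    // Hypothetical stand-in for fileRepository.findById: completes with an
    // empty Optional when nothing is stored under the given id.
    static CompletableFuture<Optional<String>> findById(Map<String, String> store, String id) {
        return CompletableFuture.supplyAsync(() -> Optional.ofNullable(store.get(id)));
    }

    // Returns the scanned ids that are not in the store, in their original order.
    static List<String> keepUnstored(List<String> scannedIds, Map<String, String> store) {
        // Start all lookups first so they can run concurrently.
        List<CompletableFuture<Optional<String>>> decisions = scannedIds.stream()
                .map(id -> findById(store, id)
                        // Same role as hasElement().map(exists -> !exists):
                        // keep the id only when the lookup came back empty.
                        .thenApply(found -> found.isPresent()
                                ? Optional.<String>empty()
                                : Optional.of(id)))
                .collect(Collectors.toList());

        // Unlike Reactor, this sketch blocks here until every lookup has finished.
        return decisions.stream()
                .map(CompletableFuture::join)
                .flatMap(Optional::stream)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, String> store = Map.of("a.txt", "stored", "c.doc", "stored");
        List<String> scanned = List.of("a.txt", "b.pdf", "c.doc");
        System.out.println(keepUnstored(scanned, store)); // prints [b.pdf]
    }
}
```

The important part is the inversion: an element passes the filter only when the asynchronous lookup finds nothing. With Reactor, filterWhen does the same thing without the blocking join at the end.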
