Reactively remove objects that are already in the database from a stream
I'm scanning a directory for files and then processing the results. Before any further processing, I want to remove the files that are already in the data store from the scan results.
I'm trying to do this in a reactive fashion using reactive MongoDB. I'm just not sure how to implement a filter that depends on the result of a database query.
@Override
public Flux<File> findFiles(Directory directory) {
    // Only get these file types as we can't process anything else
    final Predicate<Path> extensions = path ->
            path.toString().endsWith(".txt") ||
            path.toString().endsWith(".doc") ||
            path.toString().endsWith(".pdf");

    final Set<File> files = fileService.findAll(Paths.get(directory.getPath()), extensions);

    final Stream<File> fileStream = files
            .stream()
            .map(this::convertFileToDocument)
            // This is wrong (doesn't compile for a start), but how do I do something similar?
            .filter(file -> fileRepository.findById(file.getId()));

    return Flux.fromStream(fileStream);
}
convertFileToDocument just maps files to POJOs; nothing interesting happens there.
How can I add a filter based on the result of findById, or is there a better way to achieve this?
Solution
If fileRepository.findById returns a Mono, I suggest converting the stream to a Flux and then filtering with filterWhen, checking whether the Mono has an element. Something like:
final Stream<File> fileStream = files
        .stream()
        .map(this::convertFileToDocument);

// Keep a file only when findById completes empty, i.e. it is not stored yet
return Flux.fromStream(fileStream)
        .filterWhen(file -> fileRepository.findById(file.getId())
                .hasElement()
                .map(exists -> !exists));
This filters out every file for which findById returns a non-empty Mono, i.e. every file that already exists in the database. Please let me know if I've misunderstood something.
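To try the filterWhen approach in isolation, here is a minimal sketch that runs without a database. It assumes reactor-core is on the classpath. The class FilterWhenSketch, the method notYetStored, and the in-memory findById are hypothetical stand-ins for illustration only; plain String ids replace the file POJOs, and a Set replaces the MongoDB collection.

```java
import java.util.List;
import java.util.Set;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FilterWhenSketch {

    // Hypothetical stand-in for fileRepository.findById: emits the id when it
    // is "stored" and completes empty otherwise, like a reactive lookup would.
    static Mono<String> findById(Set<String> stored, String id) {
        return stored.contains(id) ? Mono.just(id) : Mono.empty();
    }

    // Keeps only the scanned ids for which the lookup comes back empty.
    static Flux<String> notYetStored(List<String> scanned, Set<String> stored) {
        return Flux.fromIterable(scanned)
                .filterWhen(id -> findById(stored, id)
                        .hasElement()              // Mono<Boolean>: true if already stored
                        .map(exists -> !exists));  // invert: keep what is missing
    }

    public static void main(String[] args) {
        // block() is used here only to print the result in a demo;
        // reactive code would normally keep returning the Flux.
        List<String> result = notYetStored(
                List.of("a.txt", "b.doc", "c.pdf"), Set.of("b.doc"))
                .collectList()
                .block();
        System.out.println(result); // prints [a.txt, c.pdf]
    }
}
```

Note that this issues one lookup per element. If the scan returns many files, it may be worth checking whether your repository can look up several ids in a single query, but that depends on your setup.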