Java – Filter input files using globStatus in MapReduce

I have a lot of input files and I want to process only selected ones, based on the date appended to the end of their names. I’m confused about where to filter the files using the globStatus method.

I have a custom RecordReader class and I tried calling globStatus in its next method, but without success.

public boolean next(Text key, Text value) throws IOException {
    Path filePath = fileSplit.getPath();

    if (!processed) {
        key.set(filePath.getName());

        byte[] contents = new byte[(int) fileSplit.getLength()];
        value.clear();
        FileSystem fs = filePath.getFileSystem(conf);
        fs.globStatus(new Path("/*" + date));
        FSDataInputStream in = null;

        try {
            in = fs.open(filePath);
            IOUtils.readFully(in, contents, 0, contents.length);
            value.set(contents, 0, contents.length);
        } finally {
            IOUtils.closeStream(in);
        }
        processed = true;
        return true;
    }
    return false;
}

I know it returns a FileStatus array, but how do I use it to filter files? Can someone explain?

Solution

The globStatus method takes two parameters that let you filter files. The first is a glob pattern; when a glob pattern is not expressive enough to select specific files, you can also pass a PathFilter as the second parameter.

The following glob patterns are supported:

Glob   | Matches
-------------------------------------------------------------------------------------------------------------------
*      | Matches zero or more characters
?      | Matches a single character
[ab]   | Matches a single character in the set {a, b}
[^ab]  | Matches a single character not in the set {a, b}
[a-b]  | Matches a single character in the range [a, b] where a is lexicographically less than or equal to b
[^a-b] | Matches a single character not in the range [a, b] where a is lexicographically less than or equal to b
{a,b}  | Matches either expression a or b
\c     | Matches character c when it is a metacharacter
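To get a feel for how such patterns select file names, here is a minimal sketch that uses the JDK's own glob matcher (java.nio.file.PathMatcher) as a stand-in, so it runs without Hadoop. This is not Hadoop's matcher: the JDK syntax is close but not identical (for example, it negates a set with [!ab] rather than [^ab]). The class name GlobDemo and the file names are made up for illustration.

```java
import java.nio.file.FileSystems;
import java.nio.file.PathMatcher;
import java.nio.file.Paths;

// Stand-in demo: JDK glob matching, NOT Hadoop's globStatus.
class GlobDemo {
    // Returns true if the file name matches the given glob pattern.
    static boolean matches(String glob, String fileName) {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        return matcher.matches(Paths.get(fileName));
    }

    public static void main(String[] args) {
        // '*' matches zero or more characters, so this selects names ending in the date.
        System.out.println(matches("*2013-01-15", "events-2013-01-15")); // true
        System.out.println(matches("*2013-01-15", "events-2013-01-16")); // false

        // '?' matches exactly one character.
        System.out.println(matches("log-?", "log-1"));  // true
        System.out.println(matches("log-?", "log-10")); // false

        // '{a,b}' matches either alternative.
        System.out.println(matches("{a,b}.txt", "a.txt")); // true
        System.out.println(matches("{a,b}.txt", "c.txt")); // false
    }
}
```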

PathFilter is just an interface like this:

public interface PathFilter {
    boolean accept(Path path);
}

So you can implement this interface and put your file-filtering logic in the accept method.

An example taken from Tom White’s excellent book defines a PathFilter that excludes paths matching a given regular expression:

public class RegexExcludePathFilter implements PathFilter {
    private final String regex;

    public RegexExcludePathFilter(String regex) {
        this.regex = regex;
    }

    public boolean accept(Path path) {
        return !path.toString().matches(regex);
    }
}

You can apply a PathFilter to your input directly by calling FileInputFormat.setInputPathFilter(jobConf, RegexExcludePathFilter.class) when initializing the job, where jobConf is your JobConf instance.
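One detail worth checking when writing such a filter: String.matches succeeds only if the regular expression matches the whole string, so the pattern has to cover the full path, not just a fragment of it. The sketch below illustrates this with a plain java.util.function.Predicate over path strings as a stand-in for PathFilter (Hadoop itself is not used); the class name ExcludeDemo and the sample path are hypothetical.

```java
import java.util.function.Predicate;

// Stand-in for the exclude filter above, operating on plain path strings.
class ExcludeDemo {
    // Accepts every path that does NOT fully match the regex.
    static Predicate<String> excluding(String regex) {
        return path -> !path.matches(regex);
    }

    public static void main(String[] args) {
        String path = "/logs/2013/part-0001.tmp";

        // A fragment does not match the whole path, so the path is still accepted.
        System.out.println(excluding("\\.tmp").test(path));   // true

        // A pattern covering the whole path excludes it as intended.
        System.out.println(excluding(".*\\.tmp").test(path)); // false
    }
}
```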

Edit: Because setInputPathFilter takes a class rather than an instance, you cannot pass constructor parameters directly, but you should be able to use the configuration instead. If you make your filter also extend Configured, it receives the Configuration object that you previously initialized with the required values, so you can read those values in the filter and use them in accept.

For example, if you initialize like this:

conf.set("date", "2013-01-15");

Then you can define your filter like this:

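import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

public class RegexIncludePathFilter extends Configured implements PathFilter {
    private String date;
    private FileSystem fs;

    @Override
    public boolean accept(Path path) {
        try {
            // Always accept directories so that their contents get filtered too.
            if (fs.isDirectory(path)) {
                return true;
            }
        } catch (IOException e) {
            // The path could not be checked; fall through to the name test.
        }
        return path.toString().endsWith(date);
    }

    @Override
    public void setConf(Configuration conf) {
        // Keep the configuration available through getConf() as well.
        super.setConf(conf);
        // Configured's no-arg constructor calls setConf(null), hence the check.
        if (null != conf) {
            this.date = conf.get("date");
            try {
                this.fs = FileSystem.get(conf);
            } catch (IOException e) {
                // Leaves fs unset; accept() cannot work without a FileSystem.
            }
        }
    }
}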

Edit 2: There were some problems with the original code; please check the updated class above. You need to remove the constructor because it is no longer used, and you need to check whether the path is a directory, in which case accept should return true so that the contents of the directory can also be filtered.
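The reason the filter cannot take constructor arguments is that the framework is given only the class, creates the instance itself, and then hands it the configuration. The following sketch imitates that pattern in plain Java so it can be run on its own. Everything in it is a simplified stand-in: the SimpleConfigurable interface, the Map used as configuration, and the newInstance helper are hypothetical and are not Hadoop classes.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a component that is configured after construction.
interface SimpleConfigurable {
    void setConf(Map<String, String> conf);
}

// Filter with only a no-argument constructor; its parameter arrives via setConf.
class DateSuffixFilter implements SimpleConfigurable {
    private String date;

    @Override
    public void setConf(Map<String, String> conf) {
        if (conf != null) {
            this.date = conf.get("date");
        }
    }

    // Accepts paths that end with the configured date.
    boolean accept(String path) {
        return date != null && path.endsWith(date);
    }
}

class ReflectiveSetup {
    // Creates the instance from its class alone, then injects the configuration.
    static <T extends SimpleConfigurable> T newInstance(Class<T> cls, Map<String, String> conf) {
        try {
            T instance = cls.getDeclaredConstructor().newInstance();
            instance.setConf(conf);
            return instance;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + cls.getName(), e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("date", "2013-01-15");

        DateSuffixFilter filter = newInstance(DateSuffixFilter.class, conf);
        System.out.println(filter.accept("/input/events-2013-01-15")); // true
        System.out.println(filter.accept("/input/events-2013-01-16")); // false
    }
}
```

Because the value travels through the configuration, the same class can be reused with a different date simply by setting a different "date" value before the job starts.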
