Python – Gets the default HDFS path where Parquet files are saved

Gets the default HDFS path where Parquet files are saved… here is a solution to the problem.

Gets the default HDFS path where Parquet files are saved

I ran a spark job and ended up saving a Parquet file, and the job completed successfully. But I only specified the file name, not the path to HDFS. Is there a way to print out the default HDFS path where spark writes files? I looked at sc._conf.getAll() but there doesn’t seem to be anything useful there.

Solution

AFAIK This is one of those ways (except for the simple command method of hadoop fs -ls -R | grep -i yourfile)….

Here’s a sample scala code snippet…. (If you want to execute it in Python or Java, you can emulate the same API call)
Get a list of Parquet files. And filter them like below ….

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs. {FileStatus, FileSystem, Path}
import org.apache.hadoop.io. {BytesWritable, Text}
import org.apache.spark. {SparkConf, SparkContext}
other imports here 
lazy val sparkConf = new SparkConf()    
 lazy val sc = SparkContext.getOrCreate(sparkConf)   
 lazy val fileSystem = FileSystem.get(sc.hadoopConfiguration)
    val fileSystem = listChaildStatuses(fileSystem , new Path("yourbasepathofHDFS")) // normally hdfs://server/user like this...
  val allparquet = fileSystem.filter(_.getPath.getName.endsWith(".parquet"))
 now you can print these parquet files out of which your files will be present and you can know the base path...

The following support methods are available

/**
        * Get [[org.apache.hadoop.fs.FileStatus]] objects for all Chaild children (files) under the given base path. If the
        * given path points to a file, return a single-element collection containing [[org.apache.hadoop.fs.FileStatus]] of
        * that file.
        */
     def listChaildStatuses(fs: FileSystem, basePath: Path): Seq[FileStatus] = {
        listChaildStatuses(fs, fs.getFileStatus(basePath))
    }

/**
    * Get [[FileStatus]] objects for all Chaild children (files) under the given base path. If the
    * given path points to a file, return a single-element collection containing [[FileStatus]] of
    * that file.
    */
  def listChaildStatuses(fs: FileSystem, baseStatus: FileStatus): Seq[FileStatus] = {
    def recurse(status: FileStatus): Seq[FileStatus] = {
      val (directories, leaves) = fs.listStatus(status.getPath).partition(_.isDirectory)
      leaves ++ directories.flatMap(f => listChaildStatuses(fs, f))
    }

if (baseStatus.isDirectory) recurse(baseStatus) else Seq(baseStatus)
  }

Related Problems and Solutions