Python – Wide dataframe operations in Pyspark are too slow

I’m new to Spark and am trying to use PySpark (Spark 2.2) to run filtering and aggregation operations on a very wide feature set (about 13 million rows, 15,000 columns). The feature set is stored as Parquet files on S3. I’m running a test script that loads the feature set into a dataframe, selects a few thousand records, groups by a specific region code, and averages each of the 15k feature columns. The problem is that the job either fails or takes too long (about 8 hours for a 5% sample of the records).

Is there any way to speed up these kinds of operations on a wide dataframe in PySpark? I’m using Jupyter notebooks and want these queries to complete in minutes instead of hours.

Here is my code:

import re

# `spark`, `logger`, PATH_FEATURE_STORE and PATH_DATA_SOURCE are assumed to be defined earlier in the notebook.
df_feature_store = spark.read.parquet(PATH_FEATURE_STORE).sample(False, 0.05, seed=0).cache()
logger.info("Initial data set loaded and sampled")

df_selected_rors = spark.read.csv(PATH_DATA_SOURCE + "ROR Sample.csv", header=True)
agg_cols = [x for x in df_feature_store.columns if re.search(r"^G\d{2}", x)]
agg_cols = agg_cols[:10]  # just testing with fewer columns
expr = {x: "mean" for x in agg_cols}
joineddf = df_feature_store.join(df_selected_rors, df_feature_store.ROLLOUT_REGION_IDENTIFIER == df_selected_rors.ROR, "inner")
aggdf = joineddf.groupby("ROLLOUT_REGION_IDENTIFIER").agg(expr)
# replace groupby
# loop for a 1000 column aggregations
# transpose columns into rows as arrays
aggdf.write.mode("overwrite").csv(PATH_FEATURE_STORE + "aggregated", header=True)
logger.info("Done")

Solution

I would try splitting the job up to see where the problem is:

  • Some versions of Spark have problems with very large numbers of columns in a dataframe; I don’t remember the specifics.
  • Before running any query, read the data and save it to local Parquet, filtering down the columns if you can.
  • Run the query from local Parquet to local Parquet.
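
The staging and local-to-local steps above could be sketched roughly as follows. This is a minimal sketch under stated assumptions, not the asker's or the answerer's code: the helper names `select_feature_columns`, `stage_locally` and `aggregate_local` and all path arguments are hypothetical, while the `^G\d{2}` pattern and the mean aggregation come from the question. The functions take an existing `SparkSession` as `spark`, so nothing here creates a session.

```python
import re


def select_feature_columns(columns, pattern=r"^G\d{2}", limit=None):
    """Return the column names matching `pattern`, optionally capped at `limit`."""
    matched = [c for c in columns if re.search(pattern, c)]
    return matched if limit is None else matched[:limit]


def stage_locally(spark, source_path, local_path, key_col, feature_cols):
    """Copy only the key column and the needed features to local Parquet."""
    (spark.read.parquet(source_path)
          .select(key_col, *feature_cols)   # prune columns before writing
          .write.mode("overwrite")
          .parquet(local_path))


def aggregate_local(spark, local_in, local_out, key_col, feature_cols):
    """Average each feature per key, reading and writing local Parquet."""
    df = spark.read.parquet(local_in)
    expr = {c: "mean" for c in feature_cols}
    df.groupBy(key_col).agg(expr).write.mode("overwrite").parquet(local_out)
```

A possible use would be to compute `cols = select_feature_columns(spark.read.parquet(PATH_FEATURE_STORE).columns, limit=10)`, call `stage_locally` once, and then time `aggregate_local` on its own. Timing the two stages separately should show whether the slow part is the S3 read, the wide schema, or the aggregation itself.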

S3 as a destination of work is (a) slow to commit and (b) at risk of losing data because of S3’s eventual consistency. Unless you are using S3mper, S3Guard, or EMRFS with consistent view, you should not use it as the direct destination of your work.
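
One way to avoid writing straight to S3 is to let the job write to a local directory and copy the finished files afterwards. The helper below is a minimal sketch of that copy step, assuming the output already exists on local disk; `publish_directory` and the `upload_file(local_path, key)` callable are hypothetical names introduced for illustration, not part of Spark or any S3 library.

```python
import os


def publish_directory(local_dir, key_prefix, upload_file):
    """Upload every file under `local_dir` and return the keys that were sent.

    `upload_file(local_path, key)` is a caller-supplied callable, for example
    a thin wrapper around an S3 client's upload method.
    """
    sent = []
    for root, _dirs, files in os.walk(local_dir):
        for name in sorted(files):
            path = os.path.join(root, name)
            # Build a forward-slash key relative to the local output directory.
            rel = os.path.relpath(path, local_dir).replace(os.sep, "/")
            key = f"{key_prefix.rstrip('/')}/{rel}"
            upload_file(path, key)
            sent.append(key)
    return sent
```

With boto3, for instance, the callable could be `lambda path, key: s3.upload_file(path, "my-bucket", key)`, where `s3 = boto3.client("s3")` and the bucket name is a placeholder. The point of the design is that the Spark job only ever commits to a local filesystem, and the upload happens once the output is complete.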
