Python – Dividing PySpark columns by their sums


I’m trying to divide the columns in a PySpark DataFrame by their respective sums. My DataFrame (shown here with just one column to scale) looks like this:

event_rates = [[1,10.461016949152542], [2, 10.38953488372093], [3, 10.609418282548477]]
event_rates = spark.createDataFrame(event_rates, ['cluster_id','mean_encoded'])
event_rates.show()

+----------+------------------+
|cluster_id|      mean_encoded|
+----------+------------------+
|         1|10.461016949152542|
|         2| 10.38953488372093|
|         3|10.609418282548477|
+----------+------------------+

I tried two methods to do this, but neither of them worked.

from pyspark.sql.functions import sum as spark_sum
cols = event_rates.columns[1:]
for each in cols:
    event_rates = event_rates.withColumn(each+"_scaled", event_rates[each]/spark_sum(event_rates[each]))

This gives me the following error:

org.apache.spark.sql.AnalysisException: grouping expressions sequence is empty, and '`cluster_id`' is not an aggregate function. Wrap '((`mean_encoded` / sum(`mean_encoded`)) AS `mean_encoded_scaled`)' in windowing function(s) or wrap '`cluster_id`' in first() (or first_value) if you don't care which value you get.;;
Aggregate [cluster_id#22356L, mean_encoded#22357, (mean_encoded#22357 / sum(mean_encoded#22357)) AS mean_encoded_scaled#2
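
This AnalysisException occurs because spark_sum is an aggregate function, and inside withColumn it has neither a grouping nor a window to aggregate over. One way to obtain the column totals for the division, in line with the error message's own hint about windowing functions, is to sum over a window spanning the whole DataFrame. A minimal sketch, assuming an empty Window.partitionBy() (which moves all rows into a single partition) is acceptable for data of this size:

from pyspark.sql import Window
from pyspark.sql import functions as F

cols = event_rates.columns[1:]   # every column except cluster_id, as in the question
w = Window.partitionBy()         # no partition columns: the window covers the whole DataFrame

for each in cols:
    event_rates = event_rates.withColumn(
        each + "_scaled",
        F.col(each) / F.sum(F.col(each)).over(w)
    )
event_rates.show()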

Following the question linked here, I also tried the following:

stats = (event_rates.agg([spark_sum(x).alias(x + '_sum') for x in cols]))
event_rates = event_rates.join(broadcast(stats))
exprs = [event_rates[x] / event_rates[event_rates + '_sum'] for x in cols]
event_rates.select(exprs)

But I get an error from the first line:

AssertionError: all exprs should be Column

How do I fix this?
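
As for the join-based attempt, the AssertionError comes from handing agg() a Python list: agg() expects Column arguments (or a dict), so the list comprehension needs to be unpacked with *. Below is a sketch of how that approach could be repaired, assuming broadcast is imported from pyspark.sql.functions, event_rates[event_rates + '_sum'] was meant to be event_rates[x + '_sum'], and an explicit crossJoin is acceptable for the single-row stats frame:

from pyspark.sql.functions import sum as spark_sum, broadcast

cols = event_rates.columns[1:]

# agg() takes Columns, not a list of Columns, so unpack the comprehension
stats = event_rates.agg(*[spark_sum(x).alias(x + '_sum') for x in cols])

# stats has exactly one row, so broadcasting it in a cross join is cheap
event_rates = event_rates.crossJoin(broadcast(stats))

exprs = [(event_rates[x] / event_rates[x + '_sum']).alias(x + '_scaled') for x in cols]
event_rates.select('cluster_id', *exprs).show()

Of the two sketches, the window version stays within a single DataFrame, while the join version materializes the sums once and broadcasts them; for a handful of columns either should behave fine.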
