Apply window functions in Spark with non-constant frame sizes
My question
I’m currently having trouble with Spark window functions. I’m using Spark (via pyspark) version 1.6.3 (with Python 2.6.6), in a pyspark shell instance that automatically initializes a HiveContext as my sqlContext.
I want to use a window function to compute a rolling sum. My problem is that the window frame is not fixed: it depends on the observation under consideration. Specifically, I order the data by a variable named rank_id and, for the observation with index $x$, I want to sum every observation whose index lies between $x+1$ and $2x-1$. So my rangeBetween has to depend on the rank_id value.
An important point is that I don’t want to collect the data, so I can’t use something like numpy (my data has a lot of observations).
Reproducible example
from pyspark.mllib.random import RandomRDDs
import pyspark.sql.functions as psf
from pyspark.sql.window import Window
# Reproducible example
data = RandomRDDs.uniformVectorRDD(sc, 15, 2)
df = data.map(lambda l: (float(l[0]), float(l[1]))).toDF()
df = df.selectExpr("_1 as x", "_2 as y")
#df.show(2)
#+-------------------+------------------+
#| x| y|
#+-------------------+------------------+
#|0.32767742062486405|0.2506351566289311|
#| 0.7245348534550357| 0.597929853274274|
#+-------------------+------------------+
#only showing top 2 rows
# Finalize dataframe creation
w = Window().orderBy("x")
df = df.withColumn("rank_id", psf.rowNumber().over(w)).sort("rank_id")
#df.show(3)
#+--------------------+--------------------+-------+
#| x| y|rank_id|
#+--------------------+--------------------+-------+
#|0.016536160706045577|0.009892450530381458| 1|
#| 0.10943843181953838| 0.6478505849227775| 2|
#| 0.13916818312857027| 0.24165348228464578| 3|
#+--------------------+--------------------+-------+
#only showing top 3 rows
Fixed-width rolling sum: no problem
Using a window function, I was able to compute a rolling sum over a fixed number of indexes (I’m using rangeBetween here, but for this example rowsBetween would work just as well):
w = Window.orderBy('rank_id').rangeBetween(-1,3)
df1 = df.select('*', psf.sum(df['y']).over(w).alias('roll1'))
#df1.show(3)
#+--------------------+--------------------+-------+------------------+
#| x| y|rank_id| roll1|
#+--------------------+--------------------+-------+------------------+
#|0.016536160706045577|0.009892450530381458| 1|0.9698521852602887|
#| 0.10943843181953838| 0.6478505849227775| 2|1.5744700156326066|
#| 0.13916818312857027| 0.24165348228464578| 3|2.3040547273760392|
#+--------------------+--------------------+-------+------------------+
#only showing top 3 rows
Variable-width rolling sum: the problem
I want to sum between indexes x+1 and 2x-1, where x is my row index. When I try to pass this to Spark (in a way similar to what we do for orderBy, maybe that’s the problem), I get the following error:
# Now if I want to make rangeBetween size depend on a variable
w = Window.orderBy('rank_id').rangeBetween('rank_id'+1,2*'rank_id'-1)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: cannot concatenate 'str' and 'int' objects
I tried another approach, using an SQL statement:
# Using SQL expression
df.registerTempTable('tempdf')
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN rank_id+1 AND 2*rank_id-1) AS cumsum
FROM tempdf;
""")
This time I get the following error:
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near 'rank_id' '+' '1' in windowframeboundary; line 3 pos 15"
I also noticed that when I tried a simpler statement using the SQL OVER clause, I got a similar error, which may mean I’m not passing the SQL statement to Spark correctly:
df2 = sqlContext.sql("""
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN -1 AND 1) AS cumsum
FROM tempdf;
""")
Traceback (most recent call last):
  File "<stdin>", line 6, in <module>
  File "/opt/application/Spark/current/python/pyspark/sql/context.py", line 580, in sql
    return DataFrame(self._ssql_ctx.sql(sqlQuery), self)
  File "/opt/application/Spark/current/python/lib/py4j-0.9-src.zip/py4j/java_gateway.py", line 813, in __call__
  File "/opt/application/Spark/current/python/pyspark/sql/utils.py", line 51, in deco
    raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: u"cannot recognize input near '-' '1' 'AND' in windowframeboundary; line 3 pos 15"
How can I solve my problem using either window functions or SQL statements in Spark?
Solution
How could I solve my problem by using either window or SQL statement within Spark?
TL;DR You cannot, or at least you cannot meet the current requirements in a scalable way. You can try something similar to sliding over an RDD: How to transform data with sliding window over time series data in Pyspark
I also noticed that when I try a more simple statement using SQL OVER clause, I got a similar error which maybe means I am not passing SQL statement correctly to Spark
This is not correct. The frame specification requires a boundary keyword (PRECEDING, FOLLOWING, or CURRENT ROW). There should also be no semicolon:
SELECT *, SUM(y)
OVER (ORDER BY rank_id
RANGE BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS cumsum
FROM tempdf
I want to sum between indexes x+1 and 2x-1 where x is my row index. When I try to pass it to Spark (in similar way we do for orderBy maybe that’s the problem), I got the following error …
TypeError: cannot concatenate ‘str’ and ‘int’ objects
As the exception message says, you cannot call +
on strings and integers. You probably wanted columns:
from pyspark.sql.functions import col
.rangeBetween(col('rank_id') + 1, 2 * col('rank_id') - 1)
But this is not supported. The frame must have a fixed size and cannot be defined with an expression.
An important point is that I don’t want to collect data
A window definition without partitionBy:
w = Window.orderBy('rank_id').rangeBetween(-1,3)
is as bad as collecting: it moves all the data into a single partition. So even if there were workarounds for the “dynamic frame” problem (with conditional and unbounded windows), they wouldn’t help you here.