Python – PySpark (Python 2.7): How to flatten values after reduce

PySpark (Python 2.7): How to flatten values after reduce

I’m reading a multiline log file using SparkContext.newAPIHadoopFile with a custom record delimiter. I have already finished mapping and reducing the data, but now I want to attach the key to every entry again, write the result to an Apache Parquet file, and store it in HDFS.
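
For context, the reading step looks roughly like this. The input path and the record delimiter below are simplified placeholders, and sheet is assumed to be the raw record RDD used later on:

from pyspark import SparkContext

sc = SparkContext(appName="multiline-logs")

# Each record is separated by a custom marker instead of a newline;
# "~" and the HDFS path are placeholders for the real values.
sheet = (sc
    .newAPIHadoopFile(
        "hdfs:///logs/sample.txt",
        "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
        "org.apache.hadoop.io.LongWritable",
        "org.apache.hadoop.io.Text",
        conf={"textinputformat.record.delimiter": "~"})
    .map(lambda kv: kv[1]))  # drop the byte offset, keep only the record text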

This diagram should explain my problem. What I’m looking for is the red arrow, i.e. the last transformation before writing to the file. Any ideas? I tried flatMap, but then the timestamps and the floating-point values end up in separate records.

[Diagram: PySpark chain]

The Python script can be downloaded here, and a sample text file here.

Solution

A simple list comprehension should be more than enough:

from datetime import datetime

def flatten(kvs):
    """
    >>> kvs = ("852-YF-008", [
    ... (datetime(2016, 5, 10, 0, 0), 0.0),
    ... (datetime(2016, 5, 9, 23, 59), 0.0)])
    >>> flat = flatten(kvs)
    >>> len(flat)
    2
    >>> flat[0]
    ('852-YF-008', datetime.datetime(2016, 5, 10, 0, 0), 0.0)
    """
    k, vs = kvs
    return [(k, v1, v2) for v1, v2 in vs]

In Python 2.7 you can also use a lambda expression with tuple parameter unpacking, but this was removed in Python 3 (PEP 3113) and is generally discouraged:

lambda (k, vs): [(k, v1, v2) for v1, v2 in vs]

A version-independent alternative:

lambda kvs: [(kvs[0], v1, v2) for v1, v2 in kvs[1]]
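
Putting it together, the missing step (the red arrow) is just a flatMap with flatten, followed by the usual DataFrame/Parquet machinery. A minimal sketch, assuming reduced is the (key, [(datetime, value), ...]) RDD from the diagram, an SQLContext has already been created (so toDF is available on RDDs), and output_path points to a location in HDFS:

flat = reduced.flatMap(flatten)                   # -> (key, datetime, value) tuples
df = flat.toDF(["key", "datetime", "value"])
df.write.partitionBy("key").parquet(output_path)  # one subdirectory per key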

Edit:

If you only need the data partitioned by key, you can skip reduceByKey entirely and convert directly to a DataFrame before writing Parquet:

(sheet
    .flatMap(process)
    .map(lambda x: (x[0], ) + x[1])  # x is (key, (datetime, value))
    .toDF(["key", "datetime", "value"])
    .write
    .partitionBy("key")
    .parquet(output_path))
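
Because the output is partitioned by key, reading it back and filtering on the key column only touches the matching directories. A small sketch, again assuming an existing sqlContext and the same output_path:

df = sqlContext.read.parquet(output_path)
df.filter(df.key == "852-YF-008").show()  # partition pruning: only that key's files are read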
