Python – Pyspark sc.textFile() does not fully load the file

PySpark's sc.textFile() appears not to load the file completely… here is the question and a solution to the problem.


I am getting started with PySpark (Spark v1.6.0) on the Cloudera QuickStart Docker container.
I successfully placed a static .txt file (500 MB) in HDFS under /user/root/access_log.txt.

In PySpark, I tried to load the file with the following line of Python code:

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")

This did not raise any error, but I found that the file did not seem to load completely.
I also ran:

lines.max()

Although the file in HDFS has the correct size, this does not return the correct last element of the file.

Is this a memory issue? My Docker setup is limited to 3840 MB.
I don't know how to fix this and look forward to your answer.

Edit:

I counted the elements in the dataset using:

lines.count()

To my surprise, the count was correct! That should mean the file is loaded completely. But the question remains: why does the .max() statement not return the correct element?

Is this related to the file being processed by different tasks?

Edit 2:

Some example lines from the .txt file:

10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmpics/0000/2229/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 184976
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000163.jpg HTTP/1.1" 200 60117
10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/filmmediablock/360/Chacha.jpg HTTP/1.1" 200 109379
10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/filmmediablock/360/GOEMON-NUKI-000159.jpg HTTP/1.1" 200 161657

Solution

In general, max should not be expected to return the last element. In some cases it will, if the format used by the log file enforces lexicographic order and you are lucky with the content; otherwise it will not. Since your data is prefixed with IP addresses and uses a timestamp format that does not sort lexicographically (unlike, for example, ISO 8601), getting the last element is not something you can expect.
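To make this concrete, here is a minimal plain-Python sketch (the log lines are hypothetical, not taken from your file) showing that max() compares lines as strings, so the "largest" line depends on content rather than on position:

# Hypothetical log lines, for illustration only.
log_lines = [
    '10.190.174.142 - - [03/Dec/2011:13:28:10 -0800] "GET /images/a.jpg HTTP/1.1" 200 184976',
    '66.249.67.3 - - [01/Jan/2012:00:00:01 -0800] "GET /robots.txt HTTP/1.1" 200 68',
    '10.190.174.142 - - [03/Dec/2011:13:28:11 -0800] "GET /images/b.jpg HTTP/1.1" 200 60117',
]

print(max(log_lines))    # the "66.249..." line wins, because '6' > '1' as a character
print(log_lines[-1])     # the actual last line in file order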

One way to find the last element is to attach an index to each line and take the element with the highest index:

from operator import itemgetter

(rdd
    .zipWithIndex()                # Add line number to get (line, no)
    .max(key=itemgetter(1))[0])    # Compare elements using index
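Applied to the RDD from the question, this would look roughly as follows (a sketch; lines is assumed to be the RDD created with sc.textFile above):

from operator import itemgetter

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")

last_line = (lines
    .zipWithIndex()                 # (line, index); the index follows file order
    .max(key=itemgetter(1))[0])     # take the pair with the largest index, keep the line
print(last_line)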

A somewhat different approach is to find the last element of each partition and then take the last one among those elements.

from functools import reduce

rdd.mapPartitions(lambda part: reduce(lambda _, x: [x], part, [])).collect()[-1]
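The reduce call may look cryptic; this small plain-Python sketch shows what it does for a single partition: it keeps only the last element, wrapped in a list, and yields an empty list for an empty partition:

from functools import reduce

print(reduce(lambda _, x: [x], iter(["a", "b", "c"]), []))   # ['c']
print(reduce(lambda _, x: [x], iter([]), []))                # []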

Or if the number of partitions is large:

(rdd
    .mapPartitionsWithIndex(
        lambda i, part: reduce(lambda _, x: [(i, x)], part, []))
    .max()[1])  # Take max using tuple ordering
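As a quick sanity check (again a sketch, assuming lines is the RDD from the question), the index-based and the partition-based approaches should agree on the last element:

from operator import itemgetter
from functools import reduce

lines = sc.textFile("hdfs://quickstart.cloudera/user/root/access_log.txt")

by_index = lines.zipWithIndex().max(key=itemgetter(1))[0]
by_partition = (lines
    .mapPartitions(lambda part: reduce(lambda _, x: [x], part, []))
    .collect()[-1])

print(by_index == by_partition)   # expected: True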
