Python – PySpark XML to JSON with time series data

PySpark XML to JSON with time series data… here is a solution to the problem.

PySpark XML to JSON with time series data

I have almost 500,000 XML files containing time series data, each about 2-3MB, each containing about 10k rows of time series data. The idea is to convert an XML file of each unique ID to JSON. However, the time series data for each ID needs to be broken down into batches with a row size of 10 and converted to JSON and written to a NoSQL database. Initially, the code was written to iterate over an overall dataframe for each ID, incremented by 10 for the row size, and then write the document to the database.

def resample_idx(X,resample_rate):
    for idx in range(0,len(X),resample_rate):
        yield X.iloc[idx:idx+resample_rate,:]

# Batch Documents 
    for idx, df_batch in enumerate(resample_idx(df,10))
        dict_ = {}
        dict_['id'] = soup.find('id').contents[0]
        dict_['data'] = [v for k,v in pd. DataFrame.to_dict(df_batch. T).items()]

An example of a JSON document is as follows:

{'id':123456A,
'data': [{'A': 251.23,
          'B': 130.56,
          'dtim': Timestamp('2011-03-24 11:18:13.350000')
         },
         {
          'A': 253.23,
          'B': 140.56,
          'dtim': Timestamp('2011-03-24 11:19:21.310000')
         },
         .........
        ]
},
{'id':123593X,
'data': [{'A': 641.13,
          'B': 220.51,
          'C': 10.45
          'dtim': Timestamp('2011-03-26 12:11:13.350000')
         },
         {
          'A': 153.25,
          'B': 810.16,
          'C': 12.5
          'dtim': Timestamp('2011-03-26 12:11:13.310000')
         },
         .........
        ]
}

This worked well for small samples, but quickly realized that this didn’t scale when creating batches. Therefore, you want to replicate it in Spark. Experience with Spark is limited, but here’s what I’ve tried so far:

Get all time series data for all IDs first:

df = sqlContext.read.format("com.databricks.spark.xml").options(rowTag='log').load("dbfs:/mnt/timedata/")

XML Schema

 |-- _id: string (nullable = true)   
 |-- collect_list(TimeData): array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- data: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- ColNames: string (nullable = true)
 |    |    |-- Units: string (nullable = true)

Get a SQL query for a Spark DataFrame
d = df.select(“_id”,”TimeData.data”,’TimeData.ColNames’)

The current Spark data frame

+--------------------+--------------------+--------------------+
|                id  |                data|            ColNames|
+--------------------+--------------------+--------------------+
|123456A             | [2011-03-24 11:18...| dTim,A,B            |
|123456A             | [2011-03-24 11:19...| dTim,A,B            |
|123593X             | [2011-03-26 12:11...| dTim,A,B,C          |
|123593X             | [2011-03-26 12:11...| dTim,A,B,C          |
+--------------------+--------------------+--------------------+

The expected Spark dataframe

+--------------------+--------------------+----------+----------+
|                id  |               dTime|         A|         B|
+--------------------+--------------------+----------+----------+
|123456A             |2011-03-24 11:18... |    251.23|    130.56|
|123456A             |2011-03-24 11:19... |    253.23|    140.56|
+--------------------+--------------------+----------+----------+

+--------------------+--------------------+----------+----------+----------+
|                id  |               dTime|         A|         B|         C|
+--------------------+--------------------+----------+----------+----------+
|123593X             |2011-03-26 12:11... |    641.13|    220.51|     10.45|
|123593X             |2011-03-26 12:11... |    153.25|    810.16|      12.5|
+--------------------+-------------------+---------- +----------+----------+

I’m only showing data with two timestamps here, but how can I convert the DataFrame above to a batch JSON file for every nth row (for each id), similar to how done with Pandas shown above? The original idea was to do groupBy and apply UDFs to each ID? The output is similar to the JSON structure above.

XML Structure:

<log>
   <id>"ABC"</id>
   <TimeData>
      <colNames>dTim,colA,colB,colC,</colNames>
      <data>2011-03-24T11:18:13.350Z,0.139,38.988,0,110.307</data>
      <data>2011-03-24T11:18:43.897Z,0.138,39.017,0,110.307</data>
  </TimeData>
</log>

Note that there is no fixed number of coNames per ID, which can range from 5-30, depending on the data source collected for that ID.

Solution

Well, according to the information, this could be a solution. Unfortunately, my Python is a bit rusty, but there should be equivalents for all Scala functions here

// Assume nth is based of dTim ordering
val windowSpec = Window
  .partitionBy($"_id")
  .orderBy($"dTim".desc)

val nthRow  = 2  // define the nthItem to be fetched

df.select(
  $"_id",
  $"TimeData.data".getItem(0).getItem(0).cast(TimestampType).alias("dTim"),
  $"TimeData.data".getItem(0).getItem(1).cast(DoubleType).alias("A"),
  $"TimeData.data".getItem(0).getItem(2).cast(DoubleType).alias("B"),
  $"TimeData.data".getItem(0).getItem(3).cast(DoubleType).alias("C")
).withColumn("n", row_number().over(windowSpec))
  .filter(col("n") === nthRow)
  .drop("n")
.show()

Will output something like

that

+-------+--------------------+------+------+-----+
|    _id|                dTim|     A|     B|    C|
+-------+--------------------+------+------+-----+
|123456A|2011-03-24 11:18:...| 251.23|130.56| null|
|123593X|2011-03-26 12:11:...| 641.13|220.51|10.45|
+-------+--------------------+------+------+-----+

If I knew more, I would improve the answer


Update

I

like the puzzle, so if I understand the problem correctly, this could be a solution :

I created 3 xml files with a total of 2 different IDs for each 2 data records

val df = spark
  .sqlContext
  .read
  .format("com.databricks.spark.xml")
  .option("rowTag", "log")
  .load("src/main/resources/xml")

 Could be computationally heavy, maybe cache df first if possible, otherwise run it on a sample, otherwise hardcode possible colums
val colNames = df
  .select(explode(split($"TimeData.colNames",",")).as("col"))
  .distinct()
  .filter($"col" =!= lit("dTim") && $"col" =!= "")
  .collect()
  .map(_.getString(0))
  .toList
  .sorted

 or list all possible columns
val colNames = List("colA", "colB", "colC")

 Based on XML colNames and data are comma seprated strings that have to be split. Could be done using sql split function, but this UDF maps the columns to the correct field
def mapColsToData = udf((cols:String, data:Seq[String]) =>
  if(cols == null || data == null) Seq.empty[Map[String, String]]
  else {
    data.map(str => (cols.split(",") zip str.split(",")).toMap)
  }
)

  The result of this action is 1 record for each datapoint for all XML's. Each data record is key->value map of colName->data
val denorm = df.select($"id", explode(mapColsToData($"TimeData.colNames", $"TimeData.data")).as("data"))

denorm.show(false)

Output:

+-------+-------------------------------------------------------------------------------+
|id     |data                                                                           |
+-------+-------------------------------------------------------------------------------+
|123456A| Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC -> 0)|
|123456A| Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0)|
|123593X| Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)           |
|123593X| Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017)           |
|123456A| Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)|
|123456A| Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0)|
+-------+-------------------------------------------------------------------------------+
// now create column for each map value, based on predef / found columnNames
val columized = denorm.select(
  $"id",
  $"data.dTim".cast(TimestampType).alias("dTim"),
  $"data"
)

columized.show()

Output:

+-------+--------------------+--------------------+
|     id|                dTim|                data|
+-------+--------------------+--------------------+
|123456A|2011-03-24 12:18:...| Map(dTim -> 2011-...|
|123456A|2011-03-24 12:18:...| Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...| Map(dTim -> 2011-...|
|123593X|2011-03-26 12:20:...| Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...| Map(dTim -> 2011-...|
|123456A|2011-03-27 13:18:...| Map(dTim -> 2011-...|
+-------+--------------------+--------------------+
// create window over which to resample
val windowSpec = Window
  .partitionBy($"id")
  .orderBy($"dTim".desc)

val resampleRate = 2

 add batchId based on resample rate. Group by batch and
val batched = columized
  .withColumn("batchId", floor((row_number().over(windowSpec) - lit(1)) / lit(resampleRate)))
  .groupBy($"id", $"batchId")
  .agg(collect_list($"data").as("data"))
  .drop("batchId")

batched.show(false)

Output:

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|id     |data                                                                                                                                                              |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123593X| [Map(dTim -> 2011-03-26T11:20:43.897Z, colA -> 1.138, colB -> 29.017), Map(dTim -> 2011-03-26T11:20:13.350Z, colA -> 1.139, colB -> 28.988)]                      |
|123456A| [Map(dTim -> 2011-03-27T11:18:43.897Z, colA -> 0.128, colB -> 35.017, colC -> 0), Map(dTim -> 2011-03-27T11:18:13.350Z, colA -> 0.129, colB -> 35.988, colC -> 0)]|
|123456A| [Map(dTim -> 2011-03-24T11:18:43.897Z, colA -> 0.138, colB -> 39.017, colC -> 0), Map(dTim -> 2011-03-24T11:18:13.350Z, colA -> 0.139, colB -> 38.988, colC ->  0)]|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------+
// Store as 1 huge json file (drop reapatrition if you can handle multiple json, better for master as well)
batched.repartition(1).write.mode(SaveMode.Overwrite).json("/tmp/xml")

Output json:

{"id":"123593X","data":[{"dTim":"2011-03-26T12:20:43.897+01:00","colA":"1.138","colB":"29.017"},{"dTim":" 2011-03-26T12:20:13.350+01:00","colA":"1.139","colB":"28.988"}]}
{"id":"123456A","data":[{"dTim":"2011-03-27T13:18:43.897+02:00","colA":"0.128","colB":"35.017","colC":"0"},{"dTim":"2011-03-27T13:18:13.350+02:00","colA":"0.129"," colB":"35.988","colC":"0"}]}
{"id":"123456A","data":[{"dTim":"2011-03-24T12:18:43.897+01:00","colA":"0.138","colB":"39.017","colC":"0"},{"dTim":"2011-03-24T12:18:13.350+01:00","colA":"0.139"," colB":"38.988","colC":"0"}]}

Related Problems and Solutions