Python – Does Spark take advantage of the HBase row key sort order when HBase is the data source?

I store time series data in HBase. The row key consists of a user ID and a timestamp, like this:

{
    "userid1-1428364800" : {
        "columnFamily1" : {
            "val" : "1"
        }
    },
    "userid1-1428364803" : {
        "columnFamily1" : {
            "val" : "2"
        }
    },
    "userid2-1428364812" : {
        "columnFamily1" : {
            "val" : "abc"
        }
    }
}

Now I need to perform per-user analytics. This is how hbase_rdd is initialized (the code is taken from a linked example):

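from pyspark import SparkContext

sc = SparkContext(appName="HBaseInputFormat")

# host and table are defined elsewhere (ZooKeeper quorum and HBase table name)
conf = {"hbase.zookeeper.quorum": host, "hbase.mapreduce.inputtable": table}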
keyConv = "org.apache.spark.examples.pythonconverters.ImmutableBytesWritableToStringConverter"
valueConv = "org.apache.spark.examples.pythonconverters.HBaseResultToStringConverter"

hbase_rdd = sc.newAPIHadoopRDD(
        "org.apache.hadoop.hbase.mapreduce.TableInputFormat",
        "org.apache.hadoop.hbase.io.ImmutableBytesWritable",
        "org.apache.hadoop.hbase.client.Result",
        keyConverter=keyConv,
        valueConverter=valueConv,
        conf=conf)

The natural MapReduce-like approach is:

(hbase_rdd
   .map(lambda row: (row[0].split('-')[0], (row[0].split('-')[1], row[1])))  # move timestamp from key to value
   .groupByKey()
   .map(processUserData))  # process each user's data

During the first map (moving the timestamp from the key to the value), it is crucial to know when the current user's time series ends, so that the grouping and processing for that user can start right away. Then we would not need to map over the whole table and store all the temporary data. This should be possible because HBase stores row keys in sorted order.

With Hadoop streaming it could be done like this:

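import sys

current_user_data = []
last_userid = None
for line in sys.stdin:
    k, v = line.rstrip('\n').split('\t')
    userid, timestamp = k.split('-')
    if userid != last_userid and current_user_data:
        # a new user starts: emit the finished user's data
        print(processUserData(last_userid, current_user_data))
        current_user_data = []
    last_userid = userid
    current_user_data.append((timestamp, v))

# emit the last user's data once the input is exhausted
if current_user_data:
    print(processUserData(last_userid, current_user_data))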

The question is: how can I take advantage of the sort order of HBase row keys in Spark?

Solution

I’m not very familiar with the guarantees you get from the way you are pulling data from HBase, but if I understand correctly, I can answer with plain old Spark.

You have some RDD[X]. As far as Spark knows, the Xs in that RDD are completely unordered. But you have some outside knowledge, and you can guarantee that the data is in fact grouped by some field of X (and perhaps even sorted by another field).

In that case, you can use mapPartitions to do pretty much the same thing you did with Hadoop streaming. It lets you iterate over all the records in one partition, so you can look for blocks of records with the same key:

import org.apache.spark.rdd.RDD
import scala.collection.mutable.ArrayBuffer

val myRDD: RDD[X] = ...
val groupedRDD: RDD[Seq[X]] = myRDD.mapPartitions { itr =>
  var currentUserData = new ArrayBuffer[X]()
  var currentUser: X = null
  // itr is an iterator over *all* the records in one partition
  val completedUsers = itr.flatMap { x =>
    if (currentUser != null && x.userId == currentUser.userId) {
      // same user as before -- add the data to our list
      currentUserData += x
      None
    } else {
      // it's a new user -- return all the data for the old user, and make
      // another buffer for the new user
      val userDataGrouped = currentUserData
      currentUserData = new ArrayBuffer[X]()
      currentUserData += x
      currentUser = x
      // the very first record has no previous user, so there is nothing to emit yet
      if (userDataGrouped.isEmpty) None else Some(userDataGrouped)
    }
  }
  // ++ takes its argument by name, so the last user's buffer is only read
  // once itr is exhausted
  completedUsers ++ Iterator(currentUserData).filter(_.nonEmpty)
}
// now groupedRDD has all the data for one user grouped together, and we didn't
// need to do an expensive shuffle.  Also, the above transformation is lazy, so
// we don't necessarily even store all that data in memory -- we could still
// do more filtering on the fly, e.g.:
val usersWithLotsOfData = groupedRDD.filter { userData => userData.size > 10 }

I know you wanted to use Python, sorry. I figured I was more likely to get the example right if I wrote it in Scala, and I think the type annotations make the meaning clearer, but that is probably a Scala bias :). In any case, hopefully you can understand what is going on and translate it. (Don’t worry too much about flatMap, Some and None; they probably don’t matter if you understand the idea.)
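
For readers who want the Python version, here is a rough sketch of the same idea for PySpark. It is not part of the original answer. It assumes each record is a (rowkey, value) pair as produced by the converters in the question, and that the rows inside one partition arrive sorted by row key. The helper names split_rowkey and group_consecutive_users are made up for this illustration, and splitting on the last hyphen is an assumption that tolerates hyphens inside a user ID.

```python
from itertools import groupby


def split_rowkey(rowkey):
    """Split 'userid-timestamp' into (userid, timestamp) at the last hyphen."""
    userid, timestamp = rowkey.rsplit("-", 1)
    return userid, timestamp


def group_consecutive_users(records):
    """Yield (userid, [(timestamp, value), ...]) for each run of consecutive
    records that share a user id.

    `records` is an iterator of (rowkey, value) pairs, assumed to be sorted
    by rowkey. Only one user's rows are buffered at a time.
    """
    keyed = ((split_rowkey(rowkey), value) for rowkey, value in records)
    for userid, run in groupby(keyed, key=lambda kv: kv[0][0]):
        yield userid, [(timestamp, value) for (_, timestamp), value in run]


# Hypothetical usage with the RDD from the question (needs a running Spark
# context, so it is left commented out here):
# grouped_rdd = hbase_rdd.mapPartitions(group_consecutive_users)
# results = grouped_rdd.map(processUserData)
```

One caveat applies to both the Scala and the Python version: the grouping only looks inside a single partition. As far as I know, TableInputFormat creates one input split per HBase region by default, so a user whose rows straddle a region boundary would show up as two partial groups. If that can happen in your table, the partial groups would still need to be merged afterwards, for example by key on the already much smaller grouped output.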
