August 20, 2014

Using Parquet + Protobufs with Spark

I recently had occasion to test out using Parquet with protobufs. I got some simple tests working, and since I had to do a lot of reading to get to this point, I thought I’d do the world a favor and document the process here.

First, some definitions:

Parquet is a column-oriented data storage format for Hadoop from Twitter. Column-oriented storage is really nice for “wide” data, since you can efficiently read just the fields you need.

Protobuf is a data serialization library developed by google. It lets you efficiently and quickly serialize and deserialize data for transport.

Parquet has low-level support for protobufs, which means that if you happen to have protobuf-serialized data, you can use it with parquet as-is to performantly do partial deserialzations and query across that data.

You might do that using [spark][spark], a fast mapreduce engine with some nice ease-of-use. Spark can even read from Hadoop, which is nice.

I got a lot of information from this post on doing the same with Avro. I happen to be using Clojure, but I hope you’ll be able to follow along anyhow (here’s a quick syntax primer). If you want to follow along exactly, you can check out the github repo of my sample project.

The first tricky bit was sorting dependencies out. Some highlights from this process:

  • You must exclude the import of javax.servlet:servlet-api from hadoop, and from anything that depends on hadoop. Otherwise, you’ll get some issues where this conflicts with spark’s version.
  • You need to explicitly include a hadoop-client of your preferred version, otherwise Spark will fall back on some undefined client version (Hadoop 1.something)
  • You need to import a number of separate parquet projects.

Here’s what my project.clj (like maven but shorter) ended up looking like:

(defproject sparkquet "0.1.0-SNAPSHOT"
  :description "FIXME: write description"
  :url "http://example.com/FIXME"
  :license {:name "Eclipse Public License"
            :url "http://www.eclipse.org/legal/epl-v10.html"}
  :dependencies [[org.clojure/clojure "1.6.0"]

                 ; Spark wrapper
                 [yieldbot/flambo "0.3.2"]

                 ; Still need spark & hadoop (must pick specific client)
                 [org.apache.spark/spark-core_2.10 "1.0.1"]
                 [org.apache.hadoop/hadoop-client "2.4.1"
                  :exclusions [javax.servlet/servlet-api]] ; Conflicts with spark's

                 ; Parquet stuff
                 [com.twitter/parquet-common "1.6.0rc1"]
                 [com.twitter/parquet-encoding "1.6.0rc1"]
                 [com.twitter/parquet-column "1.6.0rc1"]
                 [com.twitter/parquet-hadoop "1.6.0rc1"
                  :exclusions [javax.servlet/servlet-api] ]
                 [com.twitter/parquet-protobuf "1.6.0rc1"
                  :exclusions [javax.servlet/servlet-api commons-lang]]

                 ; And, of course, protobufs
                 [com.google.protobuf/protobuf-java "2.5.0"]
                 ]
  :java-source-paths ["src/java"]
  :source-paths ["src/clj"]
  :plugins [[lein-protobuf "0.4.1"]]
  )

For this example, we’ll be using this simple protobuf:

package sparkquet;

message MyDocument {
    enum Category {
        THINGS = 1;
        STUFF = 2;
        CRAP = 3;
    }
    required string id = 1;
    required string name = 2;
    required string description = 3;
    required Category category = 4;
    required uint64 created = 5;
}

You’ll need to compile that to a class somehow (I used lein-protobuf).

I’ll let the code to most of the talking here. I put some helpful comments in for your benefit:

(ns sparkquet.core
  (:require [flambo.conf :as conf]
            [flambo.api :as f])
  (:import
    [parquet.hadoop ParquetOutputFormat ParquetInputFormat]
    [parquet.proto ProtoParquetOutputFormat ProtoParquetInputFormat ProtoWriteSupport ProtoReadSupport]
    org.apache.hadoop.mapreduce.Job
    sparkquet.Document$MyDocument ; Import our protobuf
    sparkquet.Document$MyDocument$Category ; and our enum
    sparkquet.OnlyStuff
    ))



(defn make-protobuf
  "Helper function to make a protobuf from a hashmap. You could
  also use something like clojure-protobuf:
  https://github.com/ninjudd/clojure-protobuf"
  [data]
  (let [builder (Document$MyDocument/newBuilder)]
    (doto builder
      (.setId (:id data))
      (.setName (:name data))
      (.setDescription (:description data))
      (.setCategory (:category data))
      (.setCreated (:created data)))
    (.build builder)))
 
(defn produce-my-protobufs
  "This function serves as a generic source of protobufs. You can replace
  this with whatever you like. Perhaps you have a .csv file that you can
  open with f/text-file and map to a protobuf? Whatever you like."
  [sc]
  (f/parallelize
    sc
    (map make-protobuf [
      {:id "1" :name "Thing 1" :description "This is a thing"
       :category Document$MyDocument$Category/THINGS :created (System/currentTimeMillis)}
      {:id "2" :name "Thing 2" :description "This is a thing"
       :category Document$MyDocument$Category/THINGS :created (System/currentTimeMillis)}
      {:id "3" :name "Crap 1" :description "This is some crap"
       :category Document$MyDocument$Category/CRAP :created (System/currentTimeMillis)}
      {:id "4" :name "Stuff 1" :description "This is stuff"
       :category Document$MyDocument$Category/STUFF :created (System/currentTimeMillis)}
      {:id "5" :name "Stuff 2" :description "This is stuff"
       :category Document$MyDocument$Category/STUFF :created (System/currentTimeMillis)}
      {:id "6" :name "Stuff 3" :description "This is stuff"
       :category Document$MyDocument$Category/STUFF :created (System/currentTimeMillis)}
      {:id "7" :name "Stuff 4" :description "This is stuff"
       :category Document$MyDocument$Category/STUFF :created (System/currentTimeMillis)}])))


(defn write-protobufs!
  "Use Spark's .saveAsNewAPIHadoopFile to write a your protobufs."
  [rdd job outfilepath]
  (-> rdd
      (f/map-to-pair (f/fn [buf] [nil buf])) ; We need to have a PairRDD
      (.saveAsNewAPIHadoopFile
        outfilepath                 ; Can (probably should) be an hdfs:// url
        Void                        ; We don't have a key class, just some protobufs
        Document$MyDocument         ; Would be a static import + .class in java
        ParquetOutputFormat         ; Use the ParquetOutputFormat
        (.getConfiguration job))))  ; Protobuf things are present on the job config.))

(defn read-protobufs
  "Use Spark's .newAPIHadoopFile to load your protobufs"
  [sc job infilepath]
  (->
    (.newAPIHadoopFile sc
      infilepath            ; Or hdfs:// url
      ParquetInputFormat
      Void                  ; Void key (.newAPIHadoopFile always returns (k,v) pair rdds)
      Document$MyDocument   ; Protobuf class for value
      (. job getConfiguration))

    (f/map (f/fn [tup] (._2 tup))))) ; Strip void keys from our pair data.

(defn get-job
  "Important initializers for Parquet Protobuf support. Updates a job's configuration"
  []
  (let [job (Job.)]

    ; You need to set the read support and write support classes
    (ParquetOutputFormat/setWriteSupportClass job ProtoWriteSupport)
    (ParquetInputFormat/setReadSupportClass job ProtoReadSupport)

    ; You also need to tell the writer your protobuf class (reader doesn't need it)
    (ProtoParquetOutputFormat/setProtobufClass job Document$MyDocument)

    job))

(defn -main []
  (let [conf (-> (conf/spark-conf)
                 (conf/master "local[4]") ; Run locally with 4 workers
                 (conf/app-name "protobuftest"))
        sc (f/spark-context conf) ; Create a spark context
        job (get-job) ; Create a Hadoop job to hold configuration 
        path "hdfs://localhost:9000/user/protobuftest2"
        ]

    ; First, we can write our protobufs
    (-> sc
        (produce-my-protobufs) ; Get your Protobuf RDD
        (write-protobufs! job path))

    ; Now, we can read them back
    (-> sc
        (read-protobufs job path)
        (f/collect)
        (first)
        (.getId)
        )

    ; You can also add a Parquet-level filter on your job to massively improve performance
    ; when running queries that can be easily pared down.
    (ParquetInputFormat/setUnboundRecordFilter job OnlyStuff)

    (-> sc
        (read-protobufs job path)
        (f/collect)) ; There should only be the 4 items now.

    ; If you like, you can set a *projection* on your job. This will read a
    ; subset of your fields for efficiency. Here's what you might do if you
    ; just needed names filtered by category:
    (ProtoParquetInputFormat/setRequestedProjection
      job "message MyDocument { required binary name; required binary category; }")

    (-> sc
        (read-protobufs job path)
        (f/map (f/fn [buf] (.getName buf)))
        (f/collect)) ; Remember, the record filter is still applied.

    ))


; Defs for REPL usage
(comment 
  (def conf (-> (conf/spark-conf)
                (conf/master "local[4]") ; Run locally with 4 workers
                (conf/app-name "protobuftest")))
  (def sc (f/spark-context conf))   ; Create a spark context
  (def job (get-job)) ; Create a Hadoop job to hold configuration 
  (def path "hdfs://localhost:9000/user/protobuftest4" )
  )

Lots of stuff going on here, but some of the trickier bits:

  • The saveAsNewAPIHadoopFile and newAPIHadoopFile methods exist on and return, respectively, only Pair RDDs. If you have un-keyed data, as we do, you’ll need to pack/unpack your data into tuples before/after saving/loading if you want to pretend like you just have a stream of protobufs. Just use Void as the key class when you call the relevant method.

  • You need to use a hadoop Job object to store and pass around configuration.

  • You need to set the support classes for your input and output formats. You’ll also need to set the protobuf class using setProtobufClass on your ProtoParquetOutputFormat. You don’t need to do this on input.

Filters

You can use setUnboundRecordFilter on ParquetInputFormat to do really efficient filtering on your data as you read it. Since Parquet is aware of the protobuf file’s layout, it can check only the fields it needs for the filter, and only deserialize the rest of the protobuf if the filter passes. This is very fast.

To create a filter, you implement the UnboundRecordFilter interface, which has one method, bind. You can use this method to bind the filter you create with the readers passed to the bind method.

I used this one java helper, which implements a filter. This could also be done in clojure with a gen-class, but lein works well enough on java sources that we may as well do it this way.

package sparkquet;

import parquet.column.ColumnReader;
import parquet.filter.RecordFilter;
import parquet.filter.ColumnRecordFilter;
import parquet.filter.UnboundRecordFilter;
import parquet.filter.ColumnPredicates;

import static sparkquet.Document.MyDocument;

public class OnlyStuff implements UnboundRecordFilter {
    public RecordFilter bind(Iterable<ColumnReader> readers){
        return ColumnRecordFilter.column(
            "category",
             ColumnPredicates.equalTo(MyDocument.Category.STUFF)
        ).bind(readers);
    }
}

Projections

Parquet’s protobuf support will let you define a projection, which is a way of telling it what fields to read (generally a subset of the fields that exist). Since Parquet is a column store, this means it can efficiently read just this data and leave the rest.

Defining a projection is an unfortunately poorly-documented procedure. To define a projection, you pass a string to ProtoParquetInputFormat/setRequestedProjection. The string should be a set of field definitions in an apparently-undocumented format that resembles protobuf’s. Twitter’s Parquet announcement blog post has some examples, but unfortunately the examples are for some different version of Parquet, since Parquet no longer supports a string type (use binary instead).

For our example, we use the following to extract name (a string) and category (an enum):

message MyDocument {
  required binary name;
  required binary category;
}

Performance

I didn’t (and won’t) do formal benchmarks, so I can only give my rememberances for working on about 6GB of wide data.

  • Running a mapreduce job after reading the data from CSV took about 90 seconds
  • Running the same job on protobufs from Parquet took about 130 seconds. The extra 40 seconds was probably deserialization overhead.
  • Adding projection mapping to trim the 45-odd fields down to the 4 I needed dropped the job to about 60 seconds
  • Moving the “primary” filter from a Spark filter task to a Parquet filter reduced the time to just 20 seconds.

So, in this case, Parquet turned out to be a win.

That’s it for this post. I hope it helped you figure this thing out.

[columnoriented]: [spark]: http://spark.apache.org/