【Scala】Handling Spark DataFrames

I'm currently studying Spark DataFrames.

This time, the goal is to parse stringified JSON like (1) below, split the query parameter part into (key, value) pairs, and end up with the DataFrame shown in (2).

# (1) input dataframe
+-------------------------------------------------------------------------------------------------+
|value                                                                                            |
+-------------------------------------------------------------------------------------------------+
|{"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}|
|{"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"} |
+-------------------------------------------------------------------------------------------------+

# (2) output dataframe
+---------------+--------+--------------------------------------+---+----+---+----------+
|url            |path    |query                                 |id |hoge|foo|timestamp |
+---------------+--------+--------------------------------------+---+----+---+----------+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |fuga|bar|1478744124|
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |abc |def|1478744452|
+---------------+--------+--------------------------------------+---+----+---+----------+

Environment: macOS 10.12.3, Spark 2.1.0

$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/

Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.

Converting the JSON strings to Maps

First, convert each JSON string to a Map and build a DataFrame, using zipWithIndex to assign an id.

scala> val raw = Array("{\"query\": \"hoge=fuga&foo=bar&timestamp=1478744124\", \"path\": \"/v1/item\", \"url\": \"api.example.com\"}", "{\"query\": \"hoge=abc&foo=def&timestamp=1478744452\", \"path\":\"/v1/item\", \"url\": \"api.example.com\"}")
raw: Array[String] = Array({"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}, {"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"})

scala> val inputRDD = sc.parallelize(raw).zipWithIndex
inputRDD: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[5] at zipWithIndex at <console>:26

scala> inputRDD.toDF("value", "id").show(false)
+-------------------------------------------------------------------------------------------------+---+
|value                                                                                            |id |
+-------------------------------------------------------------------------------------------------+---+
|{"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}|0  |
|{"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"} |1  |
+-------------------------------------------------------------------------------------------------+---+

scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON

scala> val mapData = inputRDD.map{ case (x,y) => (JSON.parseFull(x.toString).getOrElse().asInstanceOf[Map[String, Any]], y)}
warning: there was one deprecation warning; re-run with -deprecation for details
mapData: org.apache.spark.rdd.RDD[(Map[String,Any], Long)] = MapPartitionsRDD[8] at map at <console>:29

scala> val baseDF = mapData.map{ case (x,y) => (
     |     x.get("url").getOrElse("").toString,
     |     x.get("path").getOrElse("").toString,
     |     x.get("query").getOrElse("").toString,
     |     y )}.
     |   toDF("url", "path", "query", "id")
baseDF: org.apache.spark.sql.DataFrame = [url: string, path: string ... 2 more fields]

scala> baseDF.show(false)
+---------------+--------+--------------------------------------+---+
|url            |path    |query                                 |id |
+---------------+--------+--------------------------------------+---+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |
+---------------+--------+--------------------------------------+---+
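
By the way, JSON.parseFull is what triggers the deprecation warning above. As a rough alternative sketch (assuming Spark 2.1's spark.read.json overload that takes an RDD[String], plus the monotonically_increasing_id function; the jsonDF / baseDF2 names are mine), the same frame could be built without the manual Map handling. Note the generated ids are only guaranteed unique, not the sequential 0, 1, 2, ... that zipWithIndex gives.

import org.apache.spark.sql.functions.monotonically_increasing_id

// let Spark infer the schema (path, query, url) straight from the JSON strings
val jsonDF = spark.read.json(sc.parallelize(raw))

// reorder the columns and attach an id column
val baseDF2 = jsonDF.select("url", "path", "query")
  .withColumn("id", monotonically_increasing_id())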

Splitting the URL query parameters

Split the query column on "&", then split each piece on "=", to get an RDD of (key, value) pairs.
From that RDD, build a DataFrame filtered down to each key and join it back to baseDF on id.

scala> val queryRDD = baseDF.rdd.map( row => (row(2).toString.split("&").map(x => x.split("=")).map { case Array(x,y) => (x,y) } , row(3).toString))
queryRDD: org.apache.spark.rdd.RDD[(Array[(String, String)], String)] = MapPartitionsRDD[5401] at map at <console>:36

scala> var outputDF = baseDF
outputDF: org.apache.spark.sql.DataFrame = [url: string, path: string ... 2 more fields]

scala> val keys = Array("hoge", "foo", "timestamp")
keys: Array[String] = Array(hoge, foo, timestamp)

scala> keys.foreach {
     |   case key => val filteredDF = queryRDD.map {case (x, i) => x.filter{ case (k, v) => k == key}.
     |     map{ case (a, b) => (b, i.toLong)}}.
     |     flatMap(s => s).toDF(key, "id")
     |   outputDF = outputDF.join(filteredDF, Seq("id"), "left_outer")
     | }

scala> outputDF.show(false)
+---------------+--------+--------------------------------------+---+----+---+----------+
|url            |path    |query                                 |id |hoge|foo|timestamp |
+---------------+--------+--------------------------------------+---+----+---+----------+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |fuga|bar|1478744124|
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |abc |def|1478744452|
+---------------+--------+--------------------------------------+---+----+---+----------+

Note that DataFrame's collect method gathers every element of the DataFrame and returns them as an array, so with large data there is a risk of running out of memory.
Depending on the situation, going through the rdd method is more efficient.

Adding the columns would probably be better done with User-Defined Functions, and the whole thing could no doubt be written more cleanly…
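
For example, a minimal sketch of that UDF idea (the extractParam name and the code below are just my own attempt, untested against real data): one UDF pulls a single key out of the query string, and withColumn adds a column per key, so the intermediate RDD and the left_outer joins are no longer needed.

import org.apache.spark.sql.functions.{udf, lit}

// returns the value for `key` inside a query string like "hoge=fuga&foo=bar", or "" if absent
val extractParam = udf { (query: String, key: String) =>
  query.split("&")
    .map(_.split("=", 2))
    .collectFirst { case Array(k, v) if k == key => v }
    .getOrElse("")
}

// add one column per key directly on baseDF
val withParams = keys.foldLeft(baseDF) { (df, key) =>
  df.withColumn(key, extractParam(df("query"), lit(key)))
}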

I was curious which joinType values the DataFrame join method accepts, so I looked at org/apache/spark/sql/Dataset.scala and found that it takes 11 string values covering 7 distinct join types.

  /**
   * Equi-join with another `DataFrame` using the given columns. A cross join with a predicate
   * is specified as an inner join. If you would explicitly like to perform a cross join use the
   * `crossJoin` method.
   *
   * Different from other join functions, the join columns will only appear once in the output,
   * i.e. similar to SQL's `JOIN USING` syntax.
   *
   * @param right Right side of the join operation.
   * @param usingColumns Names of the columns to join on. This columns must exist on both sides.
   * @param joinType Type of join to perform. Default `inner`. Must be one of:
   *                 `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,
   *                 `right`, `right_outer`, `left_semi`, `left_anti`.
   *
   * @note If you perform a self-join using this function without aliasing the input
   * `DataFrame`s, you will NOT be able to reference any columns after the join, since
   * there is no way to disambiguate which side of the join you would like to reference.
   *
   * @group untypedrel
   * @since 2.0.0
   */
  def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
    // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
    // by creating a new instance for one of the branch.
    val joined = sparkSession.sessionState.executePlan(
      Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
      .analyzed.asInstanceOf[Join]

    withPlan {
      Join(
        joined.left,
        joined.right,
        UsingJoin(JoinType(joinType), usingColumns),
        None)
    }
  }
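
Reading the accepted strings, "outer" / "full" / "full_outer" appear to be aliases for the same full outer join, and likewise "left" / "left_outer" and "right" / "right_outer", which is how 11 strings boil down to 7 types. For example (otherDF below is just a stand-in for any DataFrame sharing an id column):

baseDF.join(otherDF, Seq("id"), "full_outer") // keep rows from both sides
baseDF.join(otherDF, Seq("id"), "left_semi")  // baseDF rows that have a match; only baseDF's columns
baseDF.join(otherDF, Seq("id"), "left_anti")  // baseDF rows that have no match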

I still need to get much more comfortable with Scala and with Spark RDDs and DataFrames.

Incidentally, it's very convenient that sc.textFile(), which reads data from files, accepts wildcards (*).
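
For example (the path below is made up), something like this should pick up every matching file:

// Hadoop-style glob patterns are expanded when the paths are resolved
val logs = sc.textFile("/data/logs/2016-11-*/access-*.log")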

