I've been studying Spark DataFrames.
This time, the goal is to parse stringified JSON like (1) below, split the query-parameter part into (key, value) pairs, and end up with the DataFrame shown in (2).
# (1) input dataframe
+-------------------------------------------------------------------------------------------------+
|value                                                                                            |
+-------------------------------------------------------------------------------------------------+
|{"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}|
|{"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"} |
+-------------------------------------------------------------------------------------------------+
# (2) output dataframe
+---------------+--------+--------------------------------------+---+----+---+----------+
|url            |path    |query                                 |id |hoge|foo|timestamp |
+---------------+--------+--------------------------------------+---+----+---+----------+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |fuga|bar|1478744124|
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |abc |def|1478744452|
+---------------+--------+--------------------------------------+---+----+---+----------+
Environment: macOS 10.12.3, Spark 2.1.0
$ ./bin/spark-shell
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.1.0
      /_/
Using Scala version 2.11.8 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_111)
Type in expressions to have them evaluated.
Type :help for more information.
Converting the JSON strings to Maps
First, convert each JSON string into a Map and build a DataFrame, using zipWithIndex to assign an id.
scala> val raw = Array("{\"query\": \"hoge=fuga&foo=bar&timestamp=1478744124\", \"path\": \"/v1/item\", \"url\": \"api.example.com\"}", "{\"query\": \"hoge=abc&foo=def&timestamp=1478744452\", \"path\": \"/v1/item\", \"url\": \"api.example.com\"}")
raw: Array[String] = Array({"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}, {"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"})
scala> val inputRDD = sc.parallelize(raw).zipWithIndex
inputRDD: org.apache.spark.rdd.RDD[(String, Long)] = ZippedWithIndexRDD[4] at zipWithIndex at <console>:26
scala> inputRDD.toDF("value", "id").show(false)
+-------------------------------------------------------------------------------------------------+---+
|value                                                                                            |id |
+-------------------------------------------------------------------------------------------------+---+
|{"query": "hoge=fuga&foo=bar&timestamp=1478744124", "path": "/v1/item", "url": "api.example.com"}|0  |
|{"query": "hoge=abc&foo=def&timestamp=1478744452", "path": "/v1/item", "url": "api.example.com"} |1  |
+-------------------------------------------------------------------------------------------------+---+
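As an aside, if the id only needs to be unique (not a consecutive 0, 1, 2, ...), the monotonically_increasing_id function could assign it without the detour through zipWithIndex. A minimal sketch, reusing the raw array above (untested here; spark.implicits._ is auto-imported in spark-shell):

// Sketch: unique (but not consecutive) ids straight from the DataFrame API.
import org.apache.spark.sql.functions.monotonically_increasing_id

val withId = sc.parallelize(raw).toDF("value")
  .withColumn("id", monotonically_increasing_id())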
scala> import scala.util.parsing.json.JSON
import scala.util.parsing.json.JSON
scala> val mapData = inputRDD.map{ case (x,y) => (JSON.parseFull(x.toString).getOrElse().asInstanceOf[Map[String, Any]], y)}
warning: there was one deprecation warning; re-run with -deprecation for details
mapData: org.apache.spark.rdd.RDD[(Map[String,Any], Long)] = MapPartitionsRDD[8] at map at <console>:29
scala> val baseDF = mapData.map{ case (x,y) => (
| x.get("url").getOrElse("").toString,
| x.get("path").getOrElse("").toString,
| x.get("query").getOrElse("").toString,
| y )}.
| toDF("url", "path", "query", "id")
baseDF: org.apache.spark.sql.DataFrame = [url: string, path: string ... 2 more fields]
scala> baseDF.show(false)
+---------------+--------+--------------------------------------+---+
|url            |path    |query                                 |id |
+---------------+--------+--------------------------------------+---+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |
+---------------+--------+--------------------------------------+---+
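By the way, the deprecation warning in the transcript above comes from scala.util.parsing.json, which is deprecated in Scala 2.11. Spark can also parse the JSON strings itself; here is a rough sketch using from_json (available since Spark 2.1) that produces the same url/path/query columns, assuming the inputRDD defined above:

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._

// Schema of the stringified JSON records.
val schema = new StructType()
  .add("query", StringType)
  .add("path", StringType)
  .add("url", StringType)

val parsedDF = inputRDD.toDF("value", "id")
  .select($"id", from_json($"value", schema).as("json"))
  .select($"json.url", $"json.path", $"json.query", $"id")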
Splitting the URL query parameters
Split the query column on "&", then on "=", to build an RDD of (key, value) pairs.
From this RDD, build a DataFrame filtered on each key and join it with baseDF on id.
scala> val queryRDD = baseDF.rdd.map( row => (row(2).toString.split("&").map(x => x.split("=")).map { case Array(x,y) => (x,y) } , row(3).toString))
queryRDD: org.apache.spark.rdd.RDD[(Array[(String, String)], String)] = MapPartitionsRDD[5401] at map at <console>:36
scala> var outputDF = baseDF
outputDF: org.apache.spark.sql.DataFrame = [url: string, path: string ... 2 more fields]
scala> val keys = Array("hoge", "foo", "timestamp")
keys: Array[String] = Array(hoge, foo, timestamp)
scala> keys.foreach {
| case key => val filteredDF = queryRDD.map {case (x, i) => x.filter{ case (k, v) => k == key}.
| map{ case (a, b) => (b, i.toLong)}}.
| flatMap(s => s).toDF(key, "id")
| outputDF = outputDF.join(filteredDF, Seq("id"), "left_outer")
| }
scala> outputDF.show(false)
+---------------+--------+--------------------------------------+---+----+---+----------+
|url            |path    |query                                 |id |hoge|foo|timestamp |
+---------------+--------+--------------------------------------+---+----+---+----------+
|api.example.com|/v1/item|hoge=fuga&foo=bar&timestamp=1478744124|0  |fuga|bar|1478744124|
|api.example.com|/v1/item|hoge=abc&foo=def&timestamp=1478744452 |1  |abc |def|1478744452|
+---------------+--------+--------------------------------------+---+----+---+----------+
Note that DataFrame's collect method gathers every element of the DataFrame and returns them as an array, so it can run out of memory on large datasets.
Depending on the situation, the rdd method is more efficient.
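For example (a minimal sketch using the outputDF from this session):

val allRows = outputDF.collect()              // pulls every Row into the driver's memory
outputDF.rdd.foreach(row => println(row))     // runs on the executors (output goes to their logs)
val it = outputDF.toLocalIterator()           // streams rows to the driver one partition at a time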
Adding the columns would probably be better done with User-Defined Functions, and the whole thing could surely be done more elegantly…
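A rough sketch of that UDF idea (untested; it assumes the baseDF built above and would replace the join-per-key loop):

import org.apache.spark.sql.functions.udf

// Parse "k1=v1&k2=v2&..." into a Map[String, String] inside a single UDF.
val parseQuery = udf { (q: String) =>
  q.split("&").map(_.split("=", 2)).collect { case Array(k, v) => k -> v }.toMap
}

val outputDF2 = baseDF
  .withColumn("params", parseQuery($"query"))
  .withColumn("hoge", $"params".getItem("hoge"))
  .withColumn("foo", $"params".getItem("foo"))
  .withColumn("timestamp", $"params".getItem("timestamp"))
  .drop("params")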
Curious about the joinType values that DataFrame's join method accepts, I looked at org/apache/spark/sql/Dataset.scala and found there are 7 join types (several of the accepted strings are aliases for the same type).
/**
* Equi-join with another `DataFrame` using the given columns. A cross join with a predicate
* is specified as an inner join. If you would explicitly like to perform a cross join use the
* `crossJoin` method.
*
* Different from other join functions, the join columns will only appear once in the output,
* i.e. similar to SQL's `JOIN USING` syntax.
*
* @param right Right side of the join operation.
* @param usingColumns Names of the columns to join on. This columns must exist on both sides.
* @param joinType Type of join to perform. Default `inner`. Must be one of:
* `inner`, `cross`, `outer`, `full`, `full_outer`, `left`, `left_outer`,
* `right`, `right_outer`, `left_semi`, `left_anti`.
*
* @note If you perform a self-join using this function without aliasing the input
* `DataFrame`s, you will NOT be able to reference any columns after the join, since
* there is no way to disambiguate which side of the join you would like to reference.
*
* @group untypedrel
* @since 2.0.0
*/
def join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame = {
  // Analyze the self join. The assumption is that the analyzer will disambiguate left vs right
  // by creating a new instance for one of the branch.
  val joined = sparkSession.sessionState.executePlan(
    Join(logicalPlan, right.logicalPlan, joinType = JoinType(joinType), None))
    .analyzed.asInstanceOf[Join]
  withPlan {
    Join(
      joined.left,
      joined.right,
      UsingJoin(JoinType(joinType), usingColumns),
      None)
  }
}
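A quick way to see the difference between these join types is with a pair of throwaway DataFrames (a sketch, not part of the session above):

val left  = Seq((0, "a"), (1, "b")).toDF("id", "l")
val right = Seq((1, "x"), (2, "y")).toDF("id", "r")

left.join(right, Seq("id"), "inner").show()       // id 1 only
left.join(right, Seq("id"), "left_outer").show()  // ids 0 and 1; r is null for id 0
left.join(right, Seq("id"), "left_anti").show()   // id 0 only: rows of left with no match in right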
I still need to get much more comfortable with Scala and with Spark RDDs and DataFrames.
Incidentally, it is very convenient that sc.textFile(), which reads data from files, accepts wildcards (*).
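For example (hypothetical paths):

// Reads every file matching the glob into a single RDD[String].
val logs = sc.textFile("/data/api-logs/2016-11-*/part-*")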