【Spark / Eclipse】ScalaTest + spark-testing-base

Sparkアプリケーションの開発言語で Scala を選択する利点に Unit Test が書きやすい点があると思います。Python は豊富な機械学習とその周辺ライブラリが使える一方, Spark を使うテストは少々工夫が必要な印象があります。

今回は ScalaTest + spark-testing-base で Sparkアプリケーションのテストを書いてみました。コマンドからのテスト実行と Eclipse でのテスト実行も試してみました。

環境は macOS 10.12.3, Scala 2.12, Eclipse 4.6 です。

spark-testing-base を使ってみた

spark-testing-base がサポートする主なモジュールは以下。

  • SharedSparkContext: Provides SparkContext to be used in testing.
  • RDDComparisons: Compares two RDDs.
  • DataFrameSuiteBase: Checks for dataframe equality.
  • DatasetSuiteBase: Checks for dataset equality.
  • StreamingSuiteBase: Applies given operation on given input stream and compare with expected output.
  • StreamingActionBase: Tests actions that don’t have output on input streams.
  • RDDGenerator: Generates arbitrary RDDs.
  • DataFrameGenerator: Generates arbitrary DataFrames.
  • DatasetGenerator: Generates arbitrary Datasets.

例として DatasetSuiteBase を使って user_id ごとの最新のレコードを DataFrame (~=DataSet) で返す以下のメソッドをテストする。

package com.example

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

class Sample {
  def getLatestByUID(df: DataFrame) = {
    df.groupBy("user_id").
      agg(max("timestamp")).
      toDF("user_id", "timestamp") // rename column name
  }
}

入力データが以下。

scala> inputDS.show
+----------+-------+
| timestamp|user_id|
+----------+-------+
|1495461520|      1|
|1495461521|      2|
|1495461522|      3|
|1495461523|      1|
|1495461524|      1|
|1495461525|      2|
+----------+-------+

結果が以下の DataFrame と一致するか assertDatasetEquals でテストする。

scala> expectedDS.show
+-------+----------+
|user_id| timestamp|
+-------+----------+
|      1|1495461524|
|      3|1495461522|
|      2|1495461525|
+-------+----------+

テストコードが以下。

package com.example.test

import org.scalatest._
import com.holdenkarau.spark.testing.DatasetSuiteBase

import com.example.Sample
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

class SampleSuite extends FunSuite with DatasetSuiteBase {

  test("simple test") {
    val spark = SparkSession.builder().getOrCreate()

    val inputDS = spark.read.json("src/test/scala/resources/input.json")

    val s = new Sample()
    val actualDS = s.getLatestByUID(inputDS)

    val expectedSchema = StructType(Array(
      StructField("user_id", LongType, true),
      StructField("timestamp", LongType, true))
    )

    val expectedDS = spark.read.format("json").
      schema(expectedSchema).
      load("src/test/scala/resources/expected.json")

    assertDatasetEquals(actualDS, expectedDS)
  }
}

テスト実行。

$ sbt test
[info] Set current project to scala-test-example (in build file:~/scala-test-example/)
[info] Compiling 1 Scala source to scala-test-example/target/scala-2.11/test-classes...
17/05/24 22:45:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 22:45:58 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
17/05/24 22:46:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/24 22:46:17 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
[info] SampleSuite:
[info] - simple test
[info] ScalaTest
[info] Run completed in 44 seconds, 248 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 61 s, completed 2017/05/24 22:46:31

sbteclipse

既存の sbt の project を Scala-IDE for Eclipse で使えるように Import したい。
sbteclipse を使うと Eclipse project files (.classpass, .project) を生成できる。

$ sbt
...
> eclipse
[info] About to create Eclipse project files for your project(s).
[info] Updating {file:scalatest-example...
[info] Resolving jline#jline;2.12 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/avro/avro/1.7.7/avro-1.7.7.jar ...
[info]  [SUCCESSFUL ] org.apache.avro#avro;1.7.7!avro.jar (1098ms)
[info] Done updating.
[info] Successfully created Eclipse project files for project(s):
[info] scalatest-example

`eclipse with-source=true` でライブラリのソースもインポートされた状態の project files にできる。

Eclipse を起動し File > import > General > Exsiting Projects into WorkSpace から project root を選択すると読み込まれる。
Eclipse project files を変更した場合は File > Refresh で Reload できる。

Eclipse で ScalaTest

Run As > Run Configurations > Scalatest で Type を package, Package name を com.example.test にして Run でテストを実行する。

ちなみに, Scala Library container, Scala Compiler container の Scala とプロジェクトの Scala の version が一致していない場合, 以下のエラーが発生した。

Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;

Project > Properties > Scala Compiler をプロジェクトの Scala に合わせると上手くいった。
(Apply ボタンが反応しない時は .settings 以下を削除して再度設定したら上手くいった)


[1] spark-testing-base/wiki
[2] hive exception with Spark2 #148
[3] Scalaのユニットテスト入門
[4] sbt のインストール
[5] なぜあなたの sbt はすぐに起動しないのか