Sparkアプリケーションの開発言語で Scala を選択する利点に Unit Test が書きやすい点があると思います。Python は豊富な機械学習とその周辺ライブラリが使える一方, Spark を使うテストは少々工夫が必要な印象があります。
今回は ScalaTest + spark-testing-base で Sparkアプリケーションのテストを書いてみました。コマンドからのテスト実行と Eclipse でのテスト実行も試してみました。
環境は macOS 10.12.3, Scala 2.12, Eclipse 4.6 です。
spark-testing-base を使ってみた
spark-testing-base がサポートする主なモジュールは以下。
- SharedSparkContext: Provides SparkContext to be used in testing.
- RDDComparisons: Compares two RDDs.
- DataFrameSuiteBase: Checks for dataframe equality.
- DatasetSuiteBase: Checks for dataset equality.
- StreamingSuiteBase: Applies given operation on given input stream and compare with expected output.
- StreamingActionBase: Tests actions that don’t have output on input streams.
- RDDGenerator: Generates arbitrary RDDs.
- DataFrameGenerator: Generates arbitrary DataFrames.
- DatasetGenerator: Generates arbitrary Datasets.
例として DatasetSuiteBase を使って user_id ごとの最新のレコードを DataFrame (~=DataSet) で返す以下のメソッドをテストする。
package com.example
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
class Sample {
def getLatestByUID(df: DataFrame) = {
df.groupBy("user_id").
agg(max("timestamp")).
toDF("user_id", "timestamp") // rename column name
}
}
入力データが以下。
scala> inputDS.show
+----------+-------+
| timestamp|user_id|
+----------+-------+
|1495461520| 1|
|1495461521| 2|
|1495461522| 3|
|1495461523| 1|
|1495461524| 1|
|1495461525| 2|
+----------+-------+
結果が以下の DataFrame と一致するか assertDatasetEquals でテストする。
scala> expectedDS.show
+-------+----------+
|user_id| timestamp|
+-------+----------+
| 1|1495461524|
| 3|1495461522|
| 2|1495461525|
+-------+----------+
テストコードが以下。
package com.example.test
import org.scalatest._
import com.holdenkarau.spark.testing.DatasetSuiteBase
import com.example.Sample
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
class SampleSuite extends FunSuite with DatasetSuiteBase {
test("simple test") {
val spark = SparkSession.builder().getOrCreate()
val inputDS = spark.read.json("src/test/scala/resources/input.json")
val s = new Sample()
val actualDS = s.getLatestByUID(inputDS)
val expectedSchema = StructType(Array(
StructField("user_id", LongType, true),
StructField("timestamp", LongType, true))
)
val expectedDS = spark.read.format("json").
schema(expectedSchema).
load("src/test/scala/resources/expected.json")
assertDatasetEquals(actualDS, expectedDS)
}
}
テスト実行。
$ sbt test
[info] Set current project to scala-test-example (in build file:~/scala-test-example/)
[info] Compiling 1 Scala source to scala-test-example/target/scala-2.11/test-classes...
17/05/24 22:45:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
17/05/24 22:45:58 WARN SparkContext: Use an existing SparkContext, some configuration may not take effect.
17/05/24 22:46:17 WARN ObjectStore: Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
17/05/24 22:46:17 WARN ObjectStore: Failed to get database default, returning NoSuchObjectException
[info] SampleSuite:
[info] - simple test
[info] ScalaTest
[info] Run completed in 44 seconds, 248 milliseconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 1, Failed 0, Errors 0, Passed 1
[success] Total time: 61 s, completed 2017/05/24 22:46:31
sbteclipse
既存の sbt の project を Scala-IDE for Eclipse で使えるように Import したい。
sbteclipse を使うと Eclipse project files (.classpass, .project) を生成できる。
$ sbt
...
> eclipse
[info] About to create Eclipse project files for your project(s).
[info] Updating {file:scalatest-example...
[info] Resolving jline#jline;2.12 ...
[info] downloading https://repo1.maven.org/maven2/org/apache/avro/avro/1.7.7/avro-1.7.7.jar ...
[info] [SUCCESSFUL ] org.apache.avro#avro;1.7.7!avro.jar (1098ms)
[info] Done updating.
[info] Successfully created Eclipse project files for project(s):
[info] scalatest-example
`eclipse with-source=true` でライブラリのソースもインポートされた状態の project files にできる。
Eclipse を起動し File > import > General > Exsiting Projects into WorkSpace から project root を選択すると読み込まれる。
Eclipse project files を変更した場合は File > Refresh で Reload できる。
Eclipse で ScalaTest
Run As > Run Configurations > Scalatest で Type を package, Package name を com.example.test にして Run でテストを実行する。
ちなみに, Scala Library container, Scala Compiler container の Scala とプロジェクトの Scala の version が一致していない場合, 以下のエラーが発生した。
Caused by: java.lang.NoSuchMethodError: scala.Predef$.refArrayOps([Ljava/lang/Object;)Lscala/collection/mutable/ArrayOps;
Project > Properties > Scala Compiler をプロジェクトの Scala に合わせると上手くいった。
(Apply ボタンが反応しない時は .settings 以下を削除して再度設定したら上手くいった)
[1] spark-testing-base/wiki
[2] hive exception with Spark2 #148
[3] Scalaのユニットテスト入門
[4] sbt のインストール
[5] なぜあなたの sbt はすぐに起動しないのか