【Spark / Scala】BigDL で回帰モデル

BigDL は Apache Spark 上で動く分散 Deep Learning ライブラリ。Examples にテキスト分類や画像分類の例が含まれています。今回は簡単な回帰モデルを試してみました。

環境は macOS 10.12.3, Scala 2.11.8, BigDL 0.2.0です。

BigDLの入手

Downloads からビルド済の BigDL を DL する。

$ wget https://oss.sonatype.org/content/groups/public/com/intel/analytics/bigdl/dist-spark-2.0.0-scala-2.11.8-mac/0.2.0-SNAPSHOT/dist-spark-2.0.0-scala-2.11.8-mac-0.2.0-20170609.194429-36-dist.zip
$ unzip dist-spark-2.0.0-scala-2.11.8-mac-0.2.0-20170609.194429-36-dist.zip

BigDLで回帰モデル

今回使うデータセットは airquality で, ニューヨークの大気状態観測値で6つの変数から成るデータ。

  • Ozone: オゾン (ppb)
  • Solar.R: 日射量 (lang)
  • Wind: 風力 (mph)
  • Temp: 温度 (華氏 F)
  • Month: 5~9月
  • Day: 1~31日

回帰モデルを書くにあたり, 下記のクラスを中心にソースを読んでみた。

  • com.intel.analytics.bigdl.tensor.Tensor:
    Tensor計算には Intel MKL や BLAS が使われている。TensorNumeric に Tensor の基本演算子が定義。
  • com.intel.analytics.bigdl.nn.abstractnn.TensorModule:
    Module は NN の基本コンポーネント。AbstractModule の抽象サブクラス。Module にReLU や LogSoftMax などの活性化関数を重ねていく。
  • com.intel.analytics.bigdl.dataset.Sample:
    BigDL独自の features と label の組みを表すクラス。
  • com.intel.analytics.bigdl.optim.Optimizer:
    SGD や Adagrad などの最適化手法やバッチサイズを設定。trainSummary, ValidationSummary を設定すると tensorboard に入力する summary を出力できる。詳しくは Visualization with TensorBoardを参照。

モデルが以下。

package com.intel.analytics.bigdl.example

import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.nn._

object SimpleRegression {
  import com.intel.analytics.bigdl.numeric.NumericFloat
  def apply(): Module[Float] = {
    val model = Sequential()
    model.add(Reshape(Array(4))) // feature size
    .add(Linear(4, 8).setName("func_1"))
    .add(ReLU(true))
    .add(Linear(8, 1).setName("func_2"))
    .add(Mean())
    model
  }
}

全体のコードは以下。

package com.intel.analytics.bigdl.example

import org.apache.spark.sql._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{StructType, StructField, DoubleType}

import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.tensor._
import com.intel.analytics.bigdl.optim._
// import com.intel.analytics.bigdl.numeric.NumericFloat // 気をつける
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._

object RegressionExample {

  def loadData(path: String, schema: StructType): DataFrame = {
    val spark = SparkSession.builder.appName("").getOrCreate()
    val df = spark.read.option("header", true).schema(schema).csv(path)
    df
  }

  def transform(df: DataFrame, feats: Array[String]): DataFrame = {
      val assembler = new VectorAssembler().setInputCols(feats).setOutputCol("features")
      val transformedDF = assembler.transform(df)
      val scaler = new StandardScaler()
        .setInputCol("features")
        .setOutputCol("scaledFeatures")
        .setWithStd(true)
        .setWithMean(false)
      val scalerModel = scaler.fit(transformedDF)
      val scaledDF = scalerModel.transform(transformedDF)
      scaledDF
  }

  def convertToSampleRDD(df: DataFrame): RDD[Sample[Float]] = {
    df.rdd.map {case row: org.apache.spark.sql.Row =>
      val vec  = row(6).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
      val feats = Array(vec(0).toFloat , vec(1).toFloat, vec(2).toFloat, vec(3).toFloat)
      val label = row(0).toString.toFloat
      Sample(
          featureTensor = Tensor(feats, Array(1, 4)).contiguous(),
          labelTensor = Tensor(Array(label), Array(1))
       )
    }
  }

  def main(args: Array[String]): Unit = {
      val conf = Engine.createSparkConf()
        .setAppName("SimpleRegression")
        .set("spark.task.maxFailures", "1")
      val sc = new SparkContext(conf)

      Engine.init

      val schema = StructType(
          StructField("Ozone", DoubleType, true) ::
          StructField("Solar", DoubleType, true) ::
          StructField("Wind", DoubleType, true) ::
          StructField("Temp", DoubleType, true) ::
          StructField("Month", DoubleType, true) ::
          StructField("Day", DoubleType, true) :: Nil
      )

      val df = loadData("data/airquality.csv", schema)
      val scaledDF = transform(df, Array("Solar", "Wind", "Temp", "Month"))
      scaledDF.show

      val sampleRDD = convertToSampleRDD(scaledDF)
      val trainingSplit = 0.7
      val Array(trainingRDD, validationRDD) = sampleRDD.randomSplit(Array(trainingSplit, 1 - trainingSplit))
      println(trainingRDD.count)
      println(validationRDD.count)

      val batchSize = 2
      val optimizer = Optimizer(
          model = SimpleRegression(),
          sampleRDD = trainingRDD,
          criterion = new MSECriterion[Float](),
          batchSize = batchSize
      )

      val optimMethod = new RMSprop[Float](learningRate = 0.01)

      val result = optimizer
        .setOptimMethod(optimMethod)
        .setValidation(
            Trigger.everyEpoch,
            validationRDD,
            Array(), // new Loss[Float]
            batchSize)
        .setEndWhen(Trigger.maxEpoch(30))
        .optimize()

      validationRDD.foreach(s => println(s.label))
      result.predict(validationRDD).collect.foreach(println)
      trainingRDD.foreach(s => println(s.label))
      result.predict(trainingRDD).collect.foreach(println)
      println(result.parameters)
      println(validationRDD.count())

      result.predict(trainingRDD).collect().foreach(r => println(r.toTensor[Float]))

      sc.stop()
  }
}

BigDLで高いパフォーマンスが出るのは環境変数が正しく設定されている場合で spark-submit 時に conf/spark-bigdl.conf を指定。

$ source bin/bigdl.sh
$ bin/bigdl.sh -- $SPARK_HOME/bin/spark-submit \
  --executor-cores 2 --num-executors 2 --driver-memory 4g --executor-memory 4g  --executor-memory 4g \
  --properties-file conf/spark-bigdl.conf \
  --class com.intel.analytics.bigdl.example.RegressionExample \
  --jars lib/bigdl-0.2.0-SNAPSHOT-jar-with-dependencies.jar \
  bigdl_regression_example_2.11-0.1.0.jar

コードは GitHub にも置いた。


[1] Extracting, transforming and selecting features
[2] bigdl-user-group