BigDL は Apache Spark 上で動く分散 Deep Learning ライブラリ。Examples にテキスト分類や画像分類の例が含まれています。今回は簡単な回帰モデルを試してみました。
環境は macOS 10.12.3, Scala 2.11.8, BigDL 0.2.0です。
BigDLの入手
Downloads からビルド済の BigDL を DL する。
$ wget https://oss.sonatype.org/content/groups/public/com/intel/analytics/bigdl/dist-spark-2.0.0-scala-2.11.8-mac/0.2.0-SNAPSHOT/dist-spark-2.0.0-scala-2.11.8-mac-0.2.0-20170609.194429-36-dist.zip
$ unzip dist-spark-2.0.0-scala-2.11.8-mac-0.2.0-20170609.194429-36-dist.zip
BigDLで回帰モデル
今回使うデータセットは airquality で, ニューヨークの大気状態観測値で6つの変数から成るデータ。
- Ozone: オゾン (ppb)
- Solar.R: 日射量 (lang)
- Wind: 風力 (mph)
- Temp: 温度 (華氏 F)
- Month: 5~9月
- Day: 1~31日
回帰モデルを書くにあたり, 下記のクラスを中心にソースを読んでみた。
- com.intel.analytics.bigdl.tensor.Tensor:
Tensor計算には Intel MKL や BLAS が使われている。TensorNumeric に Tensor の基本演算子が定義。 - com.intel.analytics.bigdl.nn.abstractnn.TensorModule:
Module は NN の基本コンポーネント。AbstractModule の抽象サブクラス。Module にReLU や LogSoftMax などの活性化関数を重ねていく。 - com.intel.analytics.bigdl.dataset.Sample:
BigDL独自の features と label の組みを表すクラス。 - com.intel.analytics.bigdl.optim.Optimizer:
SGD や Adagrad などの最適化手法やバッチサイズを設定。trainSummary, ValidationSummary を設定すると tensorboard に入力する summary を出力できる。詳しくは Visualization with TensorBoardを参照。
モデルが以下。
package com.intel.analytics.bigdl.example
import com.intel.analytics.bigdl._
import com.intel.analytics.bigdl.nn._
object SimpleRegression {
import com.intel.analytics.bigdl.numeric.NumericFloat
def apply(): Module[Float] = {
val model = Sequential()
model.add(Reshape(Array(4))) // feature size
.add(Linear(4, 8).setName("func_1"))
.add(ReLU(true))
.add(Linear(8, 1).setName("func_2"))
.add(Mean())
model
}
}
全体のコードは以下。
package com.intel.analytics.bigdl.example
import org.apache.spark.sql._
import org.apache.spark.rdd.RDD
import org.apache.spark.SparkContext
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.types.{StructType, StructField, DoubleType}
import com.intel.analytics.bigdl.nn._
import com.intel.analytics.bigdl.tensor._
import com.intel.analytics.bigdl.optim._
// import com.intel.analytics.bigdl.numeric.NumericFloat // 気をつける
import com.intel.analytics.bigdl.utils.Engine
import com.intel.analytics.bigdl.dataset._
object RegressionExample {
def loadData(path: String, schema: StructType): DataFrame = {
val spark = SparkSession.builder.appName("").getOrCreate()
val df = spark.read.option("header", true).schema(schema).csv(path)
df
}
def transform(df: DataFrame, feats: Array[String]): DataFrame = {
val assembler = new VectorAssembler().setInputCols(feats).setOutputCol("features")
val transformedDF = assembler.transform(df)
val scaler = new StandardScaler()
.setInputCol("features")
.setOutputCol("scaledFeatures")
.setWithStd(true)
.setWithMean(false)
val scalerModel = scaler.fit(transformedDF)
val scaledDF = scalerModel.transform(transformedDF)
scaledDF
}
def convertToSampleRDD(df: DataFrame): RDD[Sample[Float]] = {
df.rdd.map {case row: org.apache.spark.sql.Row =>
val vec = row(6).asInstanceOf[org.apache.spark.ml.linalg.DenseVector]
val feats = Array(vec(0).toFloat , vec(1).toFloat, vec(2).toFloat, vec(3).toFloat)
val label = row(0).toString.toFloat
Sample(
featureTensor = Tensor(feats, Array(1, 4)).contiguous(),
labelTensor = Tensor(Array(label), Array(1))
)
}
}
def main(args: Array[String]): Unit = {
val conf = Engine.createSparkConf()
.setAppName("SimpleRegression")
.set("spark.task.maxFailures", "1")
val sc = new SparkContext(conf)
Engine.init
val schema = StructType(
StructField("Ozone", DoubleType, true) ::
StructField("Solar", DoubleType, true) ::
StructField("Wind", DoubleType, true) ::
StructField("Temp", DoubleType, true) ::
StructField("Month", DoubleType, true) ::
StructField("Day", DoubleType, true) :: Nil
)
val df = loadData("data/airquality.csv", schema)
val scaledDF = transform(df, Array("Solar", "Wind", "Temp", "Month"))
scaledDF.show
val sampleRDD = convertToSampleRDD(scaledDF)
val trainingSplit = 0.7
val Array(trainingRDD, validationRDD) = sampleRDD.randomSplit(Array(trainingSplit, 1 - trainingSplit))
println(trainingRDD.count)
println(validationRDD.count)
val batchSize = 2
val optimizer = Optimizer(
model = SimpleRegression(),
sampleRDD = trainingRDD,
criterion = new MSECriterion[Float](),
batchSize = batchSize
)
val optimMethod = new RMSprop[Float](learningRate = 0.01)
val result = optimizer
.setOptimMethod(optimMethod)
.setValidation(
Trigger.everyEpoch,
validationRDD,
Array(), // new Loss[Float]
batchSize)
.setEndWhen(Trigger.maxEpoch(30))
.optimize()
validationRDD.foreach(s => println(s.label))
result.predict(validationRDD).collect.foreach(println)
trainingRDD.foreach(s => println(s.label))
result.predict(trainingRDD).collect.foreach(println)
println(result.parameters)
println(validationRDD.count())
result.predict(trainingRDD).collect().foreach(r => println(r.toTensor[Float]))
sc.stop()
}
}
BigDLで高いパフォーマンスが出るのは環境変数が正しく設定されている場合で spark-submit 時に conf/spark-bigdl.conf を指定。
$ source bin/bigdl.sh
$ bin/bigdl.sh -- $SPARK_HOME/bin/spark-submit \
--executor-cores 2 --num-executors 2 --driver-memory 4g --executor-memory 4g --executor-memory 4g \
--properties-file conf/spark-bigdl.conf \
--class com.intel.analytics.bigdl.example.RegressionExample \
--jars lib/bigdl-0.2.0-SNAPSHOT-jar-with-dependencies.jar \
bigdl_regression_example_2.11-0.1.0.jar
コードは GitHub にも置いた。
[1] Extracting, transforming and selecting features
[2] bigdl-user-group