【MLlib / PySpark】OSX で Apache Spark を使ってみる

“Learning Spark”読書会#1 に参加しました。

今回は Apache Spark インストールから MLlib の Statistics, LinearRegressionWithSGD を使ってみるまでのメモです。

Apache Spark インストール

環境は OSX 10.10.2 です。

$ curl -O https://www.apache.org/dyn/closer.cgi/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz
$ tar xzf spark-1.2.1-bin-hadoop2.4.tgz
$ ln -s ~/path/to/your/spark-1.2.1-bin-hadoop2.4 /usr/local/share/spark
$ PATH=/usr/local/share/spark/bin:$PATH; export PATH

pysparkを実行します。 2> /dev/null で標準エラーは破棄することができますがログレベルを適切に設定するのが良いと思います。

$ pyspark
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 1.2.1
      /_/

Using Python version 2.7.6 (default, Sep  9 2014 15:04:36)
SparkContext available as sc.

試しに README.md の行数をカウントしてみます。この結果は $ wc README.md と同じになります。

>>> lines = sc.textFile("/usr/local/share/spark/README.md")
>>> print lines.count()
98

また, https://localhost:4040/ にアクセスすると管理画面が表示されます。

Ipython が入っていなければインストールしておくと捗る。

$ sudo pip install "ipython[notebook]"
$ ipython notebook 

MLlib

MLlibの Statistics, LinearRegressionWithSGD を使ってみます。Programming Guide (1.2)のコード, ほとんどそのままです。

まずは, 先程の README.md の行数カウントから。

from pyspark import SparkContext

sc = SparkContext('local', 'Simple App')

lines = sc.textFile('/usr/local/share/spark/README.md')

print(lines.count())
# => 98

以下のようにpysparkの後にファイル名を指定して実行可能です。

$ pyspark intro.py

Statistics

要約統計量の計算。

from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
import numpy as np

sc = SparkContext('local', 'Statistics')

# an RDD of Vectors
rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
		Vectors.dense([4, 5, 0,  3]),
		Vectors.dense([6, 7, 0,  8])])

# Compute column summary statistics.
summary = Statistics.colStats(rdd)

print(summary.mean())
# => [ 4.  4.  0.  3.]
print(summary.variance())
# => [  4.  13.   0.  25.]
print(summary.numNonzeros())
# => [ 3.  2.  0.  3.]

LinearRegressionWithSGD

線形回帰。平均二乗誤差を求めます。

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array

sc = SparkContext('local', 'LinearRegression')

# Load and parse the data
def parsePoint(line):
    values = [float(x) for x in line.replace(',', ' ').split(' ')]
    return LabeledPoint(values[0], values[1:])

data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)

# Build the model
model = LinearRegressionWithSGD.train(parsedData)

# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()

print("Mean Squared Error = " + str(MSE))
# => Mean Squared Error = 6.20680779331


[1] RDD-Paper
[2] Resilient Distributed Datasets(RDD)