“Learning Spark”読書会#1 に参加しました。
今回は Apache Spark インストールから MLlib の Statistics, LinearRegressionWithSGD を使ってみるまでのメモです。
Apache Spark インストール
環境は OSX 10.10.2 です。
$ curl -O https://www.apache.org/dyn/closer.cgi/spark/spark-1.2.1/spark-1.2.1-bin-hadoop2.4.tgz
$ tar xzf spark-1.2.1-bin-hadoop2.4.tgz
$ ln -s ~/path/to/your/spark-1.2.1-bin-hadoop2.4 /usr/local/share/spark
$ PATH=/usr/local/share/spark/bin:$PATH; export PATH
pysparkを実行します。 2> /dev/null で標準エラーは破棄することができますがログレベルを適切に設定するのが良いと思います。
$ pyspark
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 1.2.1
/_/
Using Python version 2.7.6 (default, Sep 9 2014 15:04:36)
SparkContext available as sc.
試しに README.md の行数をカウントしてみます。この結果は $ wc README.md と同じになります。
>>> lines = sc.textFile("/usr/local/share/spark/README.md")
>>> print lines.count()
98
また, https://localhost:4040/ にアクセスすると管理画面が表示されます。
Ipython が入っていなければインストールしておくと捗る。
$ sudo pip install "ipython[notebook]"
$ ipython notebook
MLlib
MLlibの Statistics, LinearRegressionWithSGD を使ってみます。Programming Guide (1.2)のコード, ほとんどそのままです。
まずは, 先程の README.md の行数カウントから。
from pyspark import SparkContext
sc = SparkContext('local', 'Simple App')
lines = sc.textFile('/usr/local/share/spark/README.md')
print(lines.count())
# => 98
以下のようにpysparkの後にファイル名を指定して実行可能です。
$ pyspark intro.py
Statistics
要約統計量の計算。
from pyspark import SparkContext
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.stat import Statistics
import numpy as np
sc = SparkContext('local', 'Statistics')
# an RDD of Vectors
rdd = sc.parallelize([Vectors.dense([2, 0, 0, -2]),
Vectors.dense([4, 5, 0, 3]),
Vectors.dense([6, 7, 0, 8])])
# Compute column summary statistics.
summary = Statistics.colStats(rdd)
print(summary.mean())
# => [ 4. 4. 0. 3.]
print(summary.variance())
# => [ 4. 13. 0. 25.]
print(summary.numNonzeros())
# => [ 3. 2. 0. 3.]
LinearRegressionWithSGD
線形回帰。平均二乗誤差を求めます。
from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD
from numpy import array
sc = SparkContext('local', 'LinearRegression')
# Load and parse the data
def parsePoint(line):
values = [float(x) for x in line.replace(',', ' ').split(' ')]
return LabeledPoint(values[0], values[1:])
data = sc.textFile("data/mllib/ridge-data/lpsa.data")
parsedData = data.map(parsePoint)
# Build the model
model = LinearRegressionWithSGD.train(parsedData)
# Evaluate the model on training data
valuesAndPreds = parsedData.map(lambda p: (p.label, model.predict(p.features)))
MSE = valuesAndPreds.map(lambda (v, p): (v - p)**2).reduce(lambda x, y: x + y) / valuesAndPreds.count()
print("Mean Squared Error = " + str(MSE))
# => Mean Squared Error = 6.20680779331