【Apache Spark】NMF (ALS) による推薦を試してみた

前回, 構築した Spark on YARN 環境で ml/recommendation/ALS を試してみます。 実行環境は以下です。

  • macOS: 10.12.1
  • Java: 1.8.0_111
  • Apache Hadoop: 2.7.3
  • Apache Spark: 2.0.2

HDFS にデータをロード

今回使う data/mllib/als/sample_movielens_ratings.txt は左から, ユーザID, アイテムID, レイティング, 時間 となっている。 レイティングに 0 の多い疎な行列となっている。

$ head $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt
0::2::3::1424380312
0::3::1::1424380312
0::5::2::1424380312
0::9::4::1424380312
0::11::1::1424380312
0::12::2::1424380312
0::15::1::1424380312
0::17::1::1424380312
0::19::1::1424380312
0::21::1::1424380312

Local から HDFS に データを load する。

$ hdfs version
Hadoop 2.7.3
$ hdfs dfs -mkdir als
$ hdfs dfs -put $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt als/
$ hdfs dfs -ls -R
drwxr-xr-x   - user supergroup          0 2016-11-25 02:13 .sparkStaging
drwxr-xr-x   - user supergroup          0 2016-11-27 22:59 als
-rw-r--r--   1 user supergroup      32363 2016-11-27 22:59 als/sample_movielens_ratings.txt

推薦の個人化

推薦システムのアルゴリズム [1] の中で, 推薦の個人化の度合い [Ben Schafer 01] を以下の3段階に分けて紹介している。

  • 非個人化レコメンド (non personalization)
  • 顧客中心レコメンド (ephemeral personalization)
  • 商品中心レコメンド (persistent personalization)

協調フィルタリングは個人化されたレコメンドで, 内容ベースフィルタリングと比較して多様性・セレンディピティに優れておりドメイン知識を必要としない。

SVD

SVD (Singular Value Decomposition) は解析的な行列分解手法である。
SVD は観測されたユーザ-アイテム評価行列 R をユーザを表す行列 U, アイテムを表す行列 V (共に正規直交行列) , 対角行列 Σ の3つの行列の積に分解する。

     \begin{eqnarray*} R = U \Sigma V^{\mathrm{T}} \end{eqnarray*}

潜在次元 k (k ≤ n かつ k ≤ m) として, 低ランク近似された評価行列 R_hat (n x m 行列) は以下となる。

     \begin{eqnarray*} \hat{R} = U_k \Sigma_k V^{\mathrm{T}}_k \end{eqnarray*}

上記の SVD ベースの潜在因子モデルを, 簡単な評価行列と scipy.sparse.linalg.svds を用いて確認してみる。評価行列 R の全体平均 μ を R の全ての要素から差し引いてから SVD を実行する。

$ ipython
Python 3.7.0 (default, Jun 28 2018, 07:39:16)
Type 'copyright', 'credits' or 'license' for more information
IPython 6.5.0 -- An enhanced Interactive Python. Type '?' for help.

In [1]: import numpy as np

In [2]: from scipy.sparse.linalg import svds

In [3]: R = np.array([[5,4,0,1,2,1],
   ...:               [4,0,3,1,1,2],
   ...:               [0,5,5,0,3,3],
   ...:               [2,0,1,4,5,4],
   ...:               [2,2,2,0,4,0],
   ...:               [1,2,1,0,5,4]], dtype="float64")
   ...:

In [4]: R2 = np.copy(R)

In [5]: R2[R2 == 0] = np.nan

In [6]: R2
Out[6]:
array([[ 5.,  4., nan,  1.,  2.,  1.],
       [ 4., nan,  3.,  1.,  1.,  2.],
       [nan,  5.,  5., nan,  3.,  3.],
       [ 2., nan,  1.,  4.,  5.,  4.],
       [ 2.,  2.,  2., nan,  4., nan],
       [ 1.,  2.,  1., nan,  5.,  4.]])

In [7]: mean = np.nanmean(R2)

In [8]: R_demeaned = R - mean

In [9]: R_demeaned
Out[9]:
array([[ 2.17857143,  1.17857143, -2.82142857, -1.82142857, -0.82142857,
        -1.82142857],
       [ 1.17857143, -2.82142857,  0.17857143, -1.82142857, -1.82142857,
        -0.82142857],
       [-2.82142857,  2.17857143,  2.17857143, -2.82142857,  0.17857143,
         0.17857143],
       [-0.82142857, -2.82142857, -1.82142857,  1.17857143,  2.17857143,
         1.17857143],
       [-0.82142857, -0.82142857, -0.82142857, -2.82142857,  1.17857143,
        -2.82142857],
       [-1.82142857, -0.82142857, -1.82142857, -2.82142857,  2.17857143,
         1.17857143]])

In [10]: U, sigma, V = svds(R_demeaned, k=3)

In [11]: R_hat = np.dot(np.dot(U, np.diag(sigma)), V) + mean

In [12]: R_hat
Out[12]:
array([[ 4.77106591e+00,  2.57776643e+00,  1.31319706e+00,
         8.34199981e-01,  1.58564715e+00,  4.72567009e-01],
       [ 4.13417160e+00,  2.05639306e+00,  1.41861100e+00,
         1.82196596e+00,  2.34454315e+00,  1.51066755e+00],
       [ 7.05093927e-02,  5.29760510e+00,  4.84796781e+00,
         1.49609042e-01,  3.32749381e+00,  2.82639856e+00],
       [ 2.14703692e+00, -2.45868652e-03,  1.02485509e+00,
         3.86081065e+00,  4.97458026e+00,  4.23747455e+00],
       [ 2.33357665e+00,  2.35709159e+00,  1.42260196e+00,
        -5.49594423e-01,  3.46325936e+00,  1.35795894e+00],
       [ 6.63530155e-01,  1.50666756e+00,  1.54101323e+00,
         3.27265928e-01,  5.15537602e+00,  3.06587771e+00]])

NMF

NMF (NonNegative Matrix Factorization) は非負行列を低次元の非負行列の積に分解する手法。一般にユーザによるアイテムの評価は非負の値を取るため, NMF の最適化問題では U と V が非負となる制約を与えて最適化を行う。R は疎であるが UV^T は密な行列となる。これは潜在的な特徴 (好みやジャンルに対応) を含む。この推定で問題となるのは, ランクが低いため R = UV^T の解が一般には存在しないことである。さらに行列分解で最善な U と最善な V を同時には直接求められない。そこで, 交互最適化という最適化手法を用いて R ≈ UV^T となるように因子分解し U, V を得る。得られた低次元の U, V から類似ユーザや類似アイテムを推薦したり, UV^T から R を近似的に復元しその情報を使う方法が考えられる。

scikit-learn にはsklearn.decomposition.NMF が実装されている。

ALS

NMF で U, V を得るため ALS (Alternating Least Squares) という交互最適化手法を用いる方法がある。
まず V をランダムに選択された行ベクトルで初期化し, この V と R から U を推定する。逆行列の計算では解けないが, QR分解のような方法で解いていく。 また, U の各行は並列計算可能なので大規模分散処理に向いている。V の計算は推定された U を用いて同様に行い, この手順を交互に行う。 初期値にランダムな値を使うが繰り返すうちに妥当な値に収束していく。

ml/recommendation/ALS を試してみる

コードは examples/src/main/python/ml/als_example.py とほぼ同様で, 今回は HDFS から読み込むので read.text() に hdfs://… を指定している。

HDFS ではなく Local から読み込む場合 は $SPARK_HOME/data/mllib/als/sample_movielens_ratings.txt を指定する。

読み込んだ RDD から DataFrame を生成して, 訓練:評価 = 8:2 に分割している。モデルを構築する ALS に指定するパラメータは [3] を参考に。

#coding:utf-8

#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from __future__ import print_function

import sys
if sys.version >= '3':
    long = int

from pyspark.sql import SparkSession

# $example on$
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS
from pyspark.sql import Row
# $example off$

if __name__ == "__main__":
    spark = SparkSession\
        .builder\
        .appName("FirstApp")\
        .getOrCreate()

    # $example on$
    lines = spark.read.text("hdfs://localhost:9000/user/you/als/sample_movielens_ratings.txt").rdd
    print("lines = " + str(lines.count()))
    
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=long(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])

    # Build the recommendation model using ALS on the training data
    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating")
    model = als.fit(training)

    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))

    # $example off$
    spark.stop()

Spark アプリケーションをデプロイするために spark-submit コマンドを使う。また, INFO レベルのログ出力が多いのでログ出力のレベルを変更した。

$ cp $SPARK_HOME/conf/log4j.properties.template $SPARK_HOME/conf/log4j.properties
$ vim $SPARK_HOME/conf/log4j.properties

log4j.properties 中の `log4j.rootCategory=INFO, console` を `log4j.rootCategory=WARN, console` に変更。

spark-submit を実行。RMSEは 1.84 となった。

$ spark-submit --master yarn spark-cf-als.py
16/11/28 00:15:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/11/28 00:16:06 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.
lines = 1501
Root-mean-square error = 1.83956717404

ちなみに, PySpark で Jupyter Notebook を使いたい場合は $SPARK_HOME/conf/spark-env.sh に以下を設定する。

PYSPARK_PYTHON=$PYENV_ROOT/shims/python
PYSPARK_DRIVER_PYTHON=$PYENV_ROOT/shims/jupyter
PYSPARK_DRIVER_PYTHON_OPTS="notebook"

おわりに

参考書籍は『Sparkによる実践データ解析』です。 第3章では ALS を用いた音楽推薦を紹介しており CV によるチューニング例も解説しています。


[1] 推薦システムのアルゴリズム
[2] Matrix Factorization Techniques for Recommender Systems
[3] ml-collaborative-filtering
[4] mllib-collaborative-filtering
[5] Matrix Factorizationとは
[6] Spark API チートシート
[7] How to turn off INFO logging in PySpark?
[8] Evaluation Metrics – Part 1
[9] 非負値行列因子分解(NMF)によるレコメンドのちょっとした例