【PySpark】Koalas の紹介

KoalasApache Spark 上に pandas DataFrame API を実装し, データサイエンティストのビッグデータに対する生産性向上を目的としたプロジェクトです。 pandas (tests, smaller datasets) と Spark (distributed datasets) の両方で動く単一のコードベースとできることを目指しているとあります。

Koalas プロジェクトは β 段階でリリースが早い (weekly release) ため最新の API は Docs を確認すると良いと思います。執筆時点でのバージョンは 0.5.0 です。

環境は以下です。

  • Ubuntu 16.04 LTS
  • OpenJDK 1.8.0_212
  • Scala 2.11.6
  • Python 3.7.3
  • Spark 2.4.3

Get Started

Spark 2.4, Python3.5+ がインストールされている状態で,  conda から Koalas (0.50) をインストールする。また, pip からもインストールできる。

$ conda install koalas -c conda-forge

PySpark を起動し Koalas を import する。

$ PYSPARK_DRIVER_PYTHON="/root/.pyenv/shims/ipython" pyspark
Python 3.7.3 (default, Mar 27 2019, 22:11:17)
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.3
      /_/

Using Python version 3.7.3 (default, Mar 27 2019 22:11:17)
SparkSession available as 'spark'.

In [1]: import databricks.koalas as ks

In [2]: import pandas as pd

In [3]: pdf = pd.DataFrame({'x':range(3), 'y':['a','b','b'], 'z':['a','b','b']})

In [4]: type(pdf)
Out[4]: pandas.core.frame.DataFrame

In [5]: pdf
Out[5]:
   x  y  z
0  0  a  a
1  1  b  b
2  2  b  b

from_pandas() に pandas の DataFrame を渡すと Koalas の DataFrame が返る。

In [6]: df = ks.from_pandas(pdf)

In [7]: type(df)
Out[7]: databricks.koalas.frame.DataFrame

In [8]: df

   x  y  z
0  0  a  a
1  1  b  b
2  2  b  b

In [9]: df.columns = ['x', 'y', 'z1']

In [10]: df['x2'] = df.x * df.x

In [11]: df
Out[11]:
   x  y z1  x2
0  0  a  a   0
1  1  b  b   1
2  2  b  b   4

Use Cases

databricks のブログに投稿された Koalas: Easy Transition from pandas to Apache Spark では Koalas のユースケースとして, カテゴリカル変数の特徴エンジニアリングとタイムスタンプの演算が紹介されている。

PySpark でのタイムスタンプ演算は少々大変であるが, pandas では比較的容易である。

In [12]: import numpy as np

In [13]: date1 = pd.Series(pd.date_range('2012-1-1 12:00:00', periods=7, freq='M'))

In [14]: date2 = pd.Series(pd.date_range('2013-3-11 21:45:00', periods=7, freq='W'))

In [15]: df = pd.DataFrame(dict(Start_date = date1, End_date = date2))

In [16]: df
Out[16]:
           Start_date            End_date
0 2012-01-31 12:00:00 2013-03-17 21:45:00
1 2012-02-29 12:00:00 2013-03-24 21:45:00
2 2012-03-31 12:00:00 2013-03-31 21:45:00
3 2012-04-30 12:00:00 2013-04-07 21:45:00
4 2012-05-31 12:00:00 2013-04-14 21:45:00
5 2012-06-30 12:00:00 2013-04-21 21:45:00
6 2012-07-31 12:00:00 2013-04-28 21:45:00

In [17]: df['diff_seconds'] = df['End_date'] - df['Start_date']
    ...: df['diff_seconds'] = df['diff_seconds'] / np.timedelta64(1, 's')
    ...:
    ...:

In [18]: df
Out[18]:
           Start_date            End_date  diff_seconds
0 2012-01-31 12:00:00 2013-03-17 21:45:00    35545500.0
1 2012-02-29 12:00:00 2013-03-24 21:45:00    33644700.0
2 2012-03-31 12:00:00 2013-03-31 21:45:00    31571100.0
3 2012-04-30 12:00:00 2013-04-07 21:45:00    29583900.0
4 2012-05-31 12:00:00 2013-04-14 21:45:00    27510300.0
5 2012-06-30 12:00:00 2013-04-21 21:45:00    25523100.0
6 2012-07-31 12:00:00 2013-04-28 21:45:00    23449500.0

上記のような pandas のタイムスタンプ演算を Koalas でも同様に行える。

In [19]: df = pd.DataFrame(dict(Start_date = date1, End_date = date2))

In [20]: kdf = ks.from_pandas(df)

In [21]: kdf['diff_seconds'] = kdf['End_date'] - kdf['Start_date']
    ...: kdf['diff_seconds'] = kdf['diff_seconds'] / np.timedelta64(1, 's')
    ...:
    ...:

In [22]: kdf
Out[22]:
           Start_date            End_date  diff_seconds
0 2012-01-31 12:00:00 2013-03-17 21:45:00    35545500.0
1 2012-02-29 12:00:00 2013-03-24 21:45:00    33644700.0
2 2012-03-31 12:00:00 2013-03-31 21:45:00    31571100.0
3 2012-04-30 12:00:00 2013-04-07 21:45:00    29583900.0
4 2012-05-31 12:00:00 2013-04-14 21:45:00    27510300.0
5 2012-06-30 12:00:00 2013-04-21 21:45:00    25523100.0
6 2012-07-31 12:00:00 2013-04-28 21:45:00    23449500.0

to_spark() で Koalas DataFrame を Spark DataFrame に変換する。

In [23]: sdf = kdf.to_spark()

In [24]: sdf
Out[24]: DataFrame[__index_level_0__: bigint, Start_date: timestamp, End_date: timestamp, diff_seconds: double]

In [25]: type(sdf)
Out[25]: pyspark.sql.dataframe.DataFrame

In [26]: sdf.show()
+-----------------+-------------------+-------------------+------------+
|__index_level_0__|         Start_date|           End_date|diff_seconds|
+-----------------+-------------------+-------------------+------------+
|                0|2012-01-31 12:00:00|2013-03-17 21:45:00|   3.55455E7|
|                1|2012-02-29 12:00:00|2013-03-24 21:45:00|   3.36447E7|
|                2|2012-03-31 12:00:00|2013-03-31 21:45:00|   3.15711E7|
|                3|2012-04-30 12:00:00|2013-04-07 21:45:00|   2.95839E7|
|                4|2012-05-31 12:00:00|2013-04-14 21:45:00|   2.75103E7|
|                5|2012-06-30 12:00:00|2013-04-21 21:45:00|   2.55231E7|
|                6|2012-07-31 12:00:00|2013-04-28 21:45:00|   2.34495E7|
+-----------------+-------------------+-------------------+------------+

to_koalas() で Spark DataFrame を Koalas DataFrame に戻すことができる。

In [27]: kdf2 = sdf.to_koalas()

In [28]: type(kdf2)
Out[28]: databricks.koalas.frame.DataFrame

In [29]: kdf2
Out[29]:
   __index_level_0__          Start_date            End_date  diff_seconds
0                  0 2012-01-31 12:00:00 2013-03-17 21:45:00    35545500.0
1                  1 2012-02-29 12:00:00 2013-03-24 21:45:00    33644700.0
2                  2 2012-03-31 12:00:00 2013-03-31 21:45:00    31571100.0
3                  3 2012-04-30 12:00:00 2013-04-07 21:45:00    29583900.0
4                  4 2012-05-31 12:00:00 2013-04-14 21:45:00    27510300.0
5                  5 2012-06-30 12:00:00 2013-04-21 21:45:00    25523100.0
6                  6 2012-07-31 12:00:00 2013-04-28 21:45:00    23449500.0

今後のロードマップでは pandas の Text Data, Time Series のカバレッジを上げていくことを目指していく方針とある。

Spark MLlib

データ分析の初期段階では pandas / Koalas でデータを読み込み, 探索的な分析を行い  Spark MLlib でモデリングを行う使い方が考えられる。

pandas と Koalas の使い分けについては以下の原則が示されている。

The principle is: if the returned object can be large, use a Koalas DataFrame/Series. If the data is bound to be small, use a pandas DataFrame/Series.

原則として, 返されるオブジェクトが大きくなる可能性がある場合は, Koalas DataFrame / Series を使用するとある。例えば, DataFrame.dtypes は通常 DataFrame の列数は行数に比べて少ないため pandas Series で十分であるが DataFrame.head() や Series.unique() はオブジェクトが大きくなる可能性があるため Koalas DataFrame / Series を使う。

ここでは Iris を pandas DataFrame で読み込み, Koalas を使い Spark DataFrame に変換し MLlib の Random forest classifier で分類モデルを作ってみる。

In [30]: iris = pd.read_csv("iris.csv")

In [31]: iris.head()
Out[31]:
     sepal.length  sepal.width  petal.length  petal.width    species
0             5.1          3.5           1.4          0.2     Setosa
1             4.9          3.0           1.4          0.2     Setosa
2             4.7          3.2           1.3          0.2     Setosa
3             4.6          3.1           1.5          0.2     Setosa
4             5.0          3.6           1.4          0.2     Setosa

In [32]: iris = iris.rename(columns={"sepal.length": "sepal_length", "sepal.width": "sepal_width", "pe
    ...: tal.length": "petal_length", "petal.width": "petal_width"})

In [33]: iris.head()
Out[33]:
   sepal_length  sepal_width  petal_length  petal_width species
0           5.1          3.5           1.4          0.2  Setosa
1           4.9          3.0           1.4          0.2  Setosa
2           4.7          3.2           1.3          0.2  Setosa
3           4.6          3.1           1.5          0.2  Setosa
4           5.0          3.6           1.4          0.2  Setosa

Koalas を使い ks.from_pandas().to_spark() で pandas DataFrame を Spark DataFrame に変換する。

In [34]: sdf = ks.from_pandas(iris).to_spark()

In [35]: sdf
Out[35]: DataFrame[__index_level_0__: bigint, sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]

Spark MLlib でモデリングを行う。

In [36]: from pyspark.ml import Pipeline
    ...: from pyspark.ml.classification import RandomForestClassifier
    ...: from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
    ...: from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    ...:
    ...:

In [37]: labelIndexer = StringIndexer(inputCol="species", outputCol="indexedLabel").fit(sdf)

In [38]: assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_
    ...: width"], outputCol='features')

In [39]: sdf = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"
    ...: ], outputCol='features').transform(sdf)

In [40]: featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(sdf)

In [41]: sdf.show(5)
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
|__index_level_0__|sepal_length|sepal_width|petal_length|petal_width|species|         features|
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
|                0|         5.1|        3.5|         1.4|        0.2| Setosa|[5.1,3.5,1.4,0.2]|
|                1|         4.9|        3.0|         1.4|        0.2| Setosa|[4.9,3.0,1.4,0.2]|
|                2|         4.7|        3.2|         1.3|        0.2| Setosa|[4.7,3.2,1.3,0.2]|
|                3|         4.6|        3.1|         1.5|        0.2| Setosa|[4.6,3.1,1.5,0.2]|
|                4|         5.0|        3.6|         1.4|        0.2| Setosa|[5.0,3.6,1.4,0.2]|
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
only showing top 5 rows

In [42]: (trainingData, testData) = sdf.randomSplit([0.7, 0.3])

In [43]: rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=
    ...: 10)

In [44]: labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labe
    ...: lIndexer.labels)

In [45]: pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])

In [46]: model = pipeline.fit(trainingData)

In [47]: predictions = model.transform(testData)

In [48]: predictions.select("predictedLabel", "species", "features").show(5)
+--------------+-------+-----------------+
|predictedLabel|species|         features|
+--------------+-------+-----------------+
|        Setosa| Setosa|[4.7,3.2,1.3,0.2]|
|        Setosa| Setosa|[4.6,3.1,1.5,0.2]|
|        Setosa| Setosa|[4.6,3.4,1.4,0.3]|
|        Setosa| Setosa|[4.8,3.0,1.4,0.1]|
|        Setosa| Setosa|[5.4,3.9,1.3,0.4]|
+--------------+-------+-----------------+
only showing top 5 rows

In [49]: evaluator = MulticlassClassificationEvaluator(
    ...:     labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
    ...:

In [50]: accuracy = evaluator.evaluate(predictions)

In [51]: print("Test Error = %g" % (1.0 - accuracy))
Test Error = 0.0434783

[1] Download Apache Spark