Koalas は Apache Spark 上に pandas DataFrame API を実装し, データサイエンティストのビッグデータに対する生産性向上を目的としたプロジェクトです。 pandas (tests, smaller datasets) と Spark (distributed datasets) の両方で動く単一のコードベースとできることを目指しているとあります。
Koalas プロジェクトは β 段階でリリースが早い (weekly release) ため最新の API は Docs を確認すると良いと思います。執筆時点でのバージョンは 0.5.0 です。
環境は以下です。
- Ubuntu 16.04 LTS
- OpenJDK 1.8.0_212
- Scala 2.11.6
- Python 3.7.3
- Spark 2.4.3
Get Started
Spark 2.4, Python3.5+ がインストールされている状態で, conda から Koalas (0.50) をインストールする。また, pip からもインストールできる。
$ conda install koalas -c conda-forge
PySpark を起動し Koalas を import する。
$ PYSPARK_DRIVER_PYTHON="/root/.pyenv/shims/ipython" pyspark
Python 3.7.3 (default, Mar 27 2019, 22:11:17)
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/__ / .__/\_,_/_/ /_/\_\ version 2.4.3
/_/
Using Python version 3.7.3 (default, Mar 27 2019 22:11:17)
SparkSession available as 'spark'.
In [1]: import databricks.koalas as ks
In [2]: import pandas as pd
In [3]: pdf = pd.DataFrame({'x':range(3), 'y':['a','b','b'], 'z':['a','b','b']})
In [4]: type(pdf)
Out[4]: pandas.core.frame.DataFrame
In [5]: pdf
Out[5]:
x y z
0 0 a a
1 1 b b
2 2 b b
from_pandas() に pandas の DataFrame を渡すと Koalas の DataFrame が返る。
In [6]: df = ks.from_pandas(pdf)
In [7]: type(df)
Out[7]: databricks.koalas.frame.DataFrame
In [8]: df
x y z
0 0 a a
1 1 b b
2 2 b b
In [9]: df.columns = ['x', 'y', 'z1']
In [10]: df['x2'] = df.x * df.x
In [11]: df
Out[11]:
x y z1 x2
0 0 a a 0
1 1 b b 1
2 2 b b 4
Use Cases
databricks のブログに投稿された Koalas: Easy Transition from pandas to Apache Spark では Koalas のユースケースとして, カテゴリカル変数の特徴エンジニアリングとタイムスタンプの演算が紹介されている。
PySpark でのタイムスタンプ演算は少々大変であるが, pandas では比較的容易である。
In [12]: import numpy as np
In [13]: date1 = pd.Series(pd.date_range('2012-1-1 12:00:00', periods=7, freq='M'))
In [14]: date2 = pd.Series(pd.date_range('2013-3-11 21:45:00', periods=7, freq='W'))
In [15]: df = pd.DataFrame(dict(Start_date = date1, End_date = date2))
In [16]: df
Out[16]:
Start_date End_date
0 2012-01-31 12:00:00 2013-03-17 21:45:00
1 2012-02-29 12:00:00 2013-03-24 21:45:00
2 2012-03-31 12:00:00 2013-03-31 21:45:00
3 2012-04-30 12:00:00 2013-04-07 21:45:00
4 2012-05-31 12:00:00 2013-04-14 21:45:00
5 2012-06-30 12:00:00 2013-04-21 21:45:00
6 2012-07-31 12:00:00 2013-04-28 21:45:00
In [17]: df['diff_seconds'] = df['End_date'] - df['Start_date']
...: df['diff_seconds'] = df['diff_seconds'] / np.timedelta64(1, 's')
...:
...:
In [18]: df
Out[18]:
Start_date End_date diff_seconds
0 2012-01-31 12:00:00 2013-03-17 21:45:00 35545500.0
1 2012-02-29 12:00:00 2013-03-24 21:45:00 33644700.0
2 2012-03-31 12:00:00 2013-03-31 21:45:00 31571100.0
3 2012-04-30 12:00:00 2013-04-07 21:45:00 29583900.0
4 2012-05-31 12:00:00 2013-04-14 21:45:00 27510300.0
5 2012-06-30 12:00:00 2013-04-21 21:45:00 25523100.0
6 2012-07-31 12:00:00 2013-04-28 21:45:00 23449500.0
上記のような pandas のタイムスタンプ演算を Koalas でも同様に行える。
In [19]: df = pd.DataFrame(dict(Start_date = date1, End_date = date2))
In [20]: kdf = ks.from_pandas(df)
In [21]: kdf['diff_seconds'] = kdf['End_date'] - kdf['Start_date']
...: kdf['diff_seconds'] = kdf['diff_seconds'] / np.timedelta64(1, 's')
...:
...:
In [22]: kdf
Out[22]:
Start_date End_date diff_seconds
0 2012-01-31 12:00:00 2013-03-17 21:45:00 35545500.0
1 2012-02-29 12:00:00 2013-03-24 21:45:00 33644700.0
2 2012-03-31 12:00:00 2013-03-31 21:45:00 31571100.0
3 2012-04-30 12:00:00 2013-04-07 21:45:00 29583900.0
4 2012-05-31 12:00:00 2013-04-14 21:45:00 27510300.0
5 2012-06-30 12:00:00 2013-04-21 21:45:00 25523100.0
6 2012-07-31 12:00:00 2013-04-28 21:45:00 23449500.0
to_spark() で Koalas DataFrame を Spark DataFrame に変換する。
In [23]: sdf = kdf.to_spark()
In [24]: sdf
Out[24]: DataFrame[__index_level_0__: bigint, Start_date: timestamp, End_date: timestamp, diff_seconds: double]
In [25]: type(sdf)
Out[25]: pyspark.sql.dataframe.DataFrame
In [26]: sdf.show()
+-----------------+-------------------+-------------------+------------+
|__index_level_0__| Start_date| End_date|diff_seconds|
+-----------------+-------------------+-------------------+------------+
| 0|2012-01-31 12:00:00|2013-03-17 21:45:00| 3.55455E7|
| 1|2012-02-29 12:00:00|2013-03-24 21:45:00| 3.36447E7|
| 2|2012-03-31 12:00:00|2013-03-31 21:45:00| 3.15711E7|
| 3|2012-04-30 12:00:00|2013-04-07 21:45:00| 2.95839E7|
| 4|2012-05-31 12:00:00|2013-04-14 21:45:00| 2.75103E7|
| 5|2012-06-30 12:00:00|2013-04-21 21:45:00| 2.55231E7|
| 6|2012-07-31 12:00:00|2013-04-28 21:45:00| 2.34495E7|
+-----------------+-------------------+-------------------+------------+
to_koalas() で Spark DataFrame を Koalas DataFrame に戻すことができる。
In [27]: kdf2 = sdf.to_koalas()
In [28]: type(kdf2)
Out[28]: databricks.koalas.frame.DataFrame
In [29]: kdf2
Out[29]:
__index_level_0__ Start_date End_date diff_seconds
0 0 2012-01-31 12:00:00 2013-03-17 21:45:00 35545500.0
1 1 2012-02-29 12:00:00 2013-03-24 21:45:00 33644700.0
2 2 2012-03-31 12:00:00 2013-03-31 21:45:00 31571100.0
3 3 2012-04-30 12:00:00 2013-04-07 21:45:00 29583900.0
4 4 2012-05-31 12:00:00 2013-04-14 21:45:00 27510300.0
5 5 2012-06-30 12:00:00 2013-04-21 21:45:00 25523100.0
6 6 2012-07-31 12:00:00 2013-04-28 21:45:00 23449500.0
今後のロードマップでは pandas の Text Data, Time Series のカバレッジを上げていくことを目指していく方針とある。
Spark MLlib
データ分析の初期段階では pandas / Koalas でデータを読み込み, 探索的な分析を行い Spark MLlib でモデリングを行う使い方が考えられる。
pandas と Koalas の使い分けについては以下の原則が示されている。
The principle is: if the returned object can be large, use a Koalas DataFrame/Series. If the data is bound to be small, use a pandas DataFrame/Series.
原則として, 返されるオブジェクトが大きくなる可能性がある場合は, Koalas DataFrame / Series を使用するとある。例えば, DataFrame.dtypes は通常 DataFrame の列数は行数に比べて少ないため pandas Series で十分であるが DataFrame.head() や Series.unique() はオブジェクトが大きくなる可能性があるため Koalas DataFrame / Series を使う。
ここでは Iris を pandas DataFrame で読み込み, Koalas を使い Spark DataFrame に変換し MLlib の Random forest classifier で分類モデルを作ってみる。
In [30]: iris = pd.read_csv("iris.csv")
In [31]: iris.head()
Out[31]:
sepal.length sepal.width petal.length petal.width species
0 5.1 3.5 1.4 0.2 Setosa
1 4.9 3.0 1.4 0.2 Setosa
2 4.7 3.2 1.3 0.2 Setosa
3 4.6 3.1 1.5 0.2 Setosa
4 5.0 3.6 1.4 0.2 Setosa
In [32]: iris = iris.rename(columns={"sepal.length": "sepal_length", "sepal.width": "sepal_width", "pe
...: tal.length": "petal_length", "petal.width": "petal_width"})
In [33]: iris.head()
Out[33]:
sepal_length sepal_width petal_length petal_width species
0 5.1 3.5 1.4 0.2 Setosa
1 4.9 3.0 1.4 0.2 Setosa
2 4.7 3.2 1.3 0.2 Setosa
3 4.6 3.1 1.5 0.2 Setosa
4 5.0 3.6 1.4 0.2 Setosa
Koalas を使い ks.from_pandas().to_spark() で pandas DataFrame を Spark DataFrame に変換する。
In [34]: sdf = ks.from_pandas(iris).to_spark()
In [35]: sdf
Out[35]: DataFrame[__index_level_0__: bigint, sepal_length: double, sepal_width: double, petal_length: double, petal_width: double, species: string]
Spark MLlib でモデリングを行う。
In [36]: from pyspark.ml import Pipeline
...: from pyspark.ml.classification import RandomForestClassifier
...: from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer, VectorAssembler
...: from pyspark.ml.evaluation import MulticlassClassificationEvaluator
...:
...:
In [37]: labelIndexer = StringIndexer(inputCol="species", outputCol="indexedLabel").fit(sdf)
In [38]: assembler = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_
...: width"], outputCol='features')
In [39]: sdf = VectorAssembler(inputCols=["sepal_length", "sepal_width", "petal_length", "petal_width"
...: ], outputCol='features').transform(sdf)
In [40]: featureIndexer = VectorIndexer(inputCol="features", outputCol="indexedFeatures").fit(sdf)
In [41]: sdf.show(5)
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
|__index_level_0__|sepal_length|sepal_width|petal_length|petal_width|species| features|
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
| 0| 5.1| 3.5| 1.4| 0.2| Setosa|[5.1,3.5,1.4,0.2]|
| 1| 4.9| 3.0| 1.4| 0.2| Setosa|[4.9,3.0,1.4,0.2]|
| 2| 4.7| 3.2| 1.3| 0.2| Setosa|[4.7,3.2,1.3,0.2]|
| 3| 4.6| 3.1| 1.5| 0.2| Setosa|[4.6,3.1,1.5,0.2]|
| 4| 5.0| 3.6| 1.4| 0.2| Setosa|[5.0,3.6,1.4,0.2]|
+-----------------+------------+-----------+------------+-----------+-------+-----------------+
only showing top 5 rows
In [42]: (trainingData, testData) = sdf.randomSplit([0.7, 0.3])
In [43]: rf = RandomForestClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures", numTrees=
...: 10)
In [44]: labelConverter = IndexToString(inputCol="prediction", outputCol="predictedLabel", labels=labe
...: lIndexer.labels)
In [45]: pipeline = Pipeline(stages=[labelIndexer, featureIndexer, rf, labelConverter])
In [46]: model = pipeline.fit(trainingData)
In [47]: predictions = model.transform(testData)
In [48]: predictions.select("predictedLabel", "species", "features").show(5)
+--------------+-------+-----------------+
|predictedLabel|species| features|
+--------------+-------+-----------------+
| Setosa| Setosa|[4.7,3.2,1.3,0.2]|
| Setosa| Setosa|[4.6,3.1,1.5,0.2]|
| Setosa| Setosa|[4.6,3.4,1.4,0.3]|
| Setosa| Setosa|[4.8,3.0,1.4,0.1]|
| Setosa| Setosa|[5.4,3.9,1.3,0.4]|
+--------------+-------+-----------------+
only showing top 5 rows
In [49]: evaluator = MulticlassClassificationEvaluator(
...: labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
...:
In [50]: accuracy = evaluator.evaluate(predictions)
In [51]: print("Test Error = %g" % (1.0 - accuracy))
Test Error = 0.0434783