This post is an experiment in moving the values passed to the parameters inside a scikit-learn Pipeline out into a configuration file.
sklearn.pipeline.Pipeline
Used well, sklearn.pipeline.Pipeline bundles fit, transform, and predict into a single Python object, which keeps the code concise and makes mistakes less likely than with procedural code [2].
Let's wrap the preprocessing transformations in scikit-learn-style classes and chain them into a transformation pipeline. To be usable in a pipeline, every step except the last must implement a transform() method that produces the representation consumed by the next step.
This time, the values passed to the parameters are also read out of a configuration file.
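The mechanism itself is simple; here is a minimal sketch of the idea before the full transformer module below. Each block of the JSON file is unpacked with ** into the keyword arguments of one transformer (Select is defined further down):

import json

# Load the configuration once...
with open("conf/preprocessing.conf") as f:
    conf = json.load(f)

# ...and let each JSON block become one transformer's keyword arguments.
select = Select(**conf["pipeline_1"]["select"])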
# -*- coding: utf-8 -*-
import re
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
__all__ = ['Select', 'Fill', 'Filter', 'Split', 'Shift', 'Reshape']
class BaseTransformer(BaseEstimator, TransformerMixin):
    """Common base: a no-op fit and a save flag for keeping intermediate results."""
    def __init__(self, save=False):
        self.save = save
        self._saved_obj = None

    def fit(self, X, y=None):
        # These transformers are stateless: nothing to learn.
        return self

    def transform(self, X):
        # Default is the identity transform; subclasses override this.
        return X

    def predict(self, X):
        pass
class Select(BaseTransformer):
    """Select columns by regex, prefix, suffix, or substring match on the labels."""
    def __init__(self, cols=[], startswith=[], endswith=[], contains=[], save=False):
        self.cols = cols
        self.startswith = startswith
        self.endswith = endswith
        self.contains = contains
        super(Select, self).__init__(save)

    def transform(self, X):
        # OR together every matching condition over the column labels.
        matched = np.zeros(X.shape[1], dtype=bool)
        for query in self.contains:
            matched = matched | X.columns.str.contains(query)
        for query in self.startswith:
            matched = matched | X.columns.str.startswith(query)
        for query in self.endswith:
            matched = matched | X.columns.str.endswith(query)
        for col in self.cols:
            matched = matched | X.columns.str.match(re.compile(col))
        X = X.loc[:, matched]
        if self.save:
            self._saved_obj = X.copy()
        return X
class Fill(BaseTransformer):
    """Fill missing values, by default with the previous observation (ffill)."""
    def __init__(self, method='ffill', save=False):
        self.method = method
        super(Fill, self).__init__(save)

    def transform(self, X):
        X = X.fillna(method=self.method)
        if self.save:
            self._saved_obj = X.copy()
        return X
class Filter(BaseTransformer):
    """Filter rows with a boolean expression given as a string, e.g. "X['col'] > 10"."""
    def __init__(self, expr, save=False):
        self.expr = expr
        super(Filter, self).__init__(save)

    def transform(self, X):
        # eval() turns the configured text into a live boolean mask [1];
        # only use this with configuration files you trust.
        X = X.loc[eval(self.expr)]
        if self.save:
            self._saved_obj = X.copy()
        return X
class Split(BaseTransformer):
    """Split a time-indexed frame into train/validation/test and separate X from y."""
    def __init__(self, train, val, test, y, exclusion_period=None, dropna=False, save=False):
        self.train = train
        self.val = val
        self.test = test
        self.y = y
        self.exclusion_period = exclusion_period
        self.dropna = dropna
        super(Split, self).__init__(save)

    def transform(self, X):
        # Each split is a half-open interval [start, end) on the index.
        train = X.loc[(X.index >= self.train["start"]) & (X.index < self.train["end"])]
        val = X.loc[(X.index >= self.val["start"]) & (X.index < self.val["end"])]
        test = X.loc[(X.index >= self.test["start"]) & (X.index < self.test["end"])]
        if self.dropna:
            # Coerce every value to numeric, then drop rows containing NaN.
            train = train.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()
            val = val.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()
            test = test.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()
        if self.exclusion_period is not None:
            # Mask out the configured periods (applied to the training set only).
            for period in self.exclusion_period:
                train = train.loc[~((train.index >= period['start']) & (train.index < period['end']))]
        if self.y is None:
            X_train, X_val, X_test = train, val, test
            y_train = y_val = y_test = None
        else:
            X_train = train.drop([self.y], axis=1)
            y_train = train.loc[:, self.y]
            X_val = val.drop([self.y], axis=1)
            y_val = val.loc[:, self.y]
            X_test = test.drop([self.y], axis=1)
            y_test = test.loc[:, self.y]
        if self.save:
            self._saved_obj = [train.copy(), val.copy(), test.copy()]
        return (X_train, y_train, X_val, y_val, X_test, y_test)
class Shift(BaseTransformer):
    """Add lag features: a "<col>_lag_<i>" column for each i in 1..periods."""
    def __init__(self, periods=1, cols=[], dropna=True, save=False):
        self.periods = periods
        self.cols = cols
        self.dropna = dropna
        super(Shift, self).__init__(save)

    def transform(self, X):
        # Default to lagging every column when none are specified.
        targets = self.cols if len(self.cols) != 0 else X.columns.values
        for col in targets:
            for i in xrange(1, self.periods + 1):
                name = col + "_lag_" + str(i)
                X[name] = X[col].shift(periods=i)
        if self.dropna:
            # Shifting leaves NaN in the first `periods` rows; drop them.
            X = X.dropna()
        if self.save:
            self._saved_obj = X.copy()
        return X
class Reshape(BaseTransformer):
    """Reshape a 2-D array into the (batch_size, timestep, n_features) tensor Keras expects."""
    def __init__(self, batch_size=None, timestep=1, n_features=None, save=False):
        self.batch_size = batch_size
        self.timestep = timestep
        self.n_features = n_features
        super(Reshape, self).__init__(save)

    def transform(self, X):
        if len(X.shape) == 1:
            X = X.reshape(-1, 1)
        # Infer batch size and feature count from the input unless configured.
        batch_size = X.shape[0] if self.batch_size is None else self.batch_size
        n_features = X.shape[1] if self.n_features is None else self.n_features
        X = np.reshape(X, (batch_size, self.timestep, n_features))
        if self.save:
            self._saved_obj = X.copy()
        return X
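As an aside, every transformer accepts a save flag: when it is True, transform() keeps a copy of its output in _saved_obj, which is handy for inspecting intermediate results. A small sketch (df is a hypothetical DataFrame with gaps):

fill = Fill(method='ffill', save=True)
filled = fill.fit_transform(df)   # transform as usual
fill._saved_obj.head()            # the kept copy of the transformed output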
JSON Configuration File
All values (arguments) passed to the parameters of the classes above are written in a single JSON file, and the constructors are fed exclusively with values read from this configuration file.
{
"metadata": {
"title": "configuration file",
"version": "1.0"
},
"pipeline_1": {
"select": {
"startswith": ["Close"]
},
"fill": {
"method": "ffill"
}
},
"pipeline_2": {
"fill": {
"method": "ffill"
},
"shift": {
"periods": 5
},
"select": {
"cols": ["^Close_us_dji$"],
"contains": ["_lag_"]
},
"split": {
"train": {
"start": "2000-01-01 00:00:00",
"end": "2015-01-01 00:00:00"
},
"val": {
"start": "2015-01-01 00:00:00",
"end": "2016-01-01 00:00:00"
},
"test": {
"start": "2016-01-01 00:00:00",
"end": "2017-01-01 00:00:00"
},
"y": "Close_us_dji",
"dropna": true
}
},
"pipeline_3": {
"std_scaler": {},
"reshape": {
"timestep": 1
}
}
}
Usage example
Let's put it to work as a preprocessing transformation pipeline.
As an example, we use stock indices fetched from Yahoo Finance: the Dow Jones Industrial Average, the S&P 500, the DAX, and the Shanghai Composite.
$ ipython
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.
IPython 4.2.0 -- An enhanced Interactive Python.
? -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help -> Python's own help system.
object? -> Details about 'object', use 'object??' for extra details.
In [1]: import json
In [2]: import numpy as np
In [3]: import pandas as pd
In [4]: from sklearn.preprocessing import StandardScaler
In [5]: from sklearn.pipeline import Pipeline
In [6]: from transformer import Select, Fill, Filter, Split, Shift, Reshape
In [7]: def load_config(filepath):
...: with open(filepath) as f:
...: return json.load(f)
...:
In [8]: conf = load_config("conf/preprocessing.conf")
Load the data for each market. Ultimately we transform it so that X holds each market's closing index at times (t-1, ..., t-5) and y holds the Dow's close at time t.
In [12]: # Dow Jones Industrial Average
In [13]: us_dji = pd.read_csv("data/^DJI.csv")
In [14]: us_dji.index = pd.to_datetime(us_dji['Date'])
In [15]: us_dji = us_dji.rename(columns=lambda s: s + '_us_dji')
In [16]: # S&P500
In [17]: us_sp = pd.read_csv("data/^GSPC.csv")
In [18]: us_sp.index = pd.to_datetime(us_sp['Date'])
In [19]: us_sp = us_sp.rename(columns=lambda s: s + '_us_sp')
In [20]: # DAX
In [21]: dax = pd.read_csv("data/^GDAXI.csv")
In [22]: dax.index = pd.to_datetime(dax['Date'])
In [23]: dax = dax.rename(columns=lambda s: s + '_dax')
In [24]: # Shanghai Composite
In [25]: shanghai = pd.read_csv("data/000001.SS.csv")
In [26]: shanghai.index = pd.to_datetime(shanghai['Date'])
In [27]: shanghai = shanghai.rename(columns=lambda s: s + '_shanghai')
The first Pipeline selects the closing-price column from each market's DataFrame and fills missing values with the previous observation. The resulting DataFrames are then concatenated into a single DataFrame.
In [28]: # Pipeline 1
In [29]: pl1 = Pipeline(zip(["select", "fill"],
....: [Select(**conf["pipeline_1"]["select"]), Fill(**conf["pipeline_1"]["fill"])]))
In [30]: us_dji = pl1.fit_transform(us_dji)
In [31]: us_sp = pl1.fit_transform(us_sp)
In [32]: dax = pl1.fit_transform(dax)
In [33]: shanghai = pl1.fit_transform(shanghai)
In [34]: stock_index = pd.concat([us_dji, us_sp, dax, shanghai], axis=1)
In [35]: stock_index.shape
Out[35]: (7037, 4)
The next Pipeline builds lag features with a backward shift and splits the DataFrame into train, validation, and test sets (20 feature columns = 4 markets x 5 lags).
In [36]: # Pipeline 2
In [37]: pl2 = Pipeline(zip(["fill", "shift", "select", "split"],
....: [Fill(**conf["pipeline_2"]["fill"]), Shift(**conf["pipeline_2"]["shift"]),
....: Select(**conf["pipeline_2"]["select"]), Split(**conf["pipeline_2"]["split"])]))
In [38]: X_train, y_train, X_val, y_val, X_test, y_test = pl2.fit_transform(stock_index)
In [39]: print(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape)
((3776, 20), (3776,), (256, 20), (256,), (259, 20), (259,))
The last Pipeline scales each column and reshapes the result into a tensor that can be passed to Keras/TensorFlow.
In [40]: # Pipeline 3
In [41]: pl3 = Pipeline(zip(["std_scaler", "reshape"],
....: [StandardScaler(**conf["pipeline_3"]["std_scaler"]), Reshape(**conf["pipeline_3"]["reshape"])]))
In [42]: pl3.fit(X_train)
Out[42]:
Pipeline(memory=None, steps=[('std_scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('reshape', Reshape(batch_size=None, n_features=None, save=False, timestep=1))])
In [43]: X_train = pl3.transform(X_train)
In [44]: X_val = pl3.transform(X_val)
In [45]: X_test = pl3.transform(X_test)
In [46]: pl3.fit(y_train.values.reshape(-1,1))
Out[46]:
Pipeline(memory=None,
steps=[('std_scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('reshape', Reshape(batch_size=None, n_features=None, save=False, timestep=1))])
In [47]: y_train = pl3.transform(y_train.values.reshape(-1,1))
In [48]: y_val = pl3.transform(y_val.values.reshape(-1,1))
In [49]: y_test = pl3.transform(y_test.values.reshape(-1,1))
In [50]: print(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape)
((3776, 1, 20), (3776, 1, 1), (256, 1, 20), (256, 1, 1), (259, 1, 20), (259, 1, 1))
Training data is now ready, so the whole chain, including a classifier or regressor, can also be bundled into a Pipeline that acts as a single estimator and passed to GridSearchCV and the like.
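For instance, a minimal sketch under these assumptions: Ridge and its parameter grid are hypothetical stand-ins, and it takes the 2-D X_train/y_train from before Pipeline 3, since scikit-learn estimators expect 2-D inputs rather than the 3-D tensors built for Keras.

from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV

# One estimator object: scaling followed by a (hypothetical) regressor.
estimator = Pipeline([("std_scaler", StandardScaler()),
                      ("ridge", Ridge())])
# Step-prefixed parameter names ("<step>__<param>") could also live in
# the JSON configuration file.
param_grid = {"ridge__alpha": [0.1, 1.0, 10.0]}
gs = GridSearchCV(estimator, param_grid, cv=3)
gs.fit(X_train, y_train)
print(gs.best_params_)

For time-series data like this, passing a splitter such as sklearn.model_selection.TimeSeriesSplit as cv would be more appropriate than the default folds.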
Closing Thoughts
Managing a configuration file together with the saved model it produced should also help secure reproducibility [3]. When changing parameter values through trial and error, it seems best to keep code changes to a minimum and edit the configuration file instead.
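A minimal sketch of that association (the output paths are hypothetical, and joblib is imported the way scikit-learn 0.19 shipped it):

import shutil
from sklearn.externals import joblib

# Persist the fitted pipeline and keep the exact configuration next to it,
# so a saved model can always be traced back to the parameters that built it.
joblib.dump(pl3, "models/model.pkl")
shutil.copy("conf/preprocessing.conf", "models/model.pkl.conf")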
[1] Turning user text into live objects
[2] Work like a Pro with Pipelines and Feature Unions
[3] Lessons Learned Reproducing a Deep Reinforcement Learning Paper