【Python】scikit-learn Pipelines and Configuration Files for Preprocessing

This post is about an experiment in moving the parameter values passed inside a scikit-learn Pipeline out into a configuration file.

sklearn.pipeline.Pipeline

Used well, sklearn.pipeline.Pipeline lets you bundle fit, transform, and predict into a single Python object, which keeps the code concise and tends to produce fewer mistakes than procedural code.
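For instance, a minimal sketch of the idea, with LinearRegression as a stand-in estimator and toy data in place of real features:

import numpy as np
from sklearn.linear_model import LinearRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Toy data just for illustration
X_train, y_train = np.random.rand(100, 3), np.random.rand(100)
X_test = np.random.rand(10, 3)

# One object instead of a hand-written scale -> fit -> predict sequence
pipe = Pipeline([
    ("scaler", StandardScaler()),
    ("model", LinearRegression()),
])
pipe.fit(X_train, y_train)          # fits the scaler, transforms, then fits the model
predictions = pipe.predict(X_test)  # applies the fitted scaler, then predicts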

Let's wrap the preprocessing transformations in scikit-learn-style classes and chain them into a transformation pipeline. To be usable in a Pipeline, every step except the last must implement a transform() method (along with fit()) that produces the representation consumed by the next step. The module below is saved as transformer.py.
Also, this time the values passed to the parameters are read from a configuration file.

# -*- coding: utf-8 -*-

import re
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

__all__ = ['Select', 'Fill', 'Filter', 'Split', 'Shift', 'Reshape']

class BaseTransformer(BaseEstimator, TransformerMixin):
    def __init__(self, save=False):
        self.save = save
        self._saved_obj = None

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # Identity by default; subclasses override this to do real work
        return X

    def predict(self, X):
        pass

class Select(BaseTransformer):
    def __init__(self, cols=[], startswith=[], endswith=[], contains=[], save=False):
        self.cols = cols
        self.startswith = startswith
        self.endswith = endswith
        self.contains = contains
        super(Select, self).__init__(save)

    def transform(self, X):
        matched = np.zeros(X.shape[1], dtype=bool)

        if len(self.contains) != 0:
            for query in self.contains:
                matched = matched | X.columns.str.contains(query)

        if len(self.startswith) != 0:
            for query in self.startswith:
                matched = matched | X.columns.str.startswith(query)

        if len(self.endswith) != 0:
            for query in self.endswith:
                matched = matched | X.columns.str.endswith(query)

        if len(self.cols) != 0:
            for col in self.cols:
                matched = matched | X.columns.str.match(re.compile(col))

        X = X.loc[:,matched]

        if self.save:
            self._saved_obj = X.copy()

        return X

class Fill(BaseTransformer):
    def __init__(self, method='ffill', save=False):
        self.method = method
        super(Fill, self).__init__(save)

    def transform(self, X):
        X = X.fillna(method=self.method)
        if self.save:
            self._saved_obj = X.copy()

        return X

class Filter(BaseTransformer):
    def __init__(self, expr, save=False):
        self.expr = expr # e.g. "X['col'] > 10"
        super(Filter, self).__init__(save)

    def transform(self, X):
        # NOTE: eval() executes arbitrary code, so only use with trusted configuration files
        X = X.loc[eval(self.expr)]
        if self.save:
            self._saved_obj = X.copy()

        return X

class Split(BaseTransformer):
    def __init__(self, train, val, test, y, exclusion_period=None, dropna=False, save=False):
        self.train = train
        self.val = val
        self.test = test
        self.y = y
        self.exclusion_period = exclusion_period
        self.dropna = dropna
        super(Split, self).__init__(save)

    def transform(self, X):
        train = X.loc[(X.index >= self.train["start"]) & (X.index < self.train["end"])]
        val = X.loc[(X.index >= self.val["start"]) & (X.index < self.val["end"])]
        test = X.loc[(X.index >= self.test["start"]) & (X.index < self.test["end"])]

        if self.dropna:
            train = train.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()
            val = val.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()
            test = test.apply(lambda x: pd.to_numeric(x, errors="coerce"), axis=1).dropna()

        if self.exclusion_period is not None:
            for period in self.exclusion_period:
                train = train.loc[~((train.index >= period['start']) & (train.index < period['end']))]

        if self.y is None:
            X_train = train
            X_val = val
            X_test = test
            y_train = y_val = y_test = None
        else:
            X_train = train.drop([self.y], axis=1)
            y_train = train.loc[:,self.y]
            X_val = val.drop([self.y], axis=1)
            y_val = val.loc[:,self.y]
            X_test = test.drop([self.y], axis=1)
            y_test = test.loc[:,self.y]

        if self.save:
            self._saved_obj = [train.copy(), val.copy(), test.copy()]

        return (X_train, y_train, X_val, y_val, X_test, y_test)

class Shift(BaseTransformer):
    def __init__(self, periods=1, cols=[], dropna=True, save=False):
        self.periods = periods
        self.cols = cols
        self.dropna = dropna
        super(Shift, self).__init__(save)

    def transform(self, X):
        # Copy first so that adding lag columns does not mutate the caller's DataFrame
        X = X.copy()

        if len(self.cols) == 0:
            targets = X.columns.values
        else:
            targets = self.cols

        for col in targets:
            for i in range(1, self.periods + 1):
                name = col + "_lag_" + str(i)
                X[name] = X[col].shift(periods=i)

        if self.dropna:
            X = X.dropna()

        if self.save:
            self._saved_obj = X.copy()

        return X

class Reshape(BaseTransformer):
    def __init__(self, batch_size=None, timestep=1, n_features=None, save=False):
        self.batch_size = batch_size
        self.timestep = timestep
        self.n_features = n_features
        super(Reshape, self).__init__(save)

    def transform(self, X):
        if len(X.shape) == 1:
            X = X.reshape(-1, 1)

        if self.batch_size is None:
            batch_size = X.shape[0]
        else:
            batch_size = self.batch_size

        if self.n_features is None:
            n_features = X.shape[1]
        else:
            n_features = self.n_features

        X = np.reshape(X, (batch_size, self.timestep, n_features))
        if self.save:
            self._saved_obj = X.copy()

        return X
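One caveat about Filter: eval() will execute whatever Python expression the configuration file contains, so the config must be trusted. When the filter expressions only reference column names, pandas' DataFrame.query is a safer alternative. A minimal sketch (QueryFilter is a hypothetical class, not used elsewhere in this post):

import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin

class QueryFilter(BaseEstimator, TransformerMixin):
    def __init__(self, expr):
        self.expr = expr  # e.g. "col > 10" -- column names only, no arbitrary code

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        # DataFrame.query parses a restricted expression instead of running eval()
        return X.query(self.expr)

df = pd.DataFrame({"col": [5, 15, 25]})
print(QueryFilter("col > 10").transform(df))  # keeps the rows where col > 10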

JSON Configuration File

All of the values (arguments) passed to the parameters of the classes above are written in a single JSON file, saved here as conf/preprocessing.conf. Every argument uses a value read from this configuration file.

{
	"metadata": {
		"title": "configuration file",
		"version": "1.0"
	},
	"pipeline_1": {
		"select": {
			"startswith": ["Close"]
		},
		"fill": {
			"method": "ffill"
		}
	},
	"pipeline_2": {
		"fill": {
			"method": "ffill"
		},
		"shift": {
			"periods": 5
		},
		"select": {
			"cols": ["^Close_us_dji$"],
			"contains": ["_lag_"]
		},
		"split": {
			"train": {
				"start": "2000-01-01 00:00:00",
				"end": "2015-01-01 00:00:00"
			},
			"val": {
				"start": "2015-01-01 00:00:00",
				"end": "2016-01-01 00:00:00"
			},
			"test": {
				"start": "2016-01-01 00:00:00",
				"end": "2017-01-01 00:00:00"
			},
			"y": "Close_us_dji",
			"dropna": true
		}
	},
	"pipeline_3": {
		"std_scaler": {},
		"reshape": {
			"timestep": 1
		}
	}
}
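Each nested object in the JSON maps directly onto a class's keyword arguments via dictionary unpacking. A small sketch of the mechanism, assuming the module above is saved as transformer.py and the JSON as conf/preprocessing.conf (the names used in the session below):

import json
from transformer import Select

with open("conf/preprocessing.conf") as f:
    conf = json.load(f)

# **-unpacking turns {"startswith": ["Close"]} into keyword arguments,
# so this call ...
s1 = Select(**conf["pipeline_1"]["select"])
# ... is equivalent to writing the arguments by hand:
s2 = Select(startswith=["Close"])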

Usage example

Let's try it out as a preprocessing transformation pipeline.
As an example, we use stock index data fetched from Yahoo Finance: the Dow Jones Industrial Average, the S&P 500, the DAX, and the Shanghai Composite.

$ ipython
Python 2.7.11 |Anaconda custom (x86_64)| (default, Jun 15 2016, 16:09:16)
Type "copyright", "credits" or "license" for more information.

IPython 4.2.0 -- An enhanced Interactive Python.
?         -> Introduction and overview of IPython's features.
%quickref -> Quick reference.
help      -> Python's own help system.
object?   -> Details about 'object', use 'object??' for extra details.

In [1]: import json

In [2]: import numpy as np

In [3]: import pandas as pd

In [4]: from sklearn.preprocessing import StandardScaler

In [5]: from sklearn.pipeline import Pipeline

In [6]: from transformer import Select, Fill, Filter, Split, Shift, Reshape

In [7]: def load_config(filepath):
   ...:     with open(filepath) as f:
   ...:          return json.load(f)
   ...:

In [8]: conf = load_config("conf/preprocessing.conf")

Load the data for each market. The end goal is to transform the data so that each market's closing index at times (t-1, ..., t-5) becomes X and the Dow's closing price at time (t) becomes y.

In [12]: # Dow Jones Industrial Average

In [13]: us_dji = pd.read_csv("data/^DJI.csv")

In [14]: us_dji.index = pd.to_datetime(us_dji['Date'])

In [15]: us_dji = us_dji.rename(columns=lambda s: s + '_us_dji')

In [16]: # S&P500

In [17]: us_sp = pd.read_csv("data/^GSPC.csv")

In [18]: us_sp.index = pd.to_datetime(us_sp['Date'])

In [19]: us_sp = us_sp.rename(columns=lambda s: s + '_us_sp')

In [20]: # DAX

In [21]: dax = pd.read_csv("data/^GDAXI.csv")

In [22]: dax.index = pd.to_datetime(dax['Date'])

In [23]: dax = dax.rename(columns=lambda s: s + '_dax')

In [24]: # Shanghai Composite

In [25]: shanghai = pd.read_csv("data/000001.SS.csv")

In [26]: shanghai.index = pd.to_datetime(shanghai['Date'])

In [27]: shanghai = shanghai.rename(columns=lambda s: s + '_shanghai')

The first Pipeline selects the closing-price columns from each market's DataFrame and forward-fills missing values with the previous observation. The resulting DataFrames are then concatenated into a single DataFrame.

In [28]: # Pipeline 1

In [29]: pl1 = Pipeline(zip(["select", "fill"],
   ....:         [Select(**conf["pipeline_1"]["select"]), Fill(**conf["pipeline_1"]["fill"])]))

In [30]: us_dji = pl1.fit_transform(us_dji)

In [31]: us_sp = pl1.fit_transform(us_sp)

In [32]: dax = pl1.fit_transform(dax)

In [33]: shanghai = pl1.fit_transform(shanghai)

In [34]: stock_index = pd.concat([us_dji, us_sp, dax, shanghai], axis=1)

In [35]: stock_index.shape
Out[35]: (7037, 4)

The next Pipeline creates lag features via a backward shift and splits the DataFrame into train, validation, and test sets. Because Split returns a tuple rather than a DataFrame, it has to be the last step of this pipeline.

In [36]: # Pipeline 2

In [37]: pl2 = Pipeline(zip(["fill", "shift", "select", "split"],
   ....:         [Fill(**conf["pipeline_2"]["fill"]), Shift(**conf["pipeline_2"]["shift"]),
   ....:         Select(**conf["pipeline_2"]["select"]), Split(**conf["pipeline_2"]["split"])]))

In [38]: X_train, y_train, X_val, y_val, X_test, y_test = pl2.fit_transform(stock_index)

In [39]: print(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape)
((3776, 20), (3776,), (256, 20), (256,), (259, 20), (259,))

The final Pipeline scales each column and reshapes the result into a tensor that can be fed to Keras/TensorFlow.

In [40]: # Pipeline 3

In [41]: pl3 = Pipeline(zip(["std_scaler", "reshape"],
   ....:         [StandardScaler(**conf["pipeline_3"]["std_scaler"]), Reshape(**conf["pipeline_3"]["reshape"])]))

In [42]: pl3.fit(X_train)
Out[42]:
Pipeline(memory=None, steps=[('std_scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('reshape', Reshape(batch_size=None, n_features=None, save=False, timestep=1))])

In [43]: X_train = pl3.transform(X_train)

In [44]: X_val = pl3.transform(X_val)

In [45]: X_test = pl3.transform(X_test)

In [46]: pl3.fit(y_train.values.reshape(-1,1))
Out[46]:
Pipeline(memory=None,
     steps=[('std_scaler', StandardScaler(copy=True, with_mean=True, with_std=True)), ('reshape', Reshape(batch_size=None, n_features=None, save=False, timestep=1))])

In [47]: y_train = pl3.transform(y_train.values.reshape(-1,1))

In [48]: y_val = pl3.transform(y_val.values.reshape(-1,1))

In [49]: y_test = pl3.transform(y_test.values.reshape(-1,1))

In [50]: print(X_train.shape, y_train.shape, X_val.shape, y_val.shape, X_test.shape, y_test.shape)
((3776, 1, 20), (3776, 1, 1), (256, 1, 20), (256, 1, 1), (259, 1, 20), (259, 1, 1))

The data is now ready for training, so you can also bundle the whole sequence, including a classifier or regressor, into a single Pipeline Estimator and pass it to GridSearchCV and the like.
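As a sketch of what that could look like, with Ridge standing in as a hypothetical regressor and toy data in place of the real features (note that GridSearchCV expects a 2-D feature matrix, i.e. the data before Reshape):

import numpy as np
from sklearn.linear_model import Ridge
from sklearn.model_selection import GridSearchCV
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

# Hypothetical end-to-end pipeline ending in an estimator
pipe = Pipeline([
    ("std_scaler", StandardScaler()),
    ("model", Ridge()),
])

# Pipeline parameters are addressed as <step_name>__<param_name>
param_grid = {"model__alpha": [0.1, 1.0, 10.0]}
search = GridSearchCV(pipe, param_grid, cv=5)

X_demo, y_demo = np.random.rand(100, 20), np.random.rand(100)
search.fit(X_demo, y_demo)
print(search.best_params_)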

Conclusion

Managing configuration files together with the saved models they produced should also help ensure reproducibility. When changing parameter values during trial and error, it seems best to keep code changes to a minimum and make the changes in the configuration file instead.
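For instance, a minimal sketch of that idea, with hypothetical file and directory names, saving the fitted pipeline next to the exact configuration that produced it:

import os
import shutil

from sklearn.externals import joblib  # on newer scikit-learn, use `import joblib` instead

# Keep the fitted pipeline and its config under the same versioned name,
# so a saved model can always be traced back to its exact parameters
if not os.path.isdir("models"):
    os.makedirs("models")
joblib.dump(pl3, "models/model_v1.pkl")
shutil.copy("conf/preprocessing.conf", "models/model_v1.conf")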
