今回は, 検証目的で Ubuntu 18.04 (VPS) に Apache Airflow 2.0.0 をインストールした際の個人的な備忘録です。基本的には公式のインストール手順と同様です。
環境は以下です。
- Ubuntu 18.04
- Python 3.7 (Anaconda)
- MySQL 5.7
Apache Airflow はワークフローエンジンで, 以下のコンポーネントで構成される。Airflow のアーキテクチャや各コンポーネントの概念は Basic Airflow architecture が参考になる。
- Metadata Database (Postgres or MySQL)
- Web Server
- Scheduler
- Executor
- Worker(s)
- DAGs (Python code)
Install Apache Airflow
build-essential (GCC/g++ etc.) をインストールする。
$ sudo apt-get update
$ sudo apt-get install build-essential
システムレベルで必要なパッケージをインストールする。
$ sudo apt-get install -y --no-install-recommends \
freetds-bin \
krb5-user \
ldap-utils \
libffi6 \
libsasl2-2 \
libsasl2-modules \
libssl1.1 \
locales \
lsb-release \
sasl2-bin \
sqlite3 \
unixodbc
Airflow の HOME を設定する。.bashrc にも追記する。
$ export AIRFLOW_HOME=/your/project/airflow
Airflow 2.0.0 をインストールする。
$ AIRFLOW_VERSION=2.0.0
$ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ echo $PYTHON_VERSION
3.7
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
$ airflow version
2.0.0
今回は, Airflow の DB バックエンドは MySQL を使用する。DB バックエンドは Postgres と SQLite もサポートしている。 既に MySQL 5.7 をインストール済みの状態。
$ mysql --version
mysql Ver 14.14 Distrib 5.7.32, for Linux (x86_64) using EditLine wrapper
MySQL ドライバは推奨している mysqlclient 又は mysql-connector-python をサポートしている。最初に, mysql-connector-python を試したが, 後述する airflow db init での初期化時に何度かエラーが起きたため結局 mysqlclient をインストールした。[1]
$ pip install mysqlclient
MySQL に airflow データベース と, Airflow がアクセスするための airflow ユーザを作成する。
mysql> CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
Query OK, 1 row affected (0.00 sec)
mysql> CREATE USER 'airflow' IDENTIFIED BY '<PASSWORD>';
Query OK, 0 rows affected (0.02 sec)
mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
Query OK, 0 rows affected (0.00 sec)
MySQL の接続設定を airflow.cfg に追加する。
$ vim $AIRFLOW_HOME/airflow.cfg
[core]
...
sql_alchemy_conn = mysql+mysqlconnector://airflow:<PASSWORD>@localhost:3306/airflow
...
DB を初期化する。
$ airflow db init
Airflow の Web UI にアクセスするユーザを作成する。
$ airflow users create \
--username admin \
--firstname Peter \
--lastname Parker \
--role Admin \
--email hoge@exmaple.com
インストール時のデフォルトの Executor は 一度に一つのタスクのみを実行する SequentialExecutor である。設定を単一ホスト内で並列に複数の Worker を動かせる LocalExecutor に変更する。(SequentialExecutor は parallelism = 1 の LocalExecutor と同等)
$ vim $AIRFLOW_HOME/airflow.cfg
[core]
...
executor = LocalExecutor
...
Executor の設定を確認する。
$ airflow config get-value core executor
LocalExecutor
Production でスケーラビリティを考慮する場合は CeleryExecutor や CeleryKubernetesExecutor 等が選択肢になりそう。[2]
次に, Airflow の webserver プロセスを起動する。Ubuntu 18.04 は VPS で稼働しているため, 手元の macOS から SSH Port Forwarding で webserver に繋いでブラウザから Web UI を確認した。
$ airflow webserver --port 8080
Airflow の scheduler プロセスをデーモンで起動する。
$ airflow scheduler -D
せっかくなので airflow scheduler プロセスを systemd にサービスとして登録する。scripts/systemd の雛形を参考にして service ファイルを書き systemctl コマンドでサービスを起動する。
$ sudo systemctl enable airflow-scheduler
$ sudo systemctl daemon-reload
$ sudo systemctl start airflow-scheduler.service
$ sudo systemctl status airflow-scheduler.service
● airflow-scheduler.service - Airflow scheduler daemon
Loaded: loaded (/lib/systemd/system/airflow-scheduler.service; enabled; vendor preset: enabled)
Active: active (running) since Fri 2021-01-29 23:22:52 JST; 2s ago
Main PID: 4680 (airflow)
Tasks: 71 (limit: 4658)
CGroup: /system.slice/airflow-scheduler.service
Tutorial
続いて, 公式のチュートリアルを実行する。チュートリアルとして用意されている DAG (tutorial.py) を $AIRFLOW_HOME/dags/ 以下に配置する。
python コマンドで実行して例外が発生しなければ概ね Airflow の DAG として問題ないようだ。
$ python $AIRFLOW_HOME/dags/tutorial.py
DAG として認識されていることを確認する。
$ airflow dags list
dag_id | filepath | owner | paused
=========+=============+=========+=======
tutorial | tutorial.py | airflow | True
DAG をツリー形式で出力し, 各タスクの実行順序を確認する。
$ airflow tasks list tutorial --tree
...
<Task(BashOperator): print_date>
<Task(BashOperator): templated>
<Task(BashOperator): sleep>
次に, DAG に含まれる各タスク (print_date, sleep, templated) を順次テストする。 airflow tasks test ではスケジューラをシミュレートするため, airflow scheduler プロセスが動いていなくても行える。
$ airflow tasks test tutorial print_date 2015-06-01
...
[2021-01-28 23:16:36,004] {bash.py:158} INFO - Running command: date
[2021-01-28 23:16:36,012] {bash.py:169} INFO - Output:
[2021-01-28 23:16:36,018] {bash.py:173} INFO - 2021年 1月28日 木曜日 23時16分36秒 JST
[2021-01-28 23:16:36,019] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:16:36,040] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date=20210128T141635, end_date=20210128T141636
$ airflow tasks test tutorial sleep 2015-06-01
...
[2021-01-28 23:16:50,444] {bash.py:158} INFO - Running command: sleep 5
[2021-01-28 23:16:50,451] {bash.py:169} INFO - Output:
[2021-01-28 23:16:55,459] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:16:55,473] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20150601T000000, start_date=20210128T141650, end_date=20210128T141655
$ airflow tasks test tutorial templated 2015-06-01
...
[2021-01-28 23:19:53,075] {bash.py:158} INFO - Running command:
echo "2015-06-01"
echo "2015-06-08"
echo "Parameter I passed in"
echo "2015-06-01"
echo "2015-06-08"
echo "Parameter I passed in"
echo "2015-06-01"
echo "2015-06-08"
echo "Parameter I passed in"
echo "2015-06-01"
echo "2015-06-08"
echo "Parameter I passed in"
echo "2015-06-01"
echo "2015-06-08"
echo "Parameter I passed in"
[2021-01-28 23:19:53,081] {bash.py:169} INFO - Output:
[2021-01-28 23:19:53,085] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,086] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,086] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:19:53,102] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=templated, execution_date=20150601T000000, start_date=20210128T141952, end_date=20210128T141953
おわりに
Production にデプロイする時の指針は Production Deployment が参考になります。Production では環境依存の不具合軽減や再現性の点から Docker を利用する場合もあると思います。 DockerHub の Apache Airflow を利用できますが, このイメージには最小限の機能のみが含まれるため, 実際は用途に応じてイメージを拡張/カスタマイズすることになります。
また, Airflow や付随するバックエンドDBの運用負担を考えると Amazon MWAA (Amazon Managed Workflows for Apache Airflow) や Cloud Composer などの Managed Airflow も選択肢になりそうです。
[1] Initializing a Database Backend
[2] Executor
[3] Best practices for writing Dockerfiles