【Python】Apache Airflow 2.0.0 を Ubuntu 18.04 にインストールする

今回は, 検証目的で Ubuntu 18.04 (VPS) に Apache Airflow 2.0.0 をインストールした際の個人的な備忘録です。基本的には公式のインストール手順と同様です。

環境は以下です。

  • Ubuntu 18.04
  • Python 3.7 (Anaconda)
  • MySQL 5.7

Apache Airflow はワークフローエンジンで, 以下のコンポーネントで構成される。Airflow のアーキテクチャや各コンポーネントの概念は Basic Airflow architecture が参考になる。

  • Metadata Database (Postgres or MySQL)
  • Web Server
  • Scheduler
  • Executor
  • Worker(s)
  • DAGs (Python code)

Install Apache Airflow

build-essential (GCC/g++ etc.) をインストールする。

$ sudo apt-get update
$ sudo apt-get install build-essential

システムレベルで必要なパッケージをインストールする。

$ sudo apt-get install -y --no-install-recommends \
        freetds-bin \
        krb5-user \
        ldap-utils \
        libffi6 \
        libsasl2-2 \
        libsasl2-modules \
        libssl1.1 \
        locales  \
        lsb-release \
        sasl2-bin \
        sqlite3 \
        unixodbc

Airflow の HOME を設定する。.bashrc にも追記する。

$ export AIRFLOW_HOME=/your/project/airflow

Airflow 2.0.0 をインストールする。

$ AIRFLOW_VERSION=2.0.0
$ PYTHON_VERSION="$(python --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
$ echo $PYTHON_VERSION
3.7
$ CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
$ pip install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
$ airflow version
2.0.0

今回は, Airflow の DB バックエンドは MySQL を使用する。DB バックエンドは Postgres と SQLite もサポートしている。 既に MySQL 5.7 をインストール済みの状態。

$ mysql --version
mysql  Ver 14.14 Distrib 5.7.32, for Linux (x86_64) using  EditLine wrapper

MySQL ドライバは推奨している mysqlclient 又は mysql-connector-python をサポートしている。最初に, mysql-connector-python を試したが, 後述する airflow db init での初期化時に何度かエラーが起きたため結局 mysqlclient をインストールした。[1]

$ pip install mysqlclient

MySQL に airflow データベース と, Airflow がアクセスするための airflow ユーザを作成する。

mysql> CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
Query OK, 1 row affected (0.00 sec)
mysql> CREATE USER 'airflow' IDENTIFIED BY '<PASSWORD>';
Query OK, 0 rows affected (0.02 sec)
mysql> GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
Query OK, 0 rows affected (0.00 sec)

MySQL の接続設定を airflow.cfg に追加する。

$ vim $AIRFLOW_HOME/airflow.cfg
[core]
...
sql_alchemy_conn = mysql+mysqlconnector://airflow:<PASSWORD>@localhost:3306/airflow
...

DB を初期化する。

$ airflow db init

Airflow の Web UI にアクセスするユーザを作成する。

$ airflow users create \
    --username admin \
    --firstname Peter \
    --lastname Parker \
    --role Admin \
    --email hoge@exmaple.com

インストール時のデフォルトの Executor は 一度に一つのタスクのみを実行する SequentialExecutor である。設定を単一ホスト内で並列に複数の Worker を動かせる LocalExecutor に変更する。(SequentialExecutor は parallelism = 1 の LocalExecutor と同等)

$ vim $AIRFLOW_HOME/airflow.cfg
[core]
...
executor = LocalExecutor
...

Executor の設定を確認する。

$ airflow config get-value core executor
LocalExecutor

Production でスケーラビリティを考慮する場合は CeleryExecutor や CeleryKubernetesExecutor 等が選択肢になりそう。[2]

次に, Airflow の webserver プロセスを起動する。Ubuntu 18.04 は VPS で稼働しているため, 手元の macOS から SSH Port Forwarding で webserver に繋いでブラウザから Web UI を確認した。

$ airflow webserver --port 8080

Airflow の scheduler プロセスをデーモンで起動する。

$ airflow scheduler -D

せっかくなので airflow scheduler プロセスを systemd にサービスとして登録する。scripts/systemd の雛形を参考にして service ファイルを書き systemctl コマンドでサービスを起動する。

$ sudo systemctl enable airflow-scheduler
$ sudo systemctl daemon-reload
$ sudo systemctl start airflow-scheduler.service
$ sudo systemctl status airflow-scheduler.service
● airflow-scheduler.service - Airflow scheduler daemon
   Loaded: loaded (/lib/systemd/system/airflow-scheduler.service; enabled; vendor preset: enabled)
   Active: active (running) since Fri 2021-01-29 23:22:52 JST; 2s ago
 Main PID: 4680 (airflow)
    Tasks: 71 (limit: 4658)
   CGroup: /system.slice/airflow-scheduler.service

Tutorial

続いて, 公式のチュートリアルを実行する。チュートリアルとして用意されている DAG (tutorial.py) を $AIRFLOW_HOME/dags/ 以下に配置する。
python コマンドで実行して例外が発生しなければ概ね Airflow の DAG として問題ないようだ。

$ python $AIRFLOW_HOME/dags/tutorial.py

DAG として認識されていることを確認する。

$ airflow dags list
dag_id   | filepath    | owner   | paused
=========+=============+=========+=======
tutorial | tutorial.py | airflow | True

DAG をツリー形式で出力し, 各タスクの実行順序を確認する。

$ airflow tasks list tutorial --tree
...
<Task(BashOperator): print_date>
    <Task(BashOperator): templated>
    <Task(BashOperator): sleep>

次に, DAG に含まれる各タスク (print_date, sleep, templated) を順次テストする。 airflow tasks test ではスケジューラをシミュレートするため, airflow scheduler プロセスが動いていなくても行える。

$ airflow tasks test tutorial print_date 2015-06-01
...
[2021-01-28 23:16:36,004] {bash.py:158} INFO - Running command: date
[2021-01-28 23:16:36,012] {bash.py:169} INFO - Output:
[2021-01-28 23:16:36,018] {bash.py:173} INFO - 2021年 1月28日 木曜日 23時16分36秒 JST
[2021-01-28 23:16:36,019] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:16:36,040] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=print_date, execution_date=20150601T000000, start_date=20210128T141635, end_date=20210128T141636

$ airflow tasks test tutorial sleep 2015-06-01
...
[2021-01-28 23:16:50,444] {bash.py:158} INFO - Running command: sleep 5
[2021-01-28 23:16:50,451] {bash.py:169} INFO - Output:

[2021-01-28 23:16:55,459] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:16:55,473] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=sleep, execution_date=20150601T000000, start_date=20210128T141650, end_date=20210128T141655

$ airflow tasks test tutorial templated 2015-06-01
...
[2021-01-28 23:19:53,075] {bash.py:158} INFO - Running command:

    echo "2015-06-01"
    echo "2015-06-08"
    echo "Parameter I passed in"

    echo "2015-06-01"
    echo "2015-06-08"
    echo "Parameter I passed in"

    echo "2015-06-01"
    echo "2015-06-08"
    echo "Parameter I passed in"

    echo "2015-06-01"
    echo "2015-06-08"
    echo "Parameter I passed in"

    echo "2015-06-01"
    echo "2015-06-08"
    echo "Parameter I passed in"

[2021-01-28 23:19:53,081] {bash.py:169} INFO - Output:
[2021-01-28 23:19:53,085] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,086] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,086] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,086] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-01
[2021-01-28 23:19:53,087] {bash.py:173} INFO - 2015-06-08
[2021-01-28 23:19:53,087] {bash.py:173} INFO - Parameter I passed in
[2021-01-28 23:19:53,087] {bash.py:177} INFO - Command exited with return code 0
[2021-01-28 23:19:53,102] {taskinstance.py:1142} INFO - Marking task as SUCCESS. dag_id=tutorial, task_id=templated, execution_date=20150601T000000, start_date=20210128T141952, end_date=20210128T141953

おわりに

Production にデプロイする時の指針は Production Deployment が参考になります。Production では環境依存の不具合軽減や再現性の点から Docker を利用する場合もあると思います。 DockerHub の Apache Airflow を利用できますが, このイメージには最小限の機能のみが含まれるため, 実際は用途に応じてイメージを拡張/カスタマイズすることになります。
また, Airflow や付随するバックエンドDBの運用負担を考えると Amazon MWAA (Amazon Managed Workflows for Apache Airflow) や Cloud Composer などの Managed Airflow も選択肢になりそうです。

[1] Initializing a Database Backend
[2] Executor
[3] Best practices for writing Dockerfiles