問題

先日Cloud Composerの環境を↓にバージョンアップしました。

Cloud Composer 2.0.32
Airflow 2.2.5

core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag

ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。

[core]
dags_folder = /home/airflow/gcs/dags
plugins_folder = /home/airflow/gcs/plugins
executor = CeleryExecutor
dags_are_paused_at_creation = True
load_examples = False
donot_pickle = True
dagbag_import_timeout = 300
default_task_retries = 2
killed_task_cleanup_time = 3570
parallelism = 0
non_pooled_task_slot_count = 100000
dag_concurrency = 100
....

core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。

https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated

試み

手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。

仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。

解決

デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。

ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。

https://cloud.google.com/composer/docs/release-notes