Cloud Composerでmax_active_tasks_per_dagのデフォルト値が機能していない問題

問題 先日Cloud Composerの環境を↓にバージョンアップしました。 Cloud Composer 2.0.32 Airflow 2.2.5 core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。 [core] dags_folder = /home/airflow/gcs/dags plugins_folder = /home/airflow/gcs/plugins executor = CeleryExecutor dags_are_paused_at_creation = True load_examples = False donot_pickle = True dagbag_import_timeout = 300 default_task_retries = 2 killed_task_cleanup_time = 3570 parallelism = 0 non_pooled_task_slot_count = 100000 dag_concurrency = 100 .... core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated 試み 手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。 仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。 解決 デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。 ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。...

February 8, 2023 · Me