先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。
AirflowからDataformをトリッガーする ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations
簡単にまとめると
DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする DataformCreateWorkflowInvocationOperator: sqlを実行する しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。
Dataformに変数を渡す まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。
{ "defaultSchema": "dataform", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "project-stg", "defaultLocation": "asia-northeast1", "vars": { "bq_suffix": "_stg", "execution_date": "2023-05-24" } } DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。
https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46
compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig
CodeCompilationConfig内にvarsという変数を指定できるようです。
{ "defaultDatabase": string, "defaultSchema": string, "defaultLocation": string, "assertionSchema": string, "vars": { string: string, ... }, "databaseSuffix": string, "schemaSuffix": string, "tablePrefix": string } BigQueryのsuffixをcode_compilation_configのvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。
DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "bq_suffix": "_stg", } }, }, ) Dataformにcontext変数を渡す 増分処理する際によくdata_interval_endなどのcontext変数を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。...