パーティション化されたCSVファイルをCloudSQLにimportする方法

問題 パーティション化されたCSVファイルをCloudSQLにimportする場面は時々あると思います。 残念ながらCloudSQLはBigQueryのようにwildcardsによるimportを対応していません。需要はあるようですが↓ https://issuetracker.google.com/issues/132058570?pli=1 ファイルごとにimportするとオーバーヘッドが毎回発生するため、速度的に実用性があまりないと思います。一方、importはオペレーションの1種なので、並列処理はできません。 https://cloud.google.com/sql/docs/troubleshooting#import-export HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete. なので、ファイルを結合してimportするのはより現実的な解決策だと思います。 gsutil compose gsutil composeを利用すると、GCSにある複数のファイルを結合できます。 cliのみならず、SDK(google.cloud.storage.Blob.compose)も同じ機能が提供されています。 https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries https://cloud.google.com/storage/docs/gsutil/commands/compose ただし、結合できるファイルは最大32個という制約があります。 There is a limit (currently 32) to the number of components that can be composed in a single operation....

August 16, 2023 · Me

AirflowからDataformにdata_interval_endなどのcontext変数を渡す方法

先日GCPのDataformがGAリリースされました。 せっかくなので、まずAirflowにある既存ワークフローの一部をDataformで書き換えようと思いました。 AirflowからDataformをトリッガーする ドキュメントを調べると、AirflowからDataformをトリッガーするoperatorはすでに存在しています。 https://cloud.google.com/dataform/docs/schedule-executions-composer#create_an_airflow_dag_that_schedules_workflow_invocations 簡単にまとめると DataformCreateCompilationResultOperator: sqlxをsqlにコンパイルする DataformCreateWorkflowInvocationOperator: sqlを実行する しかし、どのようにAirflowからDataformへ変数を渡すかについてはドキュメントに記載されていません。 Dataformに変数を渡す まず、Dataformの設定ファイルdataform.jsonに変数varsを追加しておきましょう。 { "defaultSchema": "dataform", "assertionSchema": "dataform_assertions", "warehouse": "bigquery", "defaultDatabase": "project-stg", "defaultLocation": "asia-northeast1", "vars": { "bq_suffix": "_stg", "execution_date": "2023-05-24" } } DataformCreateCompilationResultOperatorのソースを調べてみたところ、compilation_resultという引数があることを発見しました。 https://github.com/apache/airflow/blob/739e6b5d775412f987a3ff5fb71c51fbb7051a89/airflow/providers/google/cloud/operators/dataform.py#LL73C29-L73C46 compilation_resultの中身を確認するため、APIの詳細を調べました。 https://cloud.google.com/dataform/reference/rest/v1beta1/CodeCompilationConfig CodeCompilationConfig内にvarsという変数を指定できるようです。 { "defaultDatabase": string, "defaultSchema": string, "defaultLocation": string, "assertionSchema": string, "vars": { string: string, ... }, "databaseSuffix": string, "schemaSuffix": string, "tablePrefix": string } BigQueryのsuffixをcode_compilation_configのvarsへ渡してみたら問題なく実行できました。ちなみに、Dataform側からはdataform.projectConfig.vars.bq_suffixで変数を呼び出せます。 DataformCreateCompilationResultOperator( task_id="create_compilation_result", project_id=PROJECT_ID, region=REGION, repository_id=REPOSITORY_ID, compilation_result={ "git_commitish": GIT_COMMITISH, "code_compilation_config": { "vars": { "bq_suffix": "_stg", } }, }, ) Dataformにcontext変数を渡す 増分処理する際によくdata_interval_endなどのcontext変数を利用して当日の差分だけ取り入れます。 しかし、DataformCreateCompilationResultOperatorではtemplate_fieldsが実装されていないため、直接{{ data_interval_end }}のようなjinjaテンプレートを渡すことはできません。...

May 24, 2023 · Me

Apache Airflowのコミッターになった話

Google Providersのバグを見つけた 先日DAGを開発中にGoogle Providers (apache-airflow-providers-google==8.9.0)のCloudDataTransferServiceJobStatusSensorを使用したところ、 project_idはオプション引数であるにも関わらず、省略するとエラーが発生するというバグに遭遇しました。 [2023-03-09, 02:31:24 UTC] {taskinstance.py:1774} ERROR - Task failed with exception Traceback (most recent call last): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/sensors/base.py", line 236, in execute while not self.poke(context): File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/sensors/cloud_storage_transfer_service.py", line 91, in poke operations = hook.list_transfer_operations( File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 380, in list_transfer_operations request_filter = self._inject_project_id(request_filter, FILTER, FILTER_PROJECT_ID) File "/home/airflow/.local/lib/python3.8/site-packages/airflow/providers/google/cloud/hooks/cloud_storage_transfer_service.py", line 459, in _inject_project_id raise AirflowException( airflow.exceptions.AirflowException: The project id must be passed either as `project_id` key in `filter` parameter or as project_id extra in Google Cloud connection definition....

May 11, 2023 · Me

Airflowの単体テストを書きましょう

データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、品質の担保は言うまでもなく重要ですね。品質を確保するには、ワークフローの監視・検証、ワークフローのテスト、そして加工用クエリのテストがいずれも欠かせません。この記事では、ワークフロー(Airflow)の単体テスト方法について紹介します。また、ワークフローの監視・検証に関しては、過去の記事も合わせてご覧いただけると幸いです。 ワークフローの監視 ワークフローの検証 DAGの単体テスト まずは、DAGの単体テストについて説明します。厳密に言えば、DAGの実行ではなく、DAGが正確に構築されたかどうかのテストを行います。 https://airflow.apache.org/docs/apache-airflow/stable/best-practices.html#unit-tests Airflowの公式ベストプラクティスでは簡潔に紹介されていますが、具体例を挙げてさらに詳しく説明しましょう。 importのテスト importが正常にできることを確認する(importが失敗するとWeb UIからも確認できるが、単体テストする時点で確認するともっと便利) import時間を制限する。(冗長なDAGがあると解析するのに時間がかかるので、import時間を制限することで、事前に冗長なDAGを発見できる) 最低でも1つのタスクが含まれていることを確認する。 import unittest from datetime import timedelta from airflow.models import DagBag class TestImportDags(unittest.TestCase): IMPORT_TIMEOUT = 120 @classmethod def setUpClass(cls) -> None: cls.dagbag = DagBag() cls.stats = cls.dagbag.dagbag_stats def test_import_dags(self): self.assertFalse( len(self.dagbag.import_errors), f"DAG import failures. Errors: {self.dagbag.import_errors}", ) def test_import_dags_time(self): duration = sum((o.duration for o in self.stats), timedelta()).total_seconds() self.assertLess(duration, self.IMPORT_TIMEOUT) def test_dags_have_at_least_one_task(self): for key, dag in self.dagbag.dags.items(): self.assertTrue(dag, f"DAG {key} not exsit") self....

April 6, 2023 · Me

AirflowからAirbyteをトリッガーする際にハマるポイント

https://docs.airbyte.com/operator-guides/using-the-airflow-airbyte-operator/ AirflowからAirbyte Operatorを利用するための設定について、Airbyte公式の記事は既にわかりやすくまとめています。実際に試してみて、少しハマったところがあったので、その知見を共有したいと思います。 1. Airflowを2.3.0以上にアップグレードする必要がある apache-airflow-providers-airbyte[http]を利用するのにAirflowを2.3.0以上に上げないといけません。(apache-airflow-providers-airbyte[http]をdocker-composer.ymlの_PIP_ADDITIONAL_REQUIREMENTSに追加することも忘れずに) Cloud Composerなどを利用している場合、GUIからアップグレード可能です。 https://airflow.apache.org/docs/apache-airflow-providers-airbyte/stable/index.html version: '3' x-airflow-common: &airflow-common image: apache/airflow:2.3.4-python3.8 environment: &airflow-common-env PYTHONPATH: /opt/airflow/dags AIRFLOW__CORE__EXECUTOR: CeleryExecutor AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:password@postgres/airflow AIRFLOW__CELERY__RESULT_BACKEND: db+postgresql://airflow:password@postgres/airflow AIRFLOW__CELERY__BROKER_URL: redis://:@redis:6379/0 AIRFLOW__API__AUTH_BACKEND: 'airflow.api.auth.backend.basic_auth' # 追加 _PIP_ADDITIONAL_REQUIREMENTS: apache-airflow-providers-airbyte[http]==3.2.0 2. Airflowの古いバージョンから2.3.4上げるとdocker-composeがバグる airflow 2.2.xでは問題なく環境構築できていましたが、イメージをapache/airflow:2.3.4-python3.8に変更してdocker compose up airflow-initを実行したら怒られます。 You are running pip as root. Please use 'airflow' user to run pip! Airflowの古いdocker-composer.ymlのバグのようなので、 https://github.com/apache/airflow/pull/23517/files services -> airflow-init -> environmentに_PIP_ADDITIONAL_REQUIREMENTS: ''を追加すれば解決できます。 ... environment: <<: *airflow-common-env _AIRFLOW_DB_UPGRADE: 'true' _AIRFLOW_WWW_USER_CREATE: 'true' _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow} _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-password} # 追加 _PIP_ADDITIONAL_REQUIREMENTS: '' ....

March 6, 2023 · Me

Cloud Composerでmax_active_tasks_per_dagのデフォルト値が機能していない問題

問題 先日Cloud Composerの環境を↓にバージョンアップしました。 Cloud Composer 2.0.32 Airflow 2.2.5 core.max_active_tasks_per_dagという一つのDAG内同時に処理できるタスクの上限を設定するパラメータがデフォルト値16のままになっているのにも関わらず、実行するタスクの上限が明らかに16を超えています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#max-active-tasks-per-dag ローカルにあるAirflow 2.2.5環境では何の異常もなく、ComposerのAirflow Configurationを確認したところ、なぜかcore.dag_concurrencyが100に設定されています。 [core] dags_folder = /home/airflow/gcs/dags plugins_folder = /home/airflow/gcs/plugins executor = CeleryExecutor dags_are_paused_at_creation = True load_examples = False donot_pickle = True dagbag_import_timeout = 300 default_task_retries = 2 killed_task_cleanup_time = 3570 parallelism = 0 non_pooled_task_slot_count = 100000 dag_concurrency = 100 .... core.dag_concurrencyの役割はcore.max_active_tasks_per_dagと同じく、一つのDAG内同時に処理できるタスクの上限を設定しています。Airflow 2.2.0からはすでにDeprecatedになったはずなのに、なぜか残り続いています。 https://airflow.apache.org/docs/apache-airflow/stable/configurations-ref.html#dag-concurrency-deprecated 試み 手動で削除しようと思ったですけど、バージョンを上げたのでCloud Composer -> AIRFLOW CONFIGURATION OVERRIDESにcore.dag_concurrencyというパラメータすら存在しませんでした。 仕方なく、GCSから設定ファイルgs://asia-northeast1-colossus-wo-xxxxxxx-bucket/airflow.cfgを直接編集してみました。しかし、gcloud composer environments storage dags importを実行すると初期化が処理が実行され、core.dag_concurrencyが再び出てきました。 解決 デフォルト値ではなく、手動でcore.max_active_tasks_per_dagを明示的に16に指定すると、実行するタスクの上限が期待通りに動作しました。 ザクッとComposerのリリースノートを確認してこのバグまだ修正されていないようです。...

February 8, 2023 · Me

Cloud SQLにある大量なテーブルをBigQueryに入れる話

経緯 こんにちは、データエンジニアのjcです。 昨年度から大規模なデータ分析基盤の構築に携わっています。最近Cloud SQLにある6つのDBの数百個のテーブルを日次洗い替えでBigQueryにあるデータ基盤に入れるタスクを取り組んでいます。 Cloud SQLとBigQuery両方ともGCPのサービスのため、federated queriesを利用すると簡単にできそうに見えますが、 https://cloud.google.com/bigquery/docs/federated-queries-intro 実際に行ってみると、以下の3つの課題を気づきました。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない プロダクトの進化とともにテーブル・カラムが頻繁に作成・変更されるため、BigQuery側でも対応しないといけない Cloud SQLにあるテーブルの定義をそのまま取ってきてもBigQueryではMySQLとPostgreSQLの一部の型が対応されていない https://cloud.google.com/bigquery/docs/tables#sql_1 少し苦労していましたが、幸い解決方法を見つけました。 今後躓く方もいるかもしれないので、知見を共有したいと思います。 BigQuery側でスキーマ情報を含めたテーブルを一々作成するのは現実的ではない BigQueryはクエリの結果によってテーブルを作成できるので、事前にテーブルを作っておく必要がありません。 https://cloud.google.com/bigquery/docs/tables#sql_1 大量なテーブルを一々作成するのは現実的ではない課題の解決法としてはDBのメタ情報(descriptionを含めて)をそのまま生かしてテーブル作成用クエリを生成し、テーブルを作成します。 例えばPostgreSQLの場合、まずはテーブルのメタ情報 SELECT schemaname, relname AS table_name, obj_description(relid) AS description FROM pg_catalog.pg_statio_all_tables WHERE schemaname = '{YOUR_SCHEMA}' とカラムのメタ情報を取得します。 SELECT c.table_schema, c.table_name, c.column_name, c.data_type, pgd.description FROM pg_catalog.pg_statio_all_tables AS st INNER JOIN pg_catalog.pg_description pgd on ( pgd.objoid = st.relid ) RIGHT JOIN information_schema.columns c ON ( pgd.objsubid = c.ordinal_position and c.table_schema = st.schemaname and c....

February 2, 2023 · Me

M1 MacでDocker DesktopからRancher Desktopに移行

https://www.docker.com/pricing/october-2022-pricing-change-faq/ The list price of the Docker Business subscription will go up by $3, to $24 per user per month 2022年10月のお知らせですが、Docker Desktop Business subscriptionがなんと8倍値上げ!! コスト面の理由でRancher Desktopに移行することになりました 移行する際に、Rancher Desktopの2つバグを見つけました。これから躓く人もいると思うので、一旦バグ内容と解決法を共有します。 バグ1:volumesをマウントする際にchownからpermission deniedエラーが出る https://github.com/rancher-sandbox/rancher-desktop/issues/1209 issue自体はまだ解決されていない(2023年1月)ですが、 ~/Library/Application\ Support/rancher-desktop/lima/_config/override.yamlに下記の設定を追加すれば回避できます。 mountType: 9p mounts: - location: "~" 9p: securityModel: mapped-xattr cache: "mmap" バグ2:M1 MacはMonterey 12.4以上に上げないと、割り当てられるメモリは最大3GBになる Rancher DesktopのGUIからメモリを32GBに設定したにもかかわらず、 docker infoで確認すると、CPUは設定通りですが、メモリは2.9GiBしか割り当てられていませんでした。 Architecture: aarch64 CPUs: 6 Total Memory: 2.909GiB Name: lima-rancher-desktop https://github.com/rancher-sandbox/rancher-desktop/issues/2855 Rancher Desktopがlimaという仮想マシンを利用しているので、どうやらMonterey 12.4に上げないといけません。 解決 arm64の対応がまだ難しそうなので、他の方法を考えました。 minikubeを使うとDocker DesktopあるいはRancher Desktopを経由せず、Dockerエンジンをインストールする方法もあります。 しかしM1 Mac(arm64)はhyperkitのインストールがうまくいきませんでした。 https://dhwaneetbhatt....

January 19, 2023 · Me