GCS composeで32以上のオブジェクトをまとめる方法

先日の記事にてパーティション化されたCSVオブジェクトをCloudSQLにimportする方法を紹介しました。 SDKを利用して32より多いオブジェクトをまとめる場合、GCPのコミュニティのチュートリアルのコードをそのまま使っていました。(2023.8.10にアーカイブされ済み) https://github.com/GoogleCloudPlatform/community/blob/master/archived/cloud-storage-infinite-compose/index.md 文章最後に書いてあるように This code is offered for demonstration purposes only, and should not be considered production-ready. Applying it to your production workloads is up to you! コードはデモ用途のため、そのまま本番環境で使うのはで推奨しないです。先日この文を見逃して自分の環境にデプロイし、平常運転1ヶ月後、パーティション化されたCSVオブジェクトの数が増えたらバグが出ました。 事象 起きていた事象は、composeによって作られた中間オブジェクトが消されず残り続け、最終的に数TBのとてつもなく大きいオブジェクトが作成されてしまいました。一つのオブジェクトは5TiBまでというGCSの上限を超えてしまうため、処理が失敗しました。 どこがバグ 問題は関数compose_and_cleanupから呼び出している関数delete_objects_concurrentにあります。オブジェクトをまとめた後、毎回中間オブジェクトを削除していますが、そのdelete処理自体が非同期処理ですべての処理が終了するのを待たずに次のblob.composeの実行が始まります。 The delete_objects_concurrent function is very simple, using fire-and-forget delete tasks in an executor. A more robust implementation might check the futures from the submitted tasks. チュートリアルの中にちゃんと書いてあります。(もっとロバスト性のある実装は、submit済みのタスクのfuturesをチェックするとのこと) まとめようとするオブジェクトの数が少ない場合はほとんど問題がないですが、1000オブジェクトあたりを超えるとdeleteがcompose処理より遅くなるため、next_chunkが永遠に存在しているままwhileループから脱出できない状態になります。 delete処理自体は特に制限ないようですが、数百のオブジェクトを削除する場合時間かかるとドキュメントに記載されています。 https://cloud.google.com/storage/docs/deleting-objects#delete-objects-in-bulk 解決方法 最初に試したことは、記事に書いてあるようにsubmit済みのタスクのfuturesをwait処理でチェックします。つまりすべてのdelete処理が終わるまでに、compose処理を行いません。 (python並行処理の詳細は公式ドキュメントまたは他の記事をご覧ください) from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait def delete_objects_concurrent(blobs, executor, client) -> None: """Delete Cloud Storage objects concurrently....

August 17, 2023 · Me

パーティション化されたCSVファイルをCloudSQLにimportする方法

問題 パーティション化されたCSVファイルをCloudSQLにimportする場面は時々あると思います。 残念ながらCloudSQLはBigQueryのようにwildcardsによるimportを対応していません。需要はあるようですが↓ https://issuetracker.google.com/issues/132058570?pli=1 ファイルごとにimportするとオーバーヘッドが毎回発生するため、速度的に実用性があまりないと思います。一方、importはオペレーションの1種なので、並列処理はできません。 https://cloud.google.com/sql/docs/troubleshooting#import-export HTTP Error 409: Operation failed because another operation was already in progress. There is already a pending operation for your instance. Only one operation is allowed at a time. Try your request after the current operation is complete. なので、ファイルを結合してimportするのはより現実的な解決策だと思います。 gsutil compose gsutil composeを利用すると、GCSにある複数のファイルを結合できます。 cliのみならず、SDK(google.cloud.storage.Blob.compose)も同じ機能が提供されています。 https://cloud.google.com/storage/docs/composing-objects#create-composite-client-libraries https://cloud.google.com/storage/docs/gsutil/commands/compose ただし、結合できるファイルは最大32個という制約があります。 There is a limit (currently 32) to the number of components that can be composed in a single operation....

August 16, 2023 · Me

Airflowで構築したワークフローを検証する

データ基盤の監視 データ基盤は下流の分析・可視化・モデリングの「基盤」となるので、監視は言うまでもなく品質を担保するため重要な存在です。データ基盤監視の考え方についてこの2つの記事が紹介しています。 https://tech-blog.monotaro.com/entry/2021/08/24/100000 https://buildersbox.corp-sansan.com/entry/2022/08/18/110000 同じくSQLによるデータ基盤を監視しており、最も大きな違いは自作ツールかAirflowで検証することだけです。本文はAirflowで構築したワークフローの検証についてもう少し紹介したいと思います。 まず、Data Pipelines Pocket Referenceではデータ基盤検証の原則が紹介されました。 Validate Early, Validate Often 要はできるだけ早く、できるだけ頻繁に検証するとのことです。ELTあるいはETL処理においては、Extract, Load, Transformそれぞれのステップが終了した直後に監視するのは最も理想的だと思います。 Transformはデータセットのコンテキストを把握しておかないと検証できないため、データセットごとに対応していく必要があります。ExtractとLoadはnon-contextで汎用的な検証ができるため、データ基盤構築の序盤からやっておいた方が安心だと思います。 Extractの検証 ストレージサービス(例えばGCS)をデータレイクにする場合、データソースからデータレイクにデータがちゃんとレプリケートされたかを検証するためにAirflowのairflow.providers.google.cloud.sensors.gcsを利用すると簡単にできます。 https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/sensors/gcs/index.html 一つのファイルをチェックする場合はほとんどないと思うので、GCSObjectExistenceSensorよりGCSObjectsWithPrefixExistenceSensorの方がもっと実用的でしょう。下記のタスクをExtractと次の処理の間に挟むと、障害の早期発見が期待できます。なお、Extract時点で既に障害が起きている場合はほとんどデータソース側の処理が失敗しているので、アプリケーション側と連携して作業する必要があります。 check_extract = GCSObjectsWithPrefixExistenceSensor( task_id="check_extract", bucket="{YOUR_BUCKET}", prefix="{YOUR_PREFIX}", ) Loadの検証 ELTとETL処理、Loadするタイミングは異なりますが、検証の方法(データがデータウェアハウスあるいはデータマートにロードされたか)は同じです。よく使われているデータウェアハウスサービスBigQueryだと、Airflowのairflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperatorを利用できます。 https://airflow.apache.org/docs/apache-airflow/1.10.10/_api/airflow/contrib/operators/bigquery_check_operator/index.html#airflow.contrib.operators.bigquery_check_operator.BigQueryCheckOperator 下記のタスクをLoad処理の後ろに追加すれば良いです。 check_load = BigQueryCheckOperator( task_id="check_load", sql={YOUR_SQL}, use_legacy_sql=False, params={ "bq_suffix": Variable.get("bq_suffix"), "dataset": setting.bq.dataset, "table": setting.bq.table_name, }, location="asia-northeast1", ) SQLは検証用のクエリとなっており、BigQueryのメタデータテーブルがよく用いられます。例えばこの記事が紹介されたクエリでテーブルが空になっているかを確認できます。 SELECT total_rows FROM ${dataset_id}.INFORMATION_SCHEMA.PARTITIONS WHERE table_name = '${table_name}' ORDER BY last_modified_time DESC LIMIT 1; GCPについて簡単に説明しましたが、AWSとAzureも似たようなことはできるはずです。 皆さんのワークフロー設計にご参考になれば幸いです。

November 1, 2022 · Me

自作PythonラッパーでGoogleグループのメンバーシップをより便利に管理する

Googleグループのメンバーシップを管理する方法 Googleグループを管理するのは Workspaceのadmin SDKを利用する https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups CLI gcloudを利用する https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships Cloud Identity APIを叩く https://cloud.google.com/identity/docs/how-to/setup 3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。 方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう) gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能 Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。 シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。 gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h' 一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。 Pythonラッパー 認証 Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。 https://cloud.google.com/identity/docs/how-to/setup アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。 from httplib2 import Http from oauth2client.client import GoogleCredentials from googleapiclient.discovery import build def create_service(): credentials = GoogleCredentials.get_application_default() service_name = "cloudidentity" api_version = "v1" return build( service_name, api_version, http=credentials....

October 2, 2022 · Me