Googleグループのメンバーシップを管理する方法

Googleグループを管理するのは

  1. Workspaceのadmin SDKを利用する

https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups

  1. CLI gcloudを利用する

https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships

  1. Cloud Identity APIを叩く

https://cloud.google.com/identity/docs/how-to/setup

3つの方法があります CLI gcloudはCloud Identity APIを叩いているので、方法2と方法3は実質同じです。

方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは

  1. WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう)
  2. gcloudあるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能

Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。

シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloudは断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.comに追加できます。

gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h'

一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。

Pythonラッパー

認証 

Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。

https://cloud.google.com/identity/docs/how-to/setup

アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。

from httplib2 import Http

from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build


def create_service():
    credentials = GoogleCredentials.get_application_default()

    service_name = "cloudidentity"
    api_version = "v1"
    return build(
        service_name,
        api_version,
        http=credentials.authorize(Http()),
        cache_discovery=False,
    )

Group IDとMembership ID

実際にメンバーシップを取得するAPIを叩いてみたらわかると思います。memberships_idgroup_idによってメンバーシップのnameが決められます。後ほどグループからメンバーを削除する際に、ここのnameが必須です。

https://cloud.google.com/identity/docs/how-to/query-memberships#searching_for_all_memberships_in_a_group

{
    "memberships": [
        {
            "name": "groups/{YOUR_GROUP_ID}/memberships/{YOUR_MEMBERSHIP_ID}",
            "preferredMemberKey": {"id": "name1@email.com"},
            "roles": [{"name": "MEMBER"}],
        },
        {
            "name": "groups/{YOUR_GROUP_ID}/memberships/{YOUR_MEMBERSHIP_ID}",
            "preferredMemberKey": {"id": "name1@email.com"},
            "roles": [{"name": "OWNER"}, {"name": "MEMBER"}],
        },
    ]
}

gourp_idはグループのemailによって取得できます。

param = f"&groupKey.id={group_email}"
lookup_group_name_request = service.groups().lookup()
lookup_group_name_request.uri += param
lookup_group_name_response = lookup_group_name_request.execute()
return lookup_group_name_response.get("name")

ラッパーを作る

最終的に作ったラッパーは以下となります

  • get_group_id グループのemailによってgroup_idを取得する
  • get_member_id メンバーシップ一覧からメンバーのemailによってmembership_idを取得する
  • check_membership ユーザがすでに特定のグループに入っているかを確認
  • add_member グループにユーザを追加
    • 有効期限を設定することも可能
  • delete_member グループからユーザを削除
    • group_idで当該グループのメンバーシップを検索
    • get_member_idmembership_idを取得する
    • メンバーを削除
from datetime import datetime, timedelta
from urllib.parse import urlencode

from googleapiclient.errors import HttpError


class GoogleGroup:
    """An wrapping of Cloud Identity API"""

    def __init__(self, group_email: str) -> None:
        """Initialze
        Args:
            group_email (str): email address of a google group
        """

        self.service = create_service()
        self.group_email = group_email
        self.group_id = self.get_group_id()

    def get_group_id(self) -> str:
        """obtain group id by group email address
        Returns:
            str: group id (group/{GROUP_ID})
        """
        try:
            param = f"&groupKey.id={self.group_email}"
            lookup_group_name_request = self.service.groups().lookup()
            lookup_group_name_request.uri += param
            # Given a group ID and namespace, retrieve the ID for parent group
            lookup_group_name_response = lookup_group_name_request.execute()
            return lookup_group_name_response.get("name")
        except HttpError as e:
            print(
                f"Cannot obtain name from group {self.group_email}: {e}",
            )
            raise

    @staticmethod
    def get_member_id(memberships: list, member_email: str) -> str:
        """Obtain member id from memberships list
        Args:
            memberships (list): memberships list
            member_email (str): email address of a member
        Returns:
            str: member id (groups/{GROUP_ID}/memberships/{MEMBERSHIP_ID})
        """
        for member in memberships:
            if member["preferredMemberKey"]["id"] == member_email:
                return member["name"]
        return None

    def check_membership(self, member_email: str) -> str:
        """check member is whether or not in this group
        Args:
            member_email (str): email address of a member
        Returns:
            str: if the member is in this group, return {"hasMembership": True}, otherwise None
        """
        query_params = urlencode({"query": f"member_key_id == '{member_email}'"})
        request = (
            self.service.groups()
            .memberships()
            .checkTransitiveMembership(parent=self.group_id)
        )
        request.uri += "&" + query_params
        response = request.execute()

        return response

    def add_member(self, member_email: str, expiration: int = None) -> None:
        """Add an applicant to google group
        Args:
            member_email (str): email address of a member
            expiration (int, optional): has expire datetime or not. Defaults to True.
        """
        expire_datetime = None
        if expiration:
            expire_datetime = (datetime.now() + timedelta(hours=expiration)).strftime(
                "%Y-%m-%dT%H:%M:%SZ"
            )

        try:
            # Create a membership object with a member_key and a single role of type MEMBER
            membership = {
                "preferredMemberKey": {"id": member_email},
                "roles": {
                    "name": "MEMBER",
                    "expiryDetail": {"expireTime": expire_datetime},
                },
            }

            # Create a membership using the ID for the parent group and a membership object
            response = (
                self.service.groups()
                .memberships()
                .create(parent=self.group_id, body=membership)
                .execute()
            )
            print(response)
        except HttpError as e:
            print(f"Cannot add {member_email} to {self.group_email}: {e}")
            raise

    def delete_member(self, member_email: str) -> str:
        """Delete a member from google group via cloud identify API
        Args:
            member_email (str): email address of a member
        """
        try:
            memberships = (
                self.service.groups().memberships().list(parent=self.group_id).execute()
            )["memberships"]
            member_id = self.get_member_id(
                memberships=memberships, member_email=member_email
            )

            response = (
                self.service.groups().memberships().delete(name=member_id).execute()
            )
            print(message=response)

        except HttpError as e:
            print(f"Cannot delete {member_email} from {self.group_email}: {e}")
            raise

使い方

  • alice@email.comを有効期限12h付きでグループgroup_name@email.comに追加
email = "alice@email.com"
gg = GoogleGroup(group_email="group_name@email.com")
if not gg.check_membership(email):
    gg.add_member(member_email=email, expiration=12)

効果的には以下のコマンドと同じです。

gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='12h'
  • alice@email.comgroup_name@email.comから削除
email = "alice@email.com"
gg = GoogleGroup(group_email="group_name@email.com")
if gg.check_membership(email):
    gg.delete_member(member_email=email)

効果的には以下のコマンドと同じです。

gcloud identity groups memberships delete --group-email="group_name@email.com" --member-email="alice@email.com"

最後に

アプリケーションからGoogleグループをより便利に管理するため、Pythonラッパーを作りました。公式のCloud Identity SDKが出る前に、代替案としてまあまあ使えると思います。実際に筆者はこのラッパー(細かいところやや異なるが)を用いて、データ基盤の承認システムを実装しました。 そんなに難しいことをやっていないので、ソースをGitHubに上げるのをやめました。 遠慮なくご自由に利用してください。ご参考になれば嬉しいです。