Googleグループのメンバーシップを管理する方法
Googleグループを管理するのは
- Workspaceのadmin SDKを利用する
https://developers.google.com/admin-sdk/directory/v1/guides/manage-groups
- CLI
gcloud
を利用する
https://cloud.google.com/sdk/gcloud/reference/identity/groups/memberships
- Cloud Identity APIを叩く
https://cloud.google.com/identity/docs/how-to/setup
3つの方法があります
CLI gcloud
はCloud Identity APIを叩いているので、方法2と方法3は実質同じです。
方法1との違いはなんでしょう? ドキュメント読んで実際に試したところ、大きな違いは
- WorkspaceはGCPで作ったサービスアカウントをメンバーとして追加できない(それはそう)
gcloud
あるいはCloud Identity APIだと有効期限付きでメンバー追加することが可能
Googleグループによってセンシティブな情報のアクセス制御する際に、有効期限付きでメンバー追加する機能を利用すると、セキュリティを担保できるので、使い勝手がかなり良いと思います。
シェルスクリプトを書いてGoogleグループ管理を自動化する場合であれば、ローカル環境やリモート環境(EC2やCloud Runなどを含めて)問わず、gcloud
は断然便利です。下記のコマンドで一発aliceさんを1時間有効期限付きでグループgroup_name@email.com
に追加できます。
gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='1h'
一方、アプリケーションなどに組み込みたい場合は、Cloud Identity APIを叩くことになります。 現時点(2022.10.1)Cloud IdentityはBigqueyなどのサービスのような公式SDKが存在しなく、APIのドキュメントも若干わかりづらいので、この度より便利にアプリケーションからGoogleグループを管理するために、Pythonラッパーを作ってみました。簡単に修正すれば他言語でも利用できる(はず)です。
Pythonラッパー
認証
Cloud Identity APIを叩くのに、言うまでもなく認証が必要です。 キーによる認証は公式のドキュメントにあるため、コピペすればOKです。
https://cloud.google.com/identity/docs/how-to/setup
アプリケーションをCloud Runなどにデプロイしている場合、キーを使わずにサービスアカウントとして認証する方法もあります。
from httplib2 import Http
from oauth2client.client import GoogleCredentials
from googleapiclient.discovery import build
def create_service():
credentials = GoogleCredentials.get_application_default()
service_name = "cloudidentity"
api_version = "v1"
return build(
service_name,
api_version,
http=credentials.authorize(Http()),
cache_discovery=False,
)
Group IDとMembership ID
実際にメンバーシップを取得するAPIを叩いてみたらわかると思います。memberships_id
とgroup_id
によってメンバーシップのname
が決められます。後ほどグループからメンバーを削除する際に、ここのname
が必須です。
{
"memberships": [
{
"name": "groups/{YOUR_GROUP_ID}/memberships/{YOUR_MEMBERSHIP_ID}",
"preferredMemberKey": {"id": "name1@email.com"},
"roles": [{"name": "MEMBER"}],
},
{
"name": "groups/{YOUR_GROUP_ID}/memberships/{YOUR_MEMBERSHIP_ID}",
"preferredMemberKey": {"id": "name1@email.com"},
"roles": [{"name": "OWNER"}, {"name": "MEMBER"}],
},
]
}
gourp_id
はグループのemailによって取得できます。
param = f"&groupKey.id={group_email}"
lookup_group_name_request = service.groups().lookup()
lookup_group_name_request.uri += param
lookup_group_name_response = lookup_group_name_request.execute()
return lookup_group_name_response.get("name")
ラッパーを作る
最終的に作ったラッパーは以下となります
get_group_id
グループのemailによってgroup_id
を取得するget_member_id
メンバーシップ一覧からメンバーのemailによってmembership_id
を取得するcheck_membership
ユーザがすでに特定のグループに入っているかを確認add_member
グループにユーザを追加- 有効期限を設定することも可能
delete_member
グループからユーザを削除group_id
で当該グループのメンバーシップを検索get_member_id
でmembership_id
を取得する- メンバーを削除
from datetime import datetime, timedelta
from urllib.parse import urlencode
from googleapiclient.errors import HttpError
class GoogleGroup:
"""An wrapping of Cloud Identity API"""
def __init__(self, group_email: str) -> None:
"""Initialze
Args:
group_email (str): email address of a google group
"""
self.service = create_service()
self.group_email = group_email
self.group_id = self.get_group_id()
def get_group_id(self) -> str:
"""obtain group id by group email address
Returns:
str: group id (group/{GROUP_ID})
"""
try:
param = f"&groupKey.id={self.group_email}"
lookup_group_name_request = self.service.groups().lookup()
lookup_group_name_request.uri += param
# Given a group ID and namespace, retrieve the ID for parent group
lookup_group_name_response = lookup_group_name_request.execute()
return lookup_group_name_response.get("name")
except HttpError as e:
print(
f"Cannot obtain name from group {self.group_email}: {e}",
)
raise
@staticmethod
def get_member_id(memberships: list, member_email: str) -> str:
"""Obtain member id from memberships list
Args:
memberships (list): memberships list
member_email (str): email address of a member
Returns:
str: member id (groups/{GROUP_ID}/memberships/{MEMBERSHIP_ID})
"""
for member in memberships:
if member["preferredMemberKey"]["id"] == member_email:
return member["name"]
return None
def check_membership(self, member_email: str) -> str:
"""check member is whether or not in this group
Args:
member_email (str): email address of a member
Returns:
str: if the member is in this group, return {"hasMembership": True}, otherwise None
"""
query_params = urlencode({"query": f"member_key_id == '{member_email}'"})
request = (
self.service.groups()
.memberships()
.checkTransitiveMembership(parent=self.group_id)
)
request.uri += "&" + query_params
response = request.execute()
return response
def add_member(self, member_email: str, expiration: int = None) -> None:
"""Add an applicant to google group
Args:
member_email (str): email address of a member
expiration (int, optional): has expire datetime or not. Defaults to True.
"""
expire_datetime = None
if expiration:
expire_datetime = (datetime.now() + timedelta(hours=expiration)).strftime(
"%Y-%m-%dT%H:%M:%SZ"
)
try:
# Create a membership object with a member_key and a single role of type MEMBER
membership = {
"preferredMemberKey": {"id": member_email},
"roles": {
"name": "MEMBER",
"expiryDetail": {"expireTime": expire_datetime},
},
}
# Create a membership using the ID for the parent group and a membership object
response = (
self.service.groups()
.memberships()
.create(parent=self.group_id, body=membership)
.execute()
)
print(response)
except HttpError as e:
print(f"Cannot add {member_email} to {self.group_email}: {e}")
raise
def delete_member(self, member_email: str) -> str:
"""Delete a member from google group via cloud identify API
Args:
member_email (str): email address of a member
"""
try:
memberships = (
self.service.groups().memberships().list(parent=self.group_id).execute()
)["memberships"]
member_id = self.get_member_id(
memberships=memberships, member_email=member_email
)
response = (
self.service.groups().memberships().delete(name=member_id).execute()
)
print(message=response)
except HttpError as e:
print(f"Cannot delete {member_email} from {self.group_email}: {e}")
raise
使い方
alice@email.com
を有効期限12h付きでグループgroup_name@email.com
に追加
email = "alice@email.com"
gg = GoogleGroup(group_email="group_name@email.com")
if not gg.check_membership(email):
gg.add_member(member_email=email, expiration=12)
効果的には以下のコマンドと同じです。
gcloud identity groups memberships add --group-email="group_name@email.com" --member-email="alice@email.com" --expiration='12h'
alice@email.com
をgroup_name@email.com
から削除
email = "alice@email.com"
gg = GoogleGroup(group_email="group_name@email.com")
if gg.check_membership(email):
gg.delete_member(member_email=email)
効果的には以下のコマンドと同じです。
gcloud identity groups memberships delete --group-email="group_name@email.com" --member-email="alice@email.com"
最後に
アプリケーションからGoogleグループをより便利に管理するため、Pythonラッパーを作りました。公式のCloud Identity SDKが出る前に、代替案としてまあまあ使えると思います。実際に筆者はこのラッパー(細かいところやや異なるが)を用いて、データ基盤の承認システムを実装しました。 そんなに難しいことをやっていないので、ソースをGitHubに上げるのをやめました。 遠慮なくご自由に利用してください。ご参考になれば嬉しいです。