Cloud Tasks をローカルで動かす
はじめに
ソフトウェアエンジニアの村瀬です。本記事では、Cloud Tasks や Cloud Storage といった GCP サービスをローカルでの動作検証の際に利用する方法を説明していきます。
背景
Cloud Tasks とは
Cloud Tasks は Google Cloud Platform(以下、GCP)のサービスの一つです。独立に実行できる処理を「タスク」、タスクを管理・実行するものを「キュー」と呼びます。Cloud Tasks のキューにはタスクの並列実行数や失敗時のリトライ回数、タイムアウトの時間などの様々な設定を行うことができ、それらの設定を行い、タスクをキューに追加すれば実際にタスクの実行を行う部分は全てキューに任せることができます。各タスクの内容は HTTP リクエストで表され、Cloud Functions や Cloud Run など様々なサービスにリクエストを送信して、処理を実行することができます。
Cloud Tasks を用いたアーキテクチャ例
上図は Cloud Tasks を用いた画像処理システムの例です。以下のような流れで利用することができます。
- ブラウザから Backend API へ指定画像に画像処理を行うリクエストが送られる
- Backend API が画像ファイルを Cloud Storage にアップロードする
- Backend API が Cloud Tasks キューに画像処理タスクを追加する
- Cloud Tasks が Worker API へ画像処理リクエストを行う
- Worker API が Cloud Storage から指定されたファイルをダウンロードし、画像処理を行う
- Worker API が Cloud Storage に画像処理の結果をアップロードする
Backend API で画像処理を行うという選択肢もありますが、画像処理に時間がかかる場合は、このようにバックグラウンドで画像処理を行う方が好ましいでしょう。 そのようにすることで、3 番の処理が完了した時点で Backend API からブラウザへレスポンスを返すことができます。
動作検証
システムの開発を行う際に動作検証を行う必要がありますが、上記のアーキテクチャはクラウドサービスを含んでいる点が難点です。クラウドサービスを含むシステムを検証する方法として、いくつかの選択肢が考えられます。
一つ目の方法はクラウドサービスが関わる部分は GCP 環境に実際にデプロイを行って検証を行うというものです。しかし、クラウドサービスに関わる一連の処理を検証したい場合にデプロイが必要になるというのは手間がかかります。
二つ目の方法はローカルの API からクラウドサービスを用いることです。ローカルの API を更新する際に手軽に検証できる点は便利ですが、チーム開発において、例えばストレージサービスを共有してしまうと、同一ファイル名の競合等が懸念されます。また、今回のアーキテクチャではクラウド上の Cloud Tasks からローカルの Worker API にアクセスする必要があるため、そちらの検証にも手間がかかってしまいます。
三つ目の方法はエミュレータを用いることです。Cloud Storage や Cloud Tasks には非公式ながら、エミュレータが存在します。非公式かつエミュレータであるため、実際のクラウドサービスとの差異には注意が必要ですが、ローカル環境のみでシステム全体を検証することができます。
本記事ではこれらのエミュレータを用いて、ローカルでシステムの検証をする方法を説明します。
実装
環境と構成
本記事は前述したアーキテクチャを簡略化した以下のものを実装していきます。
処理の流れは以下のようになっています。
- スクリプトの処理として Cloud Tasks キューにファイルをコピーするタスクを追加する
- Cloud Tasks が Worker API へファイルをコピーするリクエストを行う
- Worker API が fake-gcs-server から指定されたファイルをダウンロードする
- Worker API が fake-gcs-server にダウンロードしたファイルをアップロードする
ディレクトリ構成は以下の通りです。
- app/main.py
- API の処理を記載したもの
- docker/gcs_data, docker/gcs_storage
- ローカルの GCS エミュレータ用ディレクトリ
- docker/compose.yaml
- コンテナ定義
- docker/Dockerfile
- API ビルド用ファイル
- request.py
- Cloud Tasks にリクエストを行うスクリプト
- requirements.txt
- 依存パッケージの定義
依存パッケージの追加
まず、requirements.txt を用いて必要なパッケージをインストールしておきます。 本記事では Python 3.11.2 を用いており、requirements.txt の内容は以下の通りです。
fastapi==0.110.3
google-cloud-storage==2.16.0
google-cloud-tasks==2.16.3
pydantic==2.7.1
uvicorn==0.29.0
GCS コンテナ
GCS のエミュレータ (https://github.com/fsouza/fake-gcs-server) で提供されているイメージを用いて、compose.yaml に定義していきます。
gcs:
image: fsouza/fake-gcs-server
volumes:
- ./gcs_data:/data
- ./gcs_storage:/storage
command: -scheme http -public-host ${URL:-localhost}:4443
ローカルの 2 つのディレクトリを gcs サービスへマウントしてます。
- /data/{bucket_name}/
- こちらにファイルが存在する場合、コンテナ起動時に GCS エミュレータのバケットにファイルが追加される
- /storage/{bucket_name}/
- GCS エミュレータのバケットに存在するファイルおよびメタデータが保持される
- こちらにファイルを直接置いてしまった場合、メタデータが存在しない状態になる点に注意してください
- メタデータが存在しないファイルはファイル一覧取得やダウンロード等の操作対象になった際にエラーが起こります gcs_data/test-bucket に test.txt を作成することで、コンテナ起動時に gcs_storage にバケットおよびファイルがアップロードされます。
Cloud Tasks コンテナ
Cloud Tasks のエミュレータ (https://github.com/aertje/cloud-tasks-emulator) で提供されているイメージを用いて、compose.yaml に定義していきます。 -queue オプションにより、コンテナ起動時に指定した名前の Cloud Tasks キューが作成されます。
cloud-tasks:
image: ghcr.io/aertje/cloud-tasks-emulator
ports:
- "8123:8123"
command: -host 0.0.0.0 -port 8123 -queue "projects/local-project/locations/local-location/queues/local-queue"
API
次に Cloud Tasks からのリクエストを処理する API を実装します。 本記事では Fast API を用いて簡単な API を作成しています。 POST /files/copy にリクエストを行うことで、リクエストボディで指定したファイルが output ディレクトリにコピーされます。
from fastapi import FastAPI, Response, status
from google.auth.credentials import AnonymousCredentials
from google.cloud import storage
from google.cloud.storage.blob import Blob
from pydantic import BaseModel
app = FastAPI()
class CopyRequest(BaseModel):
file_name: str
@app.post("/files/copy")
def copy(copy_request: CopyRequest) -> Response:
# GCS クライアントの作成
gcs_client = storage.Client(
credentials=AnonymousCredentials(),
project="local-project",
client_options={"api_endpoint": "http://gcs:4443"},
)
# 指定ファイルを input/ からダウンロード
file = Blob.from_string(
f"gs://test-bucket/input/{copy_request.file_name}", gcs_client
).download_as_bytes()
# 指定ファイルを output/ へアップロード
Blob.from_string(
f"gs://test-bucket/output/{copy_request.file_name}", gcs_client
).upload_from_string(file)
return Response(status_code=status.HTTP_200_OK)
次に、こちらの API をビルドする Dockerfile を記述します。
FROM python:3.11.2
WORKDIR /code
COPY ./requirements.txt /code/requirements.txt
RUN pip install --no-cache-dir --upgrade -r /code/requirements.txt
COPY ./app /code/app
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8080"]
最後に compose.yaml に API 用コンテナを追加します。compose.yaml は最終的に以下のようになりました。
services:
webserver:
build:
context: ../
dockerfile: ./docker/Dockerfile
gcs:
image: fsouza/fake-gcs-server
volumes:
- ./gcs_data:/data
- ./gcs_storage:/storage
command: -scheme http -public-host ${URL:-localhost}:4443
cloud-tasks:
image: ghcr.io/aertje/cloud-tasks-emulator
ports:
- "8123:8123"
command: -host 0.0.0.0 -port 8123 -queue "projects/local-project/locations/local-location/queues/local-queue"
スクリプト
最後に Cloud Tasks にリクエストを行うスクリプトを作成します。 タスクの内容は、http://webserver:8080/files/copy へ test.txt をコピーする POST リクエストを送るというものです。 ここで指定する URL は Cloud Tasks コンテナ視点の URL であり、ローカルマシン視点でないことに注意してください。
import grpc
from google.cloud import tasks_v2
import uuid
import grpc
from google.cloud import tasks_v2
from google.cloud.tasks_v2.services.cloud_tasks.transports import CloudTasksGrpcTransport
from app.main import CopyRequest
# Cloud Tasks クライアントの作成
channel = grpc.insecure_channel(target="localhost:8123")
transport = CloudTasksGrpcTransport(channel=channel)
cloud_tasks_client = tasks_v2.CloudTasksClient(transport=transport)
parent = cloud_tasks_client.queue_path(
project="local-project",
location="local-location",
queue="local-queue",
)
# タスクの設定
request = CopyRequest(file_name="test.txt")
task = tasks_v2.Task(
name=f"{parent}/tasks/{uuid.uuid4()}",
http_request=tasks_v2.HttpRequest(
url="http://webserver:8080/files/copy",
http_method=tasks_v2.HttpMethod.POST,
headers={"Content-Type": "application/json"},
body=request.model_dump_json().encode(),
),
)
# Cloud Tasks へタスクを追加
cloud_tasks_client.create_task(
parent=parent,
task=task,
)
動作確認
まず docker ディレクトリで docker compose up を行い、コンテナを立ち上げておきます。 次に python request.py コマンドにより、Cloud Tasks にリクエストを送ってみます。 コンテナのログを確認してみると、タスクの実行ができていることが確認できます。
docker-gcs-1 | time=2024-05-07T06:54:25.942Z level=INFO msg="172.18.0.4 - - [07/May/2024:06:54:25 +0000] \"GET /download/storage/v1/b/test-bucket/o/input%2Ftest.txt?alt=media HTTP/1.1\" 200 0\n"
docker-gcs-1 | time=2024-05-07T06:54:25.970Z level=INFO msg="172.18.0.4 - - [07/May/2024:06:54:25 +0000] \"POST /upload/storage/v1/b/test-bucket/o?uploadType=multipart HTTP/1.1\" 200 487\n"
docker-webserver-1 | INFO: 172.18.0.2:35500 - "POST /files/copy HTTP/1.1" 200 OK
docker-cloud-tasks-1 | 2024/05/07 06:54:25 Task done
最後に docker/gcs_storage/bucket の中身を確認しておきましょう。test.txt がコピーできていることが確認できます。
まとめ
本記事では Cloud Storage や Cloud Tasks のエミュレータの導入について紹介しました。
プロジェクトにおいて実際に導入し、開発で利用しましたが、ブラウザから Cloud Tasks 前段の API へ画像を送り、画像処理の結果をブラウザで表示する部分まで、ローカルで検証できる点はとても良かったです。
一方で、注意点もいくつかありました。まず、今回紹介した GCS と Cloud Tasks のエミュレータでは、ローカル接続を行う際に特殊な設定が必要となり、以下のように接続方法を分ける必要がありました。
def get_gcs_client(project_name: str) -> storage.Client:
if project_name == "local-project":
# ローカルの fake-gcs-server コンテナへ接続
return storage.Client(
project="local-project", client_options={"api_endpoint": app_settings.gcs_endpoint}
)
else:
return storage.Client(project=project_name)
関連して、クラウド上での認証に関わる部分はローカルでは検証できません。
また、エミュレータであることもあり、実際のサービスと差異がある部分もあるようです。
例えば、Cloud Storage には署名付き URL を発行する機能がありますが、fake-gcs-server で同様に発行しようとした場合、署名の際にローカルであっても GCP のアカウントを要求され、そちらを解決してもホストが storage.googleapis.com となった URL が発行されました。
クエリパラメータのバリデーションの有無も異なるようです (※参考)。開発ではローカルで署名付き URL を発行した際に、URL のホストを localhost:4443 に置き換えるという処理を入れることで対応しました。
参考
- Cloud Tasks
- Cloud tasks emulator
- fake-gcs-server
- FastAPI
エムシーデジタルでは、技術力向上のためのイベントや勉強会なども定期的に実施しています。 もしエムシーデジタルで働くことに興味を持っていただいた方がいらっしゃいましたら、カジュアル面談も受け付けておりますので、お気軽にお声掛けください!
採用情報や面談申込みはこちらから