今回はCloudFunctionからGCS上のCSVを読み込んでBigQueryにデータを追加する方法について紹介したいと思います
cloudfunctionのトリガーを設定
Google Cloud Functionを使用して、特定のGCS(Google Cloud Storage)バケットにファイルが追加された場合にトリガーされる関数を設定することは可能です。以下の手順で設定できます。
GCPコンソールを使う方法:
- Google Cloud Console(https://console.cloud.google.com/)にログインします。
- 「ナビゲーションメニュー」(三本線のアイコン)をクリックし、”Cloud Functions”を選択します。
- 「関数の作成」ボタンをクリックします。
- 関数名、メモリ割り当てなどの基本設定を行います。
- トリガーの設定で、「トリガーのタイプ」を「Cloud Storage」に設定します。
- 「イベントのタイプ」を「オブジェクトの作成」に設定します。
- 「バケット」でトリガーとするGCSバケットを選択します。
- 必要ならば他の設定を行い、関数をデプロイします。
コードで設定する方法(例:gcloud CLIを使用)
gcloud コマンドラインツールを使っても、Cloud Functionを作成し、特定のバケットに対するトリガーを設定できます。
gcloud functions deploy YOUR_FUNCTION_NAME \
--runtime python39 \
--trigger-resource YOUR_BUCKET_NAME \
--trigger-event google.storage.object.finalizeこのコマンドで、指定したバケット(YOUR_BUCKET_NAME)にファイルが追加された(finalizeされた)場合に、Cloud Function(YOUR_FUNCTION_NAME)がトリガーされるように設定されます。
上記の設定が完了すると、指定したGCSバケットに新しいオブジェクトが追加されるたびに、Cloud Functionが自動的に実行されます。このとき、関数の引数にはトリガーに関する情報(dataとcontext)が渡されます。これを使用して、特定の処理(例:BigQueryにデータを追加する等)を行うことができます。
サンプルコード
<requirement.txt>
pandas==1.4.1 google-cloud-storage==2.2.1 google-cloud-bigquery==3.11.4
<main.py>
from io import BytesIO
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery
def read_file(data, context):
# クライアントをインスタンス化
client = storage.Client()
# バケットを取得
bucket_name = data['bucket']
bucket = client.get_bucket(bucket_name)
# BLOB(Binary Large OBject)を構成
file_name = data['name']
#file_name = 'hello_world.csv' #特定のファイル名を指定したい場合
blob = bucket.blob(file_name)
# オブジェクトのデータを取得
content = blob.download_as_bytes()
# バイナリオブジェクトに変換し、データフレームを作成
df = pd.read_csv(BytesIO(content))
print(df.head())
# BigQueryクライアントのインスタンスを作成
bqclient = bigquery.Client()
rows_to_insert = [
{"col1": '111', "col2": '222'},
]
errors = bqclient.insert_rows_json('<データセットid>.<テーブル名>', rows_to_insert) # Make an API request.
if errors == []:
print("New rows have been added.")
else:
print("Encountered errors while inserting rows: {}".format(errors))
また「NoSuchFieldException when trying to retrieve the value of field」というエラーが発生する場合は
挿入する列のデータ型と挿入しようとしているデータ型と違うことが原因なので型を確認してみてください

コメント