今回はCloudFunctionからGCS上のCSVを読み込んでBigQueryにデータを追加する方法について紹介したいと思います
cloudfunctionのトリガーを設定
Google Cloud Functionを使用して、特定のGCS(Google Cloud Storage)バケットにファイルが追加された場合にトリガーされる関数を設定することは可能です。以下の手順で設定できます。
GCPコンソールを使う方法:
- Google Cloud Console(https://console.cloud.google.com/)にログインします。
- 「ナビゲーションメニュー」(三本線のアイコン)をクリックし、”Cloud Functions”を選択します。
- 「関数の作成」ボタンをクリックします。
- 関数名、メモリ割り当てなどの基本設定を行います。
- トリガーの設定で、「トリガーのタイプ」を「Cloud Storage」に設定します。
- 「イベントのタイプ」を「オブジェクトの作成」に設定します。
- 「バケット」でトリガーとするGCSバケットを選択します。
- 必要ならば他の設定を行い、関数をデプロイします。
コードで設定する方法(例:gcloud CLIを使用)
gcloud
コマンドラインツールを使っても、Cloud Functionを作成し、特定のバケットに対するトリガーを設定できます。
gcloud functions deploy YOUR_FUNCTION_NAME \
--runtime python39 \
--trigger-resource YOUR_BUCKET_NAME \
--trigger-event google.storage.object.finalize
このコマンドで、指定したバケット(YOUR_BUCKET_NAME
)にファイルが追加された(finalizeされた)場合に、Cloud Function(YOUR_FUNCTION_NAME
)がトリガーされるように設定されます。
上記の設定が完了すると、指定したGCSバケットに新しいオブジェクトが追加されるたびに、Cloud Functionが自動的に実行されます。このとき、関数の引数にはトリガーに関する情報(data
とcontext
)が渡されます。これを使用して、特定の処理(例:BigQueryにデータを追加する等)を行うことができます。
サンプルコード
<requirement.txt>
pandas==1.4.1 google-cloud-storage==2.2.1 google-cloud-bigquery==3.11.4
<main.py>
from io import BytesIO import pandas as pd from google.cloud import storage from google.cloud import bigquery def read_file(data, context): # クライアントをインスタンス化 client = storage.Client() # バケットを取得 bucket_name = data['bucket'] bucket = client.get_bucket(bucket_name) # BLOB(Binary Large OBject)を構成 file_name = data['name'] #file_name = 'hello_world.csv' #特定のファイル名を指定したい場合 blob = bucket.blob(file_name) # オブジェクトのデータを取得 content = blob.download_as_bytes() # バイナリオブジェクトに変換し、データフレームを作成 df = pd.read_csv(BytesIO(content)) print(df.head()) # BigQueryクライアントのインスタンスを作成 bqclient = bigquery.Client() rows_to_insert = [ {"col1": '111', "col2": '222'}, ] errors = bqclient.insert_rows_json('<データセットid>.<テーブル名>', rows_to_insert) # Make an API request. if errors == []: print("New rows have been added.") else: print("Encountered errors while inserting rows: {}".format(errors))
また「NoSuchFieldException when trying to retrieve the value of field」というエラーが発生する場合は
挿入する列のデータ型と挿入しようとしているデータ型と違うことが原因なので型を確認してみてください
コメント