GCP

CloudFunctionからGCS上のCSVを読み込んでBigQueryにデータを追加する

この記事は約5分で読めます。

 

今回はCloudFunctionからGCS上のCSVを読み込んでBigQueryにデータを追加する方法について紹介したいと思います

 

cloudfunctionのトリガーを設定

 

Google Cloud Functionを使用して、特定のGCS(Google Cloud Storage)バケットにファイルが追加された場合にトリガーされる関数を設定することは可能です。以下の手順で設定できます。

 

GCPコンソールを使う方法:

 

  1. Google Cloud Console(https://console.cloud.google.com/)にログインします。
  2. 「ナビゲーションメニュー」(三本線のアイコン)をクリックし、”Cloud Functions”を選択します。
  3. 「関数の作成」ボタンをクリックします。
  4. 関数名、メモリ割り当てなどの基本設定を行います。
  5. トリガーの設定で、「トリガーのタイプ」を「Cloud Storage」に設定します。
  6. 「イベントのタイプ」を「オブジェクトの作成」に設定します。
  7. 「バケット」でトリガーとするGCSバケットを選択します。
  8. 必要ならば他の設定を行い、関数をデプロイします。

 

コードで設定する方法(例:gcloud CLIを使用)

 

gcloud コマンドラインツールを使っても、Cloud Functionを作成し、特定のバケットに対するトリガーを設定できます。

 

gcloud functions deploy YOUR_FUNCTION_NAME \
--runtime python39 \
--trigger-resource YOUR_BUCKET_NAME \
--trigger-event google.storage.object.finalize

このコマンドで、指定したバケット(YOUR_BUCKET_NAME)にファイルが追加された(finalizeされた)場合に、Cloud Function(YOUR_FUNCTION_NAME)がトリガーされるように設定されます。

 

上記の設定が完了すると、指定したGCSバケットに新しいオブジェクトが追加されるたびに、Cloud Functionが自動的に実行されます。このとき、関数の引数にはトリガーに関する情報(datacontext)が渡されます。これを使用して、特定の処理(例:BigQueryにデータを追加する等)を行うことができます。

 

サンプルコード

 

<requirement.txt>

pandas==1.4.1
google-cloud-storage==2.2.1
google-cloud-bigquery==3.11.4
<main.py>
from io import BytesIO
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery


def read_file(data, context):
    # クライアントをインスタンス化
    client = storage.Client()

    # バケットを取得
    bucket_name = data['bucket']
    bucket = client.get_bucket(bucket_name)


    # BLOB(Binary Large OBject)を構成
    file_name = data['name']
    #file_name = 'hello_world.csv'   #特定のファイル名を指定したい場合
    blob = bucket.blob(file_name)

    # オブジェクトのデータを取得
    content = blob.download_as_bytes()

    # バイナリオブジェクトに変換し、データフレームを作成
    df = pd.read_csv(BytesIO(content))
    print(df.head())

    # BigQueryクライアントのインスタンスを作成
    bqclient = bigquery.Client()

    rows_to_insert = [
            {"col1": '111', "col2": '222'},
        ]

    errors = bqclient.insert_rows_json('<データセットid>.<テーブル名>', rows_to_insert)  # Make an API request.
    if errors == []:
      print("New rows have been added.")
    else:
      print("Encountered errors while inserting rows: {}".format(errors))
また「NoSuchFieldException when trying to retrieve the value of field」というエラーが発生する場合は
挿入する列のデータ型と挿入しようとしているデータ型と違うことが原因なので型を確認してみてください


プログラミング・スクレイピングツール作成の相談を受け付けています!

クラウドワークス・ココナラ・MENTAなどでPython・SQL・GASなどのプログラミングに関する相談やツール作成などを承っております!

過去の案件事例:

  • Twitter・インスタグラムの自動化ツール作成
  • ウェブサイトのスクレイピングサポート
  • ダッシュボード・サイト作成
  • データエンジニア転職相談

これまでの案件例を見る

キャリア相談もお気軽に!文系学部卒からエンジニア・データサイエンティストへの転職経験をもとに、未経験者がどう進むべきかのアドバイスを提供します。


スポンサーリンク
/* プログラミング速報関連記事一覧表示 */
ミナピピンの研究室

コメント

タイトルとURLをコピーしました