この記事では、Google Cloud Functionを用いて、Google Cloud Storage (GCS)に保存されているCSVファイルをPythonのデータフレームとして読み込む方法について説明します。
Contents
前提条件
このチュートリアルを進める前に、Google Cloud Platform (GCP) にアカウントを作成し、Cloud FunctionとGoogle Cloud Storageを利用可能な状態にしておく必要があります。
必要なライブラリの依存関係
このチュートリアルではPythonのpandasライブラリを使用します。`requirements.txt`に以下のように記述してください。
<requirement.txt>
pandas==1.4.1 google-cloud-storage==2.2.1 google-cloud-bigquery==3.11.4 pandas-gbq==0.19.2
サンプルコード
以下は、Cloud FunctionでGCSからCSVファイルをデータフレームとして読み込むサンプルコードです。
from io import BytesIO import pandas as pd from google.cloud import storage from google.cloud import bigquery def read_file(data, context): # クライアントをインスタンス化 client = storage.Client() # バケットを取得 bucket_name = data['bucket'] bucket = client.get_bucket(bucket_name) # BLOB(Binary Large OBject)を構成 file_name = data['name'] #file_name = 'hello_world.csv' #特定のファイル名を指定したい場合 blob = bucket.blob(file_name) # オブジェクトのデータを取得 content = blob.download_as_bytes() # バイナリオブジェクトに変換し、データフレームを作成 df = pd.read_csv(BytesIO(content)) temp_cols = [] for i in range(len(df.columns)): temp_cols.append('col' + str(i)) df.columns = temp_cols # df.columns = ['a','b','c','d','e','f','g'] print(df.head()) # BQへデータをLoadする dataset_id = '〇〇〇.×××' #データセット名を指定 table_id = file_name[:-4] #テーブル名を指定 try: df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='replace') except: df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='append')
コードの説明
このコードは、Google Cloud Storage (GCS) からCSVファイルを読み込んでデータフレームに変換し、その後Google BigQueryにそのデータをアップロードするPythonスクリプトです。このスクリプトは、Google Cloud Functionで実行されることを前提としています。
Cloud Functionの関数
def read_file(data, context):
この関数はCloud Functionとしてデプロイされる前提です。`data`と`context`はCloud Functionが自動的に提供する引数です。
GCSクライアントの設定
client = storage.Client() bucket_name = data['bucket'] bucket = client.get_bucket(bucket_name) file_name = data['name'] blob = bucket.blob(file_name)
Google Cloud Storageのバケットとファイル(Blob)にアクセスするための設定を行います。
データの読み込みとデータフレームの作成
content = blob.download_as_bytes() df = pd.read_csv(BytesIO(content))
バケットからバイナリデータをダウンロードし、それをBytesIOを使ってPandasのDataFrameオブジェクトに変換します。
データフレームの列名の変更
temp_cols = [] for i in range(len(df.columns)): temp_cols.append('col' + str(i)) df.columns = temp_cols
DataFrameの各列に`col0`, `col1`, `col2`, … という名前を付けています。
BigQueryへのデータのアップロード
dataset_id = '〇〇〇.×××' table_id = file_name[:-4] try: df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='replace') except: df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='append')
この部分では、DataFrameをBigQueryにアップロードします。まず、データセットとテーブルのIDを指定します。`if_exists=’replace’`は、同名のテーブルが既に存在する場合にはそのテーブルを新しいデータで置き換えます。エラーが発生した場合には、`if_exists=’append’`でデータを既存のテーブルに追加します。
このスクリプトは全体的にかなり高度な処理をしており、Cloud StorageとBigQueryの両方と連携しています。一方で、エラーハンドリングやロギングが不足しているように見えますので、本番環境で使用する際にはそのような側面も考慮に入れることが重要です。
まとめ
以上が、Cloud FunctionとGoogle Cloud Storageを用いてCSVファイルをPythonのデータフレームとして読み込む基本的な手順です。この方法を用いれば、大量のデータに対しても簡単に処理を行うことができます。
コメント
[…] また「NoSuchFieldException when trying to retrieve the value of field」というエラーが発生する場合は 挿入する列のデータ型と挿入しようとしているデータ型と違うことが原因なので型を確認してみてください 関連記事:CloudFunctionからGCSのCSVを読み込んでBigQueryにテーブルを作成する […]