GCP

CloudFunctionからGCSのCSVを読み込んでBigQueryにテーブルを作成する

この記事は約6分で読めます。

 

この記事では、Google Cloud Functionを用いて、Google Cloud Storage (GCS)に保存されているCSVファイルをPythonのデータフレームとして読み込む方法について説明します。

 

前提条件

 

このチュートリアルを進める前に、Google Cloud Platform (GCP) にアカウントを作成し、Cloud FunctionとGoogle Cloud Storageを利用可能な状態にしておく必要があります。

 

必要なライブラリの依存関係

 

このチュートリアルではPythonのpandasライブラリを使用します。`requirements.txt`に以下のように記述してください。

 

<requirement.txt>

pandas==1.4.1
google-cloud-storage==2.2.1
google-cloud-bigquery==3.11.4
pandas-gbq==0.19.2

 

サンプルコード

 

以下は、Cloud FunctionでGCSからCSVファイルをデータフレームとして読み込むサンプルコードです。

 

from io import BytesIO
import pandas as pd
from google.cloud import storage
from google.cloud import bigquery

def read_file(data, context):
    # クライアントをインスタンス化
    client = storage.Client()

    # バケットを取得
    bucket_name = data['bucket']
    bucket = client.get_bucket(bucket_name)

    # BLOB(Binary Large OBject)を構成
    file_name = data['name']
    #file_name = 'hello_world.csv'   #特定のファイル名を指定したい場合
    blob = bucket.blob(file_name)

    # オブジェクトのデータを取得
    content = blob.download_as_bytes()

    # バイナリオブジェクトに変換し、データフレームを作成
    df = pd.read_csv(BytesIO(content))
    temp_cols = []
    for i in range(len(df.columns)):
        temp_cols.append('col' + str(i))
    df.columns = temp_cols
    # df.columns = ['a','b','c','d','e','f','g']
    print(df.head())

    # BQへデータをLoadする
    dataset_id = '〇〇〇.×××'  #データセット名を指定
    table_id = file_name[:-4] #テーブル名を指定
    try:
        df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='replace')
    except:
        df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='append')

 

コードの説明

 

このコードは、Google Cloud Storage (GCS) からCSVファイルを読み込んでデータフレームに変換し、その後Google BigQueryにそのデータをアップロードするPythonスクリプトです。このスクリプトは、Google Cloud Functionで実行されることを前提としています。

 

Cloud Functionの関数

def read_file(data, context):

この関数はCloud Functionとしてデプロイされる前提です。`data`と`context`はCloud Functionが自動的に提供する引数です。

 

GCSクライアントの設定

 

client = storage.Client()
bucket_name = data['bucket']
bucket = client.get_bucket(bucket_name)
file_name = data['name']
blob = bucket.blob(file_name)

 

Google Cloud Storageのバケットとファイル(Blob)にアクセスするための設定を行います。

 

データの読み込みとデータフレームの作成

 

content = blob.download_as_bytes()
df = pd.read_csv(BytesIO(content))

 

バケットからバイナリデータをダウンロードし、それをBytesIOを使ってPandasのDataFrameオブジェクトに変換します。

 

データフレームの列名の変更

 

 

temp_cols = []
for i in range(len(df.columns)):
    temp_cols.append('col' + str(i))
df.columns = temp_cols

 

DataFrameの各列に`col0`, `col1`, `col2`, … という名前を付けています。

 

BigQueryへのデータのアップロード

 

dataset_id = '〇〇〇.×××'
table_id = file_name[:-4]
try:
df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='replace')
except:
df.to_gbq('{}.{}'.format(dataset_id, table_id), if_exists='append')

 

この部分では、DataFrameをBigQueryにアップロードします。まず、データセットとテーブルのIDを指定します。`if_exists=’replace’`は、同名のテーブルが既に存在する場合にはそのテーブルを新しいデータで置き換えます。エラーが発生した場合には、`if_exists=’append’`でデータを既存のテーブルに追加します。

 

このスクリプトは全体的にかなり高度な処理をしており、Cloud StorageとBigQueryの両方と連携しています。一方で、エラーハンドリングやロギングが不足しているように見えますので、本番環境で使用する際にはそのような側面も考慮に入れることが重要です。

 

まとめ

 

以上が、Cloud FunctionとGoogle Cloud Storageを用いてCSVファイルをPythonのデータフレームとして読み込む基本的な手順です。この方法を用いれば、大量のデータに対しても簡単に処理を行うことができます。

 


プログラミング・スクレイピングツール作成の相談を受け付けています!

クラウドワークス・ココナラ・MENTAなどでPython・SQL・GASなどのプログラミングに関する相談やツール作成などを承っております!

過去の案件事例:

  • Twitter・インスタグラムの自動化ツール作成
  • ウェブサイトのスクレイピングサポート
  • ダッシュボード・サイト作成
  • データエンジニア転職相談

これまでの案件例を見る

キャリア相談もお気軽に!文系学部卒からエンジニア・データサイエンティストへの転職経験をもとに、未経験者がどう進むべきかのアドバイスを提供します。


スポンサーリンク
/* プログラミング速報関連記事一覧表示 */
ミナピピンの研究室

コメント

  1. […] また「NoSuchFieldException when trying to retrieve the value of field」というエラーが発生する場合は 挿入する列のデータ型と挿入しようとしているデータ型と違うことが原因なので型を確認してみてください 関連記事:CloudFunctionからGCSのCSVを読み込んでBigQueryにテーブルを作成する […]

タイトルとURLをコピーしました