pyspark

PySparkのtoPandas()でメモリエラーが起こるときの対策

この記事は約3分で読めます。

 

SparkのDataFrameで作ったデータをPythonの各モジュールで使いたい時、pysparkのデータフレームをtoPandas()メソッドを利用してPandasのデータフレームに変換したいことがあると思いますが、その際にメモリエラーでコードが実行できないことがあります。

 

<エラーメッセージ>

Caused by: org.apache.spark.sql.execution.OutOfMemorySparkException: Size of broadcasted table far exceeds estimates and exceeds limit of spark.driver.maxResultSize=4294967296.

 

Spark Driver Out of Memory Issue

 

今回はそういった場合の対処法についてまとめてみました

 

①メモリ上限を増やす

 

1つ目はsparkのドライバー設定を弄って、Pandasデータフレームで保持できるデータ量(メモリ)の上限を変更する

 

参考:【Pyspark】Spark.driverのメモリ上限設定を変更する方法

 

データ型を変更する

 

2つ目はバイト数を削減するように、変数のデータ型を変更する方法です。勿論、バイト数を削減することで、その後の演算の精度は損なわれます。また数字型にしか使えない&個人的に検証してみた感じ、大した節約にはならないのであまりオススメはしません

 

#  int32型(4バイト)をint8型(1バイト)へ変換する
dask_df = dask_dt.astype({k: 'int8' for k in dask_df.dtypes[dask_df.dtypes == 'int32'].index})

 

③ いったんテーブルに書き出す

 

DataBricks環境で個人的によく使うのはこれ、pysparkでjoinとかaggしたものをそのままtoPandas()するとメモリエラーがよく起こるので、個人的にはpysparkで集計した結果をテーブルに書き出して、それをspark.sql(‘SELECT * FROM <テーブル名>’).toPandas()すると解決したことが多かったです。

 

<イメージ例>

df.write.mode('Overwrite')saveAsTable('<table_name>')
df = spark.sql('SELECT * FROM <tablr_name>')

 

 

 

コメント

タイトルとURLをコピーしました