SparkのDataFrameで作ったデータをPythonの各モジュールで使いたい時、pysparkのデータフレームをtoPandas()
メソッドを利用してPandasのデータフレームに変換したいことがあると思いますが、その際にメモリエラーでコードが実行できないことがあります。
<エラーメッセージ>
Caused by: org.apache.spark.sql.execution.OutOfMemorySparkException: Size of broadcasted table far exceeds estimates and exceeds limit of spark.driver.maxResultSize=4294967296.
今回はそういった場合の対処法についてまとめてみました
①メモリ上限を増やす
1つ目はsparkのドライバー設定を弄って、Pandasデータフレームで保持できるデータ量(メモリ)の上限を変更する
参考:【Pyspark】Spark.driverのメモリ上限設定を変更する方法
データ型を変更する
2つ目はバイト数を削減するように、変数のデータ型を変更する方法です。勿論、バイト数を削減することで、その後の演算の精度は損なわれます。また数字型にしか使えない&個人的に検証してみた感じ、大した節約にはならないのであまりオススメはしません
# int32型(4バイト)をint8型(1バイト)へ変換する dask_df = dask_dt.astype({k: 'int8' for k in dask_df.dtypes[dask_df.dtypes == 'int32'].index})
③ いったんテーブルに書き出す
DataBricks環境で個人的によく使うのはこれ、pysparkでjoinとかaggしたものをそのままtoPandas()するとメモリエラーがよく起こるので、個人的にはpysparkで集計した結果をテーブルに書き出して、それをspark.sql(‘SELECT * FROM <テーブル名>’).toPandas()すると解決したことが多かったです。
<イメージ例>
df.write.mode('Overwrite')saveAsTable('<table_name>') df = spark.sql('SELECT * FROM <tablr_name>')
コメント