pyspark

PySparkのtoPandas()でメモリエラーが起こるときの対策

この記事は約3分で読めます。

 

SparkのDataFrameで作ったデータをPythonの各モジュールで使いたい時、pysparkのデータフレームをtoPandas()メソッドを利用してPandasのデータフレームに変換したいことがあると思いますが、その際にメモリエラーでコードが実行できないことがあります。

 

<エラーメッセージ>

Caused by: org.apache.spark.sql.execution.OutOfMemorySparkException: Size of broadcasted table far exceeds estimates and exceeds limit of spark.driver.maxResultSize=4294967296.

 

Spark Driver Out of Memory Issue
Hi, I am executing a simple job in Databricks for which I am getting below error. I increased the Driver size still I faced same issue. Spark config : from py...

 

今回はそういった場合の対処法についてまとめてみました

 

①メモリ上限を増やす

 

1つ目はsparkのドライバー設定を弄って、Pandasデータフレームで保持できるデータ量(メモリ)の上限を変更する

 

参考:【Pyspark】Spark.driverのメモリ上限設定を変更する方法

 

データ型を変更する

 

2つ目はバイト数を削減するように、変数のデータ型を変更する方法です。勿論、バイト数を削減することで、その後の演算の精度は損なわれます。また数字型にしか使えない&個人的に検証してみた感じ、大した節約にはならないのであまりオススメはしません

 

#  int32型(4バイト)をint8型(1バイト)へ変換する
dask_df = dask_dt.astype({k: 'int8' for k in dask_df.dtypes[dask_df.dtypes == 'int32'].index})

 

③ いったんテーブルに書き出す

 

DataBricks環境で個人的によく使うのはこれ、pysparkでjoinとかaggしたものをそのままtoPandas()するとメモリエラーがよく起こるので、個人的にはpysparkで集計した結果をテーブルに書き出して、それをspark.sql(‘SELECT * FROM <テーブル名>’).toPandas()すると解決したことが多かったです。

 

<イメージ例>

df.write.mode('Overwrite')saveAsTable('<table_name>')
df = spark.sql('SELECT * FROM <tablr_name>')

 

 

 


プログラミング・スクレイピングツール作成の相談を受け付けています!

クラウドワークス・ココナラ・MENTAなどでPython・SQL・GASなどのプログラミングに関する相談やツール作成などを承っております!

過去の案件事例:

  • Twitter・インスタグラムの自動化ツール作成
  • ウェブサイトのスクレイピングサポート
  • ダッシュボード・サイト作成
  • データエンジニア転職相談

これまでの案件例を見る

キャリア相談もお気軽に!文系学部卒からエンジニア・データサイエンティストへの転職経験をもとに、未経験者がどう進むべきかのアドバイスを提供します。


スポンサーリンク
/* プログラミング速報関連記事一覧表示 */
ミナピピンの研究室

コメント

タイトルとURLをコピーしました