Python

Pyspark3.0用の集計・前処理サンプルコードまとめ

この記事は約14分で読めます。

 

業務でDatabricksとPysparkを触ったので忘れないようにメモしておきます。

 

Apache SparkとPySpark

 

Apache Spark

 

Apache Sparkとは、ビッグデータと機械学習のための非常に高速な分散処理フレームワークです。SparkはDatabricksの創業者たちによって開発されました。Databricksにおける分散処理はSparkによって行われます。

 

PySparkとは

 

PySparkとは、Apache SparkをPython上から実行するためのラッパーライブラリで、Apache SparkとPythonのコラボレーションをサポートするためにリリースされました。開発者はPySparkを用いることで、Pythonからデータフレームを操作する形でSparkを活用することができます。

 

両者の関係性としてはtwitterAPIとTweepyに近いと思います。

 

前準備

 

実行環境はGoogleColabratoryを想定しています。

 

Pysparkのインストール

 

Pysparkは以下のコマンドでインストールが可能です。

 

# Pysparkのインストール
!pip install pyspark==3.3.0

 

インストールが完了したらライブラリを読み込んでバージョンを確認します

 

Pysparkのバージョン確認

 

import pyspark
print(pyspark.__version__)

<実行結果>
3.3.0
PYSPARKはバージョンで書き方がまちまちであり、本記事ではPYyspark version3.3.0を使用します。

公式ドキュメント:

pyspark.pandas.DataFrame.index — PySpark 3.2.0 documentation

 

 

csv の読み込み

 

ライブラリのインストールしたらgoogleColabratoryのサンプルデータというディレクトリにデフォルトで用意されている「california_housing_test.csv」というcsvをPysparkでデータフレームとして読み込みます。

 

#csvファイル読み込み

from pyspark.sql import SparkSession
filename = '/content/sample_data/california_housing_test.csv'
spark = SparkSession.builder \
.master("local") \
.appName("app") \
.getOrCreate()
data = spark.read.csv(filename, header=True, inferSchema=True, sep=',')
data.show()

 

<実行結果>

 

# データ型の確認
print(type(data))

 

<class ‘pyspark.sql.dataframe.DataFrame’> というデータ型でpandaのデータフレームとは異なる1データ型となっています。書き方もpandaのそれとは異なります。

 

テーブル内容の表示

 

項目 コード
全件表示 .show()
10件表示 .show(10)
RDDで全件取得 .collect()
RDDで10件取得 .take(10)
RDDで10件取得 .head(10)
RDDで先頭1件取得 .first()
RDDで末尾1件取得 .last()
# 列データ(カラム名)の確認

print(data.columns)

<実行結果>
['longitude',
'latitude',
'housing_median_age',
'total_rooms',
'total_bedrooms',
'population',
'households',
'median_income',
'median_house_value']

データのスキーマーの確認

print(data.printSchema())
<実行結果>

データフレームの行数を取得

print(data.count())
<実行結果>
3000

pandasでよく行う基本処理(Pyspark版)

重複排除

 

data2 = data.distinct()
data2.show()

特定のカラムのデータだけを抽出する

 

data2 = data.select("longitude","latitude")
data2.show()

 

 

<実行結果>

 

データフレームの列名変更

 

Pysparkのデータフレームの列名を変更する際には.withColumnRenamed('変更前の列名', '変更後の列名')を使用します。

 

data2 = data.withColumnRenamed('longitude', 'longitude_after')
data2.show()

 

<実行結果>

新規列を追加する

# 「new_col」という新規列を追加する
data.withColumn("newcol", col('latitude')).show()

 

<実行結果>

 

Group byでの集計

 

group byは.groupBy(col(‘まとめる基準の列名’)).agg({‘まとめたい対象列’: ‘まとめ方’})で行います。

纏め方には入るのは’count’や’mean’が当てはまります。

 

# housing_median_ageを基準にmedian_incomeの平均を集計してソートして表示
data.groupBy(col('housing_median_age')).agg({'median_income': 'mean'}).orderBy(col('housing_median_age')).show()

 

Group byで使える集計メソッド

 

項目 コード
件数 .count()
統計値 .describe(col(‘col_name’))
特定カラムの平均 .groupBy().avg(‘col_name’)
複数カラムの平均 .groupBy().avg(‘col_name1’, ‘col_name2’)
特定カラムの総和 .groupBy().sum(‘col_name’)
複数カラムの総和 .groupBy().sum(‘col_name1’, ‘col_name2’)
特定カラムの最大値 .groupBy().max(‘col_name’)
複数カラムの最大値 .groupBy().max(‘col_name1’, ‘col_name2’)
特定カラムの最小値 .groupBy().min(‘col_name’)
複数カラムの最小値 .groupBy().min(‘col_name1’, ‘col_name2’)

データ型の変換

データ型の変換にはcast()を使用します
from pyspark.sql.types import *
data2.withColumn('latitude', col("latitude").cast(IntegerType())).show()
主なデータ型

 

参照:https://spark.apache.org/docs/latest/sql-ref-datatypes.html

条件に一致するデータの抽出(Where文)

 

pysparkのデータフレームにおいて条件に一致するデータを抽出するためにはcol().filter()を使用します。
from pyspark.sql.functions import col
# 条件文(列latitudeが37.37のデータを抽出する)
data2 = data.filter(col('latitude') == 37.37)
data2.show()
<実行結果>

 

他にもPandasでのデータ抽出と同じようにAND OR などの複数条件や正規表現を使用した方法も存在
します。
.filterで使える演算子一覧

 

項目 コード
一致 .filter(col(‘col_name’) == ‘A’))
AND(※括弧必須) .filter((col(‘col_nameA’) == ‘A’) & (col(‘col_nameB’) == ‘B’))
OR(※括弧必須) .filter((col(‘col_nameA’) == ‘A’) | (col(‘col_nameB’) == ‘B’))
文字列を含む .filter(col(‘col_name’).contains(‘A’))
NULL .filter(col(‘col_name’).isNull())
Not NULL .filter(col(‘col_name’).isNotNull())
SQL形式パターンマッチ .filter(col(‘col_name’).like(‘sql pattern match’))
正規表現パターンマッチ .filter(col(‘col_name’).rlike(‘regex pattern’))

参照:https://qiita.com/wwacky/items/e687c0ef05ae7f1de980

 

WHENによる条件分析

 

from pyspark.sql.functions import when 
data2 = data.withColumn('latitude', when(col('latitude')>35, 'test1').when(col('latitude')<=35, 'test2').otherwise('unknown'))
data2.show()

 

欠損値削除

 

# 欠損値を0埋めする(カラム名はsubsetで渡す)
data2 = data.fillna(0, subset=['longitude', 'latitude']) 
data2.show()

 

 

ユーザー定義関数(udf)の実行

 

from pyspark.sql.types import IntegerType
from pyspark.sql.functions import udf, struct

# 一番シンプルな記法(udf関数で処理内容をラップする)
plus_one = udf(lambda x: x + 1.0)
data.withColumn('Result', plus_one('latitude')).show()
<実行結果>

データ順のソート

 

data2 = data.orderBy(col("longitude").desc())
data2.show()

 

<実行結果>

 

Pandasのデータフレームに変換する

 

Pyspark→Pandasへの変換は.toPandas()で行います。

 

df = data.toPandas()
print(df.head())

 

sparkデータフレームをpandasの記法で操作できるようにする

 

またPyspark3.0からは慣れ親しんだpandasの記法でPysparkのデータフレームを取り扱うことが可能になっています。

 

import pyspark.pandas as ps

psdf = data.to_pandas_on_spark()  # pandas-on-Sparkデータフレーム
print(type(psdf)) # pyspark.pandas.frame.DataFrame

# pandasの記法でカラムにアクセス
psdf['latitude']

 

window関数(分析関数)

 

分析に必須のwindow関数も書いておく。

この例は、window内の次のtimestampを新しい列として格納するための処理。

 

from pyspark.sql.window import Window

# 長くなるので予めOVER句の中身を変数として切り出しておく
window_schema = Window.partitionBy(col("uuid")).orderBy(col("timestamp").asc())

df = df.withColumn("next_timestamp", lead("timestamp", 1).over(window_schema))

 

日付時刻データ(timestamp)の処理

 



from pyspark.sql.functions import to_timestamp, from_utc_timestamp, date_format, from_unixtime

# データ変換シリーズ
# unixtime(e.g. 1552443740)をtimestamp型に変換
df = df.withColumn("timestamp", to_timestamp(from_unixtime(col("unix_timestamp"))))
# timestampっぽい文字列をtimestampに変換
df = df.withColumn('unix_timestamp_utc', to_timestamp(col('time_str'), 'yyyy-MM-dd HH:mm:ss'))
# UTCで格納されているtimestampをJSTに変換
df = df.withColumn('unix_timestamp_jst', from_utc_timestamp(col('unix_timestamp_utc'), "JST"))

# timestampから特定の部分を取り出すシリーズ
# 曜日("Mon","Tue"みたいに文字列で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'EE'))
# 曜日(1(月曜), 2(火曜),..., 7(日曜)みたいに整数で取り出すパターン)
df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'u'))
# 時間(hour)
df = df.withColumn('hour', date_format(col('unix_timestamp_jst'), 'HH'))

 

参照:https://qiita.com/paulxll/items/1c0833782cd4e1de86e2#dataframe%E3%81%AE%E3%83%91%E3%83%BC%E3%83%86%E3%82%A3%E3%82%B7%E3%83%A7%E3%83%B3%E5%91%A8%E3%82%8A%E3%81%AE%E6%93%8D%E4%BD%9C

 

データフレームの出力保存と読み込み

 

#データフレームの保存
data.write.mode("overwrite").save('/test_df')

#保存したデータフレームの読み込み
df_new = spark.read.load('/test_df')
df_new.show()

 

SparkデータフレームをSQLのクエリで操作する

 

データフレームをテーブルあるいは一時ビューに登録することで、SQLを使用してデータを操作することができるようになります。テーブルは永続化されますが、一時ビューは永続化されず、クラスターが稼働している間のみ一時ビューを作成したセッションでのみ利用することができます。

 

# データフレームを一時ビューに登録
data.createOrReplaceTempView("test")

# ビューをSQLで操作する
test_df = spark.sql("""SELECT * FROM test """)
test_df.show()

 

 

Pysparkが学べるudemyオススメ講座

 

https://www.udemy.com/course/pyspark-ml/learn/lecture/31023562#learning-tools

 

 

参考:参考:https://qiita.com/taka_yayoi/items/a7ee6287031374efa88a

 

コメント

  1. […] 関連記事:Pyspark3.0用の集計・前処理サンプルコードまとめ […]

タイトルとURLをコピーしました