業務でDatabricksとPysparkを触ったので忘れないようにメモしておきます。
Apache SparkとPySpark
Apache Spark
Apache Sparkとは、ビッグデータと機械学習のための非常に高速な分散処理フレームワークです。SparkはDatabricksの創業者たちによって開発されました。Databricksにおける分散処理はSparkによって行われます。
PySparkとは
PySparkとは、Apache SparkをPython上から実行するためのラッパーライブラリで、Apache SparkとPythonのコラボレーションをサポートするためにリリースされました。開発者はPySparkを用いることで、Pythonからデータフレームを操作する形でSparkを活用することができます。
両者の関係性としてはtwitterAPIとTweepyに近いと思います。
前準備
実行環境はGoogleColabratoryを想定しています。
Pysparkのインストール
Pysparkは以下のコマンドでインストールが可能です。
# Pysparkのインストール !pip install pyspark==3.3.0
インストールが完了したらライブラリを読み込んでバージョンを確認します
Pysparkのバージョン確認
import pyspark print(pyspark.__version__) <実行結果> 3.3.0
公式ドキュメント:
csv の読み込み
ライブラリのインストールしたらgoogleColabratoryのサンプルデータというディレクトリにデフォルトで用意されている「california_housing_test.csv」というcsvをPysparkでデータフレームとして読み込みます。
#csvファイル読み込み from pyspark.sql import SparkSession filename = '/content/sample_data/california_housing_test.csv' spark = SparkSession.builder \ .master("local") \ .appName("app") \ .getOrCreate() data = spark.read.csv(filename, header=True, inferSchema=True, sep=',') data.show()
<実行結果>
# データ型の確認 print(type(data))
<class ‘pyspark.sql.dataframe.DataFrame’> というデータ型でpandaのデータフレームとは異なる1データ型となっています。書き方もpandaのそれとは異なります。
テーブル内容の表示
項目 | コード |
---|---|
全件表示 | .show() |
10件表示 | .show(10) |
RDDで全件取得 | .collect() |
RDDで10件取得 | .take(10) |
RDDで10件取得 | .head(10) |
RDDで先頭1件取得 | .first() |
RDDで末尾1件取得 | .last() |
# 列データ(カラム名)の確認 print(data.columns) <実行結果> ['longitude', 'latitude', 'housing_median_age', 'total_rooms', 'total_bedrooms', 'population', 'households', 'median_income', 'median_house_value']
データのスキーマーの確認
print(data.printSchema())
データフレームの行数を取得
print(data.count())
pandasでよく行う基本処理(Pyspark版)
重複排除
data2 = data.distinct() data2.show()
特定のカラムのデータだけを抽出する
data2 = data.select("longitude","latitude") data2.show()
<実行結果>
データフレームの列名変更
Pysparkのデータフレームの列名を変更する際には.withColumnRenamed('変更前の列名', '変更後の列名')
を使用します。
data2 = data.withColumnRenamed('longitude', 'longitude_after') data2.show()
新規列を追加する
# 「new_col」という新規列を追加する data.withColumn("newcol", col('latitude')).show()
Group byでの集計
group byは.groupBy(col(‘まとめる基準の列名’)).agg({‘まとめたい対象列’: ‘まとめ方’})で行います。
纏め方には入るのは’count’や’mean’が当てはまります。
# housing_median_ageを基準にmedian_incomeの平均を集計してソートして表示 data.groupBy(col('housing_median_age')).agg({'median_income': 'mean'}).orderBy(col('housing_median_age')).show()
Group byで使える集計メソッド
項目 | コード |
---|---|
件数 | .count() |
統計値 | .describe(col(‘col_name’)) |
特定カラムの平均 | .groupBy().avg(‘col_name’) |
複数カラムの平均 | .groupBy().avg(‘col_name1’, ‘col_name2’) |
特定カラムの総和 | .groupBy().sum(‘col_name’) |
複数カラムの総和 | .groupBy().sum(‘col_name1’, ‘col_name2’) |
特定カラムの最大値 | .groupBy().max(‘col_name’) |
複数カラムの最大値 | .groupBy().max(‘col_name1’, ‘col_name2’) |
特定カラムの最小値 | .groupBy().min(‘col_name’) |
複数カラムの最小値 | .groupBy().min(‘col_name1’, ‘col_name2’) |
データ型の変換
from pyspark.sql.types import * data2.withColumn('latitude', col("latitude").cast(IntegerType())).show()
参照:https://spark.apache.org/docs/latest/sql-ref-datatypes.html
条件に一致するデータの抽出(Where文)
from pyspark.sql.functions import col # 条件文(列latitudeが37.37のデータを抽出する) data2 = data.filter(col('latitude') == 37.37) data2.show()
.filterで使える演算子一覧
項目 | コード |
---|---|
一致 | .filter(col(‘col_name’) == ‘A’)) |
AND(※括弧必須) | .filter((col(‘col_nameA’) == ‘A’) & (col(‘col_nameB’) == ‘B’)) |
OR(※括弧必須) | .filter((col(‘col_nameA’) == ‘A’) | (col(‘col_nameB’) == ‘B’)) |
文字列を含む | .filter(col(‘col_name’).contains(‘A’)) |
NULL | .filter(col(‘col_name’).isNull()) |
Not NULL | .filter(col(‘col_name’).isNotNull()) |
SQL形式パターンマッチ | .filter(col(‘col_name’).like(‘sql pattern match’)) |
正規表現パターンマッチ | .filter(col(‘col_name’).rlike(‘regex pattern’)) |
参照:https://qiita.com/wwacky/items/e687c0ef05ae7f1de980
WHENによる条件分析
from pyspark.sql.functions import when data2 = data.withColumn('latitude', when(col('latitude')>35, 'test1').when(col('latitude')<=35, 'test2').otherwise('unknown')) data2.show()
欠損値削除
# 欠損値を0埋めする(カラム名はsubsetで渡す) data2 = data.fillna(0, subset=['longitude', 'latitude']) data2.show()
ユーザー定義関数(udf)の実行
from pyspark.sql.types import IntegerType from pyspark.sql.functions import udf, struct # 一番シンプルな記法(udf関数で処理内容をラップする) plus_one = udf(lambda x: x + 1.0) data.withColumn('Result', plus_one('latitude')).show()
データ順のソート
data2 = data.orderBy(col("longitude").desc()) data2.show()
<実行結果>
Pandasのデータフレームに変換する
Pyspark→Pandasへの変換は.toPandas()
で行います。
df = data.toPandas() print(df.head())
sparkデータフレームをpandasの記法で操作できるようにする
またPyspark3.0からは慣れ親しんだpandasの記法でPysparkのデータフレームを取り扱うことが可能になっています。
import pyspark.pandas as ps psdf = data.to_pandas_on_spark() # pandas-on-Sparkデータフレーム print(type(psdf)) # pyspark.pandas.frame.DataFrame # pandasの記法でカラムにアクセス psdf['latitude']
window関数(分析関数)
分析に必須のwindow関数も書いておく。
この例は、window内の次のtimestampを新しい列として格納するための処理。
from pyspark.sql.window import Window # 長くなるので予めOVER句の中身を変数として切り出しておく window_schema = Window.partitionBy(col("uuid")).orderBy(col("timestamp").asc()) df = df.withColumn("next_timestamp", lead("timestamp", 1).over(window_schema))
日付時刻データ(timestamp)の処理
from pyspark.sql.functions import to_timestamp, from_utc_timestamp, date_format, from_unixtime # データ変換シリーズ # unixtime(e.g. 1552443740)をtimestamp型に変換 df = df.withColumn("timestamp", to_timestamp(from_unixtime(col("unix_timestamp")))) # timestampっぽい文字列をtimestampに変換 df = df.withColumn('unix_timestamp_utc', to_timestamp(col('time_str'), 'yyyy-MM-dd HH:mm:ss')) # UTCで格納されているtimestampをJSTに変換 df = df.withColumn('unix_timestamp_jst', from_utc_timestamp(col('unix_timestamp_utc'), "JST")) # timestampから特定の部分を取り出すシリーズ # 曜日("Mon","Tue"みたいに文字列で取り出すパターン) df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'EE')) # 曜日(1(月曜), 2(火曜),..., 7(日曜)みたいに整数で取り出すパターン) df = df.withColumn('dow', date_format(col('unix_timestamp_jst'), 'u')) # 時間(hour) df = df.withColumn('hour', date_format(col('unix_timestamp_jst'), 'HH'))
参照:https://qiita.com/paulxll/items/1c0833782cd4e1de86e2#dataframe%E3%81%AE%E3%83%91%E3%83%BC%E3%83%86%E3%82%A3%E3%82%B7%E3%83%A7%E3%83%B3%E5%91%A8%E3%82%8A%E3%81%AE%E6%93%8D%E4%BD%9C
データフレームの出力保存と読み込み
#データフレームの保存 data.write.mode("overwrite").save('/test_df') #保存したデータフレームの読み込み df_new = spark.read.load('/test_df') df_new.show()
SparkデータフレームをSQLのクエリで操作する
データフレームをテーブルあるいは一時ビューに登録することで、SQLを使用してデータを操作することができるようになります。テーブルは永続化されますが、一時ビューは永続化されず、クラスターが稼働している間のみ一時ビューを作成したセッションでのみ利用することができます。
# データフレームを一時ビューに登録 data.createOrReplaceTempView("test") # ビューをSQLで操作する test_df = spark.sql("""SELECT * FROM test """) test_df.show()
Pysparkが学べるudemyオススメ講座
参考:参考:https://qiita.com/taka_yayoi/items/a7ee6287031374efa88a
コメント
[…] 関連記事:Pyspark3.0用の集計・前処理サンプルコードまとめ […]
[…] 関連記事:Pyspark3.0用の集計・前処理サンプルコードまとめ […]