About connecting the dots.

data science related trivial things

PySpark で書き出しデータのパーティション数を調節する

小ネタなんですが,なかなかググっても見つからず,あれこれと試行錯誤してしまったので,メモがわりに.

要するに,gzip 圧縮してあるデータを読み出して,年月ごとにデータをパーティション分けして,結果を parquet 形式の 1 ファイルで書き出す,みたいな処理がしたいということです.結局 repartition() を使えばよかったので,以下のように yyyymm カラムを一時的に作って,パーティションを切りなおしてからそのカラムを落とすというテクを使いました.普通なら repartition(int) で直接パーティション数を指定すれば良いんでしょうけど,複数年月が分けられておらず固まったデータを一気に読み込んで,一気にパーティショニングしたいときには,こんな感じで無理やり動かすしかないのかなーという感じです.ちなみに 8-9 行目の処理は,S3 への書き出しを高速化するための設定になります*1

ちなみに repartition() ではなく coalesce() を使うやり方もありますが,こちらの記事によるとファイルサイズが均一にそろわないっぽいです.また今回のとは別で,データサイズをみて適切なファイルサイズごとにパーティションを分けたい,という場合であれば,以下の記事にあるように df.inputFiles.size で取得できるんですね.

stackoverflow.com

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

in_path = "s3://XXX/gz/*.gz"
out_path = "s3://XXX/parquet/"

sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

originalDf = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("delimiter", "\t") \
    .load(in_path)
renamedDf = originalDf \
    .withColumnRenamed("_c0", "prod_id") \
    .withColumnRenamed("_c1", "cust_id") \
    .withColumnRenamed("_c2", "time_id") \
    .withColumnRenamed("_c3", "amount_sold")
yearAddedDf = renamedDf.withColumn("year", year(renamedDf.time_id))
monthAddedDf = yearAddedDf.withColumn("month", month(yearAddedDf.time_id))
yyyymmAddedDf = monthAddedDf.withColumn("yyyymm", date_format(monthAddedDf.time_id, 'yyyyMM'))

repartitionedDf = yyyymmAddedDf.repartition("yyyymm")
dropedDf = repartitionedDf.drop("yyyymm")

castedDf = dropedDf.withColumn("prod_id", dropedDf.prod_id.cast("decimal(38,0)")) \
    .withColumn("cust_id", dropedDf.cust_id.cast("decimal(38,0)")) \
    .withColumn("time_id", dropedDf.time_id.cast("timestamp")) \
    .withColumn("amount_sold", dropedDf.amount_sold.cast("decimal(38,2)")) \
    .withColumn("year", dropedDf.year.cast("int")) \
    .withColumn("month", dropedDf.month.cast("int"))
castedDf.write.partitionBy(["year", "month"]).mode("overwrite").parquet(out_path)

*1:このあたりは,昨年の記事にまとめてあります.