PySpark で書き出しデータのパーティション数を調節する
小ネタなんですが,なかなかググっても見つからず,あれこれと試行錯誤してしまったので,メモがわりに.
要するに,gzip 圧縮してあるデータを読み出して,年月ごとにデータをパーティション分けして,結果を parquet 形式の 1 ファイルで書き出す,みたいな処理がしたいということです.結局 repartition() を使えばよかったので,以下のように yyyymm カラムを一時的に作って,パーティションを切りなおしてからそのカラムを落とすというテクを使いました.普通なら repartition(int) で直接パーティション数を指定すれば良いんでしょうけど,複数年月が分けられておらず固まったデータを一気に読み込んで,一気にパーティショニングしたいときには,こんな感じで無理やり動かすしかないのかなーという感じです.ちなみに 8-9 行目の処理は,S3 への書き出しを高速化するための設定になります*1.
ちなみに repartition() ではなく coalesce() を使うやり方もありますが,こちらの記事によるとファイルサイズが均一にそろわないっぽいです.また今回のとは別で,データサイズをみて適切なファイルサイズごとにパーティションを分けたい,という場合であれば,以下の記事にあるように df.inputFiles.size で取得できるんですね.
from pyspark.context import SparkContext from pyspark.sql import SQLContext from pyspark.sql.functions import year, month, date_format in_path = "s3://XXX/gz/*.gz" out_path = "s3://XXX/parquet/" sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2") sc._jsc.hadoopConfiguration().set("spark.speculation", "false") sqlContext = SQLContext(sc) sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy") originalDf = sqlContext.read \ .format("com.databricks.spark.csv") \ .option("delimiter", "\t") \ .load(in_path) renamedDf = originalDf \ .withColumnRenamed("_c0", "prod_id") \ .withColumnRenamed("_c1", "cust_id") \ .withColumnRenamed("_c2", "time_id") \ .withColumnRenamed("_c3", "amount_sold") yearAddedDf = renamedDf.withColumn("year", year(renamedDf.time_id)) monthAddedDf = yearAddedDf.withColumn("month", month(yearAddedDf.time_id)) yyyymmAddedDf = monthAddedDf.withColumn("yyyymm", date_format(monthAddedDf.time_id, 'yyyyMM')) repartitionedDf = yyyymmAddedDf.repartition("yyyymm") dropedDf = repartitionedDf.drop("yyyymm") castedDf = dropedDf.withColumn("prod_id", dropedDf.prod_id.cast("decimal(38,0)")) \ .withColumn("cust_id", dropedDf.cust_id.cast("decimal(38,0)")) \ .withColumn("time_id", dropedDf.time_id.cast("timestamp")) \ .withColumn("amount_sold", dropedDf.amount_sold.cast("decimal(38,2)")) \ .withColumn("year", dropedDf.year.cast("int")) \ .withColumn("month", dropedDf.month.cast("int")) castedDf.write.partitionBy(["year", "month"]).mode("overwrite").parquet(out_path)