読者です 読者をやめる 読者になる 読者になる

About connecting the dots.

statistics/machine learning adversaria.

Spark2.0でジョブのアウトプットを高速にS3に書き出す

ここのところEMRでSparkを触ってます.まぁやってるのは,主にデータのparquet+snappyへの変換処理なんですけどね.EMRといえばHDFSではなく,EMRFS経由でS3に書き出すのがモダンなやり方,ということでそれを試してます.で,いろいろ試してて以下の2点の問題に気づきました.

  • S3に書き出す処理が遅い
  • 謎の _$folder$ というファイルができてしまう

今回はこれについて調べたことと,現状の対策法についてまとめておきます.検証環境はEMR Release 5.2.0で,Sparkバージョンは2.0.2になります.クラスタはマスターがm3.xlargeでスレーブがr3.2xlarge x 5台でした.

S3に書き出す処理が遅い

ジョブの挙動をみてると,Sparkジョブが終わっているはずなのに,結果ファイルがすべてでてきていない,という現象が起きていました.調べてみると,どうもデフォルト設定では,一旦出力をテンポラリファイルに書き出してから,最終的な出力先ディレクトリに再配置する,という挙動を取っているようです.これを回避するために,Spark2.0以前のバージョンでは,"spark.sql.parquet.output.committer.class" に "org.apache.spark.sql.parquet.DirectParquetOutputCommitter" を使用することで,書き出しを高速化することができました.このあたりについては,以下のエントリに詳しくまとまっています.

dev.sortable.com

ここでSpark2.0以前と書いたのには理由があって,2.0でこの DirectParquetOutputCommitter は削除されてしまったのです.理由は
SPARK-10063 に書かれていますが,要するにDirectParquetOutputCommitter は結果の整合性チェックをバイパスすることで高速な書き出しを行なっており,結果としてアウトプットが一部ロストする可能性がある,ということのようです.じゃあ対策はないのかというとそんなことはなくて,以下のドンピシャstackoverflowエントリをみつけました.

stackoverflow.com

この OutputCommitter のアルゴリズムってなんぞやというのは,以下の鯵坂さんの説明を読んでください.EMR5.2.0はHadoop2.7系なので,バージョンの2が選択可能というわけです.この記事だとリトライの際のロスが少なくなる,ということみたいですけど,たぶん同時に入ったMAPREDUCE-4815 のほうで,高速化が達成されていることだと思います.

qiita.com

ということで,結論としてはS3に高速に書き出すなら DirectParquetOutputCommitter を2にしましょう.サンプルコードは以下のようになります.Hiveテーブルからデータを読み込んで,parquet+snappyに変換してS3に保存するというやつです.ポイントは14行目,ここでバージョンの指定をしています.手元のデータで試してみたところ,gzip圧縮で10GB程度のファイルを変換するのに,何もしないと15分くらいかかってたのが,この指定で2分くらいになりました.実に7倍近い速度向上です*1

gist.github.com

謎の _$folder$ というファイルができてしまう

実はこっちがもともとのモチベーションだったんですが,parquet に変換して結果をS3に吐き出すと,必ずフォルダの横に,フォルダ名_$folder$ というファイルができてしまうのです.AWSのサイトにも害はないって書いてあるんですけど,普通に邪魔だし嫌じゃないですか.不要なのであれば消したいもんです.実は,これも以下の通り2.0以前であれば DirectParquetOutputCommitter を使えば普通に出力しないようにできたみたいです.

stackoverflow.com

ということで,当然2.0系ではこの方法は使えない,ということになります.そして調べた限り,これの出力を抑制する方法はみつかっていません.誰かご存知であれば,教えてください.

*1:もちろん,手元で適当にやった結果なので,ちゃんとしたベンチマークではない点に留意してください.