About connecting the dots.

data science related trivial things

書評: 人工知能システムのプロジェクトがわかる本

もうすっかり四半期に一回しかブログを書かないような感じになってますが,変わらずデータ分析業界の片隅で細々と生きております.さて,今回は著者の方より献本をいただいたので,書評を書きたいと思います*1

この手の機械学習*2そのものではなく,そのプロセスやシステムに焦点を当てた本はこれまでもありました.従来書だと,機械学習システムを内製するエンジニア向け,機械学習システムを発注する側向けがありますね.そしてついに,機械学習システムを受託で開発する側の本が出てきたので,これで足りないピースがそろった感があります (w いずれの本も機械学習という事象を正しく捉えていると思うんですが,視点を変えるとこうも内容が変わるもんだなぁと興味深く思います.

仕事ではじめる機械学習

仕事ではじめる機械学習

人工知能システムを外注する前に読む本~ディープラーニングビジネスのすべて~

人工知能システムを外注する前に読む本~ディープラーニングビジネスのすべて~

この本を読んで得られるもの

いわゆる SIer の人が,RFC・要件定義・PoC・開発・および保守運用といった通常のシステム開発フレームワークにのっとった形で,機械学習システムの作り方(や既存のシステムとの違い)を理解することができる

この本のスタンスは非常に明確で,SIer に勤務しており,これから機械学習システムの提案・受託を行おうとしている(もしくはすでに行なっている)人です.本編の流れはほぼひとつのケーススタディに沿っており,その中で実際に考慮すべき点,通常のシステムとの違いについてわかりやすく説明がなされています.機械学習システムは他のシステムと比べて,要件定義の難しさや手戻りの可能性,また保守運用の中での継続的なモデル改善の必要性といった点で,多くの不確実性を孕んでいます.そのため,こうした点に特別の注意を払って RFP は作られる必要があり,またそれに応じた提案を行う必要があります.私の個人的な経験でも,このあたりの難しさをわかりやすく説明しているものはなく,その点非常に多くの人に有益なように思います.

個人的にためになった点

私自身はいわゆる SIer に勤めた経験はなく,このようにカッチリしたシステム受託のプロセス周りの知見がそれほどありません.なので,SIer 的なプロセスを,機械学習システムという自分に馴染みのあるものを通して体感的に理解できたことはなかなか良いものでした.

また,個人的にクリーンヒットだったのは,「人工知能に期待しすぎる人に対する返し方」というコラムです.ちょっと長いですが引用すると,

人口知能に過度な期待を抱いている人と一緒にシステムの目的を協議する際には,彼らの意識を変えなければなりません...(中略)...利用者 「これくらいの精度はうちの○○さんでもできそうだ」→「入社直後の人が○○さんくらいの業務ができるようになるのはすばらしいことですよね...(中略)...利用者「検知したい不正の70%しか検知できてないじゃないか」→「人が検知したときには,これ以上の漏れがあると聞いていますので,人によるチェックも併せてやると,人の手間も減らしながら精度が上がりそうですよね」

私もいままで機械学習に詳しくない人とお話をする中で,どのように相手側の期待値を正しいところに落ち着かせ,目線を揃えていくかというところに結構神経を使うことがあります*3.こうした点も含めて,実体験からきている(であろう)こうした記述が,非常に参考になります.ネット上に溢れている言説の大半は,機械学習のエキスパート(大体の場合研究者か内製企業のエンジニアやデータサーエンティスト)か,機械学習ブームを煽るメディアサイ*4の言説で,このように地に足のついた言説をみる機会はほぼないように思います.

あと,業務システムに組み込むときの運用含めた注意点として,機械学習に詳しくない担当者が予測値をどう業務に生かすかを考えてシステムの設計がなされている点がとても興味深いです.新商品に対する想定や,異常値が出ることも想定して,予測値を人間がチェックしてから発注するといったプロセスのように,実際的な Tips が良いです.

その他

巻末に PFR および提案書,PoC のドキュメントや結果報告書等含まれており,こうしたものは実業務に携わる人向けでよいなーと,他人事ながら思います.あとは機械学習の詳細を捨てて,システム屋が機械学習システムについて知る,というスタンスに特化したのは良い判断だよなーという感じです.そういう本は山ほどありますし,今更そういうのはいらないと思うので.

ただ細かいところでいくつか気になるところもあって,例えば参考書籍が初級本と中・上級本に極端に別れているところは,なんでこんな風になったんだろうという感がします(プロジェクトマネジメントに関する本で,PMBOK ガイドと「マンガでわかるプロジェクトマネジメント」しかないとか).あとHadoop に関する脚注が「Apach Spark のこと(原文ママ)」になっているのは,普通におかしいので修正いただけることを望みます*5.また細かい点だと,機械学習に関する用語はできるだけ平易な言葉遣いをされているんですが,後半にいきなり汎化性能という言葉が説明なく出てきたり,ローパス・ハイパスフィルタが説明なく使われていたり*6,細かいところでは気になる点があるのですが,こちらも修正されると良いなと思っています.

*1:この場を借りて厚く御礼申し上げます.

*2:個人的なスタンスとして,人工知能という言葉をできるだけ使いたくないので,本のタイトル等を除いては,このエントリでも機械学習で統一しています.

*3:こうした方の中には,精度100%でないならお前らのものは使わん! といったことをおっしゃる方もいらっしゃいました.まぁ機械学習をご存知の方ならお分かりの通り,精度 100% が達成できるなら,それははじめからルールベースで記述でき,機械学習をそもそも使う必要がないわけです...

*4:ここには,実のところ SIer のセールストークも多分に含まれます.もちろん商売なのでそういう行為は仕方がないわけですが,過度に期待を煽っても,実際に案件を受けるときに苦労するのは現場の人たちなのになぁ,と思うことはよくあります.

*5:Apach ではなく Apache だし,Hadoop は Spark とは全く別の技術です.

*6:どちらも一般的なシステムエンジニアが知っているわけではないように思います.

livy に接続するために Sparkmagic をインストール際にハマったところ

久しぶりに書きますが,すごい小ネタ.EMR 5.9.0 で livy0.4.0 がサポートされたので,ちょっと試してみようかなと思ったわけです.EMR のよくある問題は,Step 経由でジョブを投げると,ジョブを並列実行できないところで,この解消のために ssh でログインしてコマンド実行するとか,そんな感じのやり方をとる必要がありました.ここに livy があれば,REST でジョブを投げられるので,いろいろ捗るかな,というのが背景にあります.

そんなわけで,chezou さんの記事を参考に sparkmagic を入れて試してみました.

chezou.hatenablog.com

そうしたら pykerberos のインストールでこけて,??? となったんですが,結果的には以下の issue で挙げられている,libkrb5-dev を先にインストールすることで,無事に sparkmagic が入りましたよ,というお話.

github.com

使ってみると,REST API も非常にシンプルで使いやすそう.Jupyter からも接続できていい感じです.ただ現在のところ,sparkmagic から EMR クラスタの Hive Metastore につながらくて,なんでなのかがよくわかってないです.多分自分の設定が何か間違ってるんでしょうけど... 接続できると,Glue Data Catalog とも連携できていい感じなのに.

PySpark で書き出しデータのパーティション数を調節する

小ネタなんですが,なかなかググっても見つからず,あれこれと試行錯誤してしまったので,メモがわりに.

要するに,gzip 圧縮してあるデータを読み出して,年月ごとにデータをパーティション分けして,結果を parquet 形式の 1 ファイルで書き出す,みたいな処理がしたいということです.結局 repartition() を使えばよかったので,以下のように yyyymm カラムを一時的に作って,パーティションを切りなおしてからそのカラムを落とすというテクを使いました.普通なら repartition(int) で直接パーティション数を指定すれば良いんでしょうけど,複数年月が分けられておらず固まったデータを一気に読み込んで,一気にパーティショニングしたいときには,こんな感じで無理やり動かすしかないのかなーという感じです.ちなみに 8-9 行目の処理は,S3 への書き出しを高速化するための設定になります*1

ちなみに repartition() ではなく coalesce() を使うやり方もありますが,こちらの記事によるとファイルサイズが均一にそろわないっぽいです.また今回のとは別で,データサイズをみて適切なファイルサイズごとにパーティションを分けたい,という場合であれば,以下の記事にあるように df.inputFiles.size で取得できるんですね.

stackoverflow.com

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

in_path = "s3://XXX/gz/*.gz"
out_path = "s3://XXX/parquet/"

sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

originalDf = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("delimiter", "\t") \
    .load(in_path)
renamedDf = originalDf \
    .withColumnRenamed("_c0", "prod_id") \
    .withColumnRenamed("_c1", "cust_id") \
    .withColumnRenamed("_c2", "time_id") \
    .withColumnRenamed("_c3", "amount_sold")
yearAddedDf = renamedDf.withColumn("year", year(renamedDf.time_id))
monthAddedDf = yearAddedDf.withColumn("month", month(yearAddedDf.time_id))
yyyymmAddedDf = monthAddedDf.withColumn("yyyymm", date_format(monthAddedDf.time_id, 'yyyyMM'))

repartitionedDf = yyyymmAddedDf.repartition("yyyymm")
dropedDf = repartitionedDf.drop("yyyymm")

castedDf = dropedDf.withColumn("prod_id", dropedDf.prod_id.cast("decimal(38,0)")) \
    .withColumn("cust_id", dropedDf.cust_id.cast("decimal(38,0)")) \
    .withColumn("time_id", dropedDf.time_id.cast("timestamp")) \
    .withColumn("amount_sold", dropedDf.amount_sold.cast("decimal(38,2)")) \
    .withColumn("year", dropedDf.year.cast("int")) \
    .withColumn("month", dropedDf.month.cast("int"))
castedDf.write.partitionBy(["year", "month"]).mode("overwrite").parquet(out_path)

*1:このあたりは,昨年の記事にまとめてあります.

AWS の Deep Learning AMI で MXNet のバージョンをあげる

最近 AWSDeep Learning 周りのモデルを試してみたりしています.Deep Learning AMI があるので,自分で CUDA や cuDNN や各種フレームワークを入れる必要がないというのが一番大きいです*1.10 分もあれば,P2 インスタンス上で Jupyter notebook 使って開発が開始できるのはとても楽です.もうローカルマシンに環境構築する時代には戻りたくないものです.

とはいえ,この界隈は非常に進歩が早くていろいろ追いついていないことが多いです.現在の Deep Learning AMI では MXNet の 0.9.3 がインストールされているのですが,MXNet 自体はすでに 0.10.0 になっており,ドキュメントもそれに合わせてバージョンアップしています.このバージョンアップで結構 API が変わっており*2,ドキュメントみながらいろいろ触ってても,エラーが出て結構詰まってしまうのですね...

なので MXNet のバージョンをあげましたよというのが今回のお話.やることはそんなに多くなくて,コマンドを以下のように順に打っていけばよいです.今後も同じようなことがあったときのための備忘録がわりに*3

cd ~/src/mxnet
git pull origin master
git submodule update
sudo make clean && make # 2h くらいかかります
python -c "import mxnet; print(mxnet.__veriosn__)" # 0.9.3 と表示されるはず
cd python
sudo python setup.py install
python -c "import mxnet; print(mxnet.__veriosn__)" # 0.10.0 と表示されればOK

*1:Amazon Linux だけではなく,Ubuntu の AMI もあります.

*2:mod や test_util あたりが特に変わっている感じがします.

*3:こちらとかこちらを参考にしました.

RStudio + sparklyr on EMRでスケーラブル機械学習

前回に引き続き分析環境ネタ第2弾*1.今回はEMRでRStudioを立ててみます.

RStudioの構築

やり方自体は,AWS Big Data Blogにまとまっているので,別に難しくはなかったり.RStudioとか関連コンポーネントは,例のごとくS3に便利スクリプトがあるので,これをブートストラップで読み込んであげればOK.

s3://aws-bigdata-blog/artifacts/aws-blog-emr-rstudio-sparklyr/rstudio_sparklyr_emr5.sh --sparklyr --rstudio --sparkr --rexamples --plyrmr

あとはSSHトンネルを掘って,RStudioのWebページにつないであげればOK.8787ポートでつながります.

sparklyrでSparkと連携

sparklyrは,RStudioの人たちが作っている,RからSparkを扱うためのパッケージになります.ちょうどタイムリーにyamano357さんが,sparklyrの記事をあげていたりしますね*2

qiita.com

基本的にリファレンスページが非常にまとまっているので,それを読むだけで一通りの操作ができますが,とりあえず自分でも試してみましょう.sparklyrは read_from_csv() を使ってS3から直接ファイルを読み込むことができます.便利ですね.試してはいないのですが,以下のパッケージを併せて使えば,RでRedshiftからもSparkにデータを読み出せるっぽいです.

github.com

あとは,リファレンスに従って,定番処理を試してみましょう.読み込むデータは前回と一緒のabaloneです.

library(sparklyr)
library(dplyr)

# create Spark Context
sc <- spark_connect(master = "yarn-client", version = "2.0.2")

# create dataset in Spark
abalone <- spark_read_csv(sc, "abalone", path="s3://XXXXXXXX/abalone.csv")
names(abalone) = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height", "rings")
head(abalone)
tbl_cache(sc, "abalone")

# execute query
abalone %>%
  dplyr::group_by(sex) %>%
  dplyr::count()

# separate dataset
partitions <- abalone %>%
  sdf_partition(training = 0.9, test = 0.1, seed = 123)

# execute regression model
fit <- partitions$training %>%
  ml_linear_regression(
    response = "rings",
    features = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height")
  )
summary(fit)
original_value <- partitions$test %>%
    dplyr::select(rings)
predicted_value <- predict(fit, partitions$test)

ということで,慣れると非常に簡単に処理できますね.すばらしい.Sparklyrはチートシートもあるので,慣れるのも割とすぐな感じです.これをみる感じ,ビジュアライズはさすがにRのメモリ側にデータを持ってこないとダメみたいですね.あとは,ML Pipelineの各Transformerに対応するメソッドが揃っていたり,かなり力入れて開発されてるなぁ感がすごいですね.

所感

と,ささっと触って思ったのは,同じことやるならZeppelinで直接DataFrameいじったほうが早い気がするなぁということでした.結局sparklyrでできるのは,RのインタフェースでML Pipelineを扱えるというだけのことで,ロジック自体は実質的にML Pipelineを知らないと書けないし,結果の可視化もZeppelinで十分に対応可能です.

その上で,じゃあどういう使い方をすると良いかなぁと考えると,やっぱりRを使うメリットは高速なプロトタイピングってところに尽きるのかなぁと思ったり.つまり,大規模なデータを前処理して,Rの機械学習モデルに落とし込んでいろいろ試行錯誤する,という使い方かなぁと.なので,有賀さんが前にibisの話ししてたのと同じイメージで使うと,色々捗りそうな気がしています.

www.slideshare.net

2017/1/31追記

sparklyr0.5に合わせて,公式ページに Deploying sparklyr という形で基本的なセットアップと使い方がまとめられてます.そのサンプル読んでると,実は ml_linear_regression() とか x ~ y のR式の記述ができたということに気がつきました.便利だ...

*1:第3弾があるかは謎

*2:Athenaが東京リージョンにくるの待たなくても,オレゴンで立ち上げて,東京リージョンのS3バケットにクエリしちゃえばいいのに,とか思ったりしますが

Spark on EMRでZeppelinを使ってML Pipelineを試してみる

Sparkの最新状況をアップデートする意味も含めて,EMRで一通りの挙動を試してみたので,備忘録的にまとめておきます.慣れると簡単で便利なんですけど,それまでは結構ハマりどころが多いんですよねぇ,このあたり.

Zeppelinにアクセスするまで

AWS Big Dataブログにまとまっている通り,EMRに便利コンポーネントをいろいろ含めて起動するのは簡単です.AWSが用意しているスクリプトをbootstrapに指定して,必要なコンポーネントをオプションで引き渡してあげれば,RとかPythonとかの必須パッケージを含んだ形でEMRを起動できます*1

ちなみに,EMRの起動自体は cli でも実施できます.軽くサンプルを作ってみましたがブートストラップアクションとかインストールコンポーネントとかは適当に変えられますので変えてみてください*2

起動が終わったら,EMRクラスタGUIにアクセスしたいので,SSHトンネルを掘ります.この辺りがEMRの不便なところではあるんですよね... 手順についてはクラスメソッドさんの記事によくまとまっているかと思います.コマンド的には以下のような感じで.

ssh -i ~/.ssh/XXXXXXXX.pem -N -D 8157 hadoop@ec2-XXXXXXXX.compute-1.amazonaws.com

dev.classmethod.jp

ここまで終わったら,ようやっとZeppelinにアクセスできるようになります.こんな感じ.

f:id:SAM:20170122114710p:plain

Redshiftにデータセットをロード

今回使ったデータセットこちらUCIのMachine Learning DatasetからAbaloneを使わせてもらいました.これは4177行x8列のデータセットで,アワビの大きさや重さから,年齢を当てるという,回帰分析用のデータセットになります.CSVから読み込んであげてもいいんですけれども,せっかくなので今回はRedshiftからデータを読み込んでみます.こちらもAWS Big Dataブログにやり方がまとまっています.

まずはS3にアップロードしますけど,これはcliでちょちょっとやればいいだけです.XXXXXXXXになってるところはバケットネームなので,自身のものを入れてください.

aws s3 cp ~/Downloads/abalone.data.txt s3://XXXXXXXX/abalone.csv

続いて,Redshift側ではDDLでテーブル作ってあげて,COPYコマンドでロードします.コマンドは以下の通り*3

create table abalone (sex varchar(2), length real, diameter real, height real, whole_weight real, shucked_weight real, viscera_weight real, shell_weight real, rings integer);
copy abalone from 's3://XXXXXXXX/abalone.csv' credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXX';

ZeppelinからRedshiftにアクセス

ZeppelinからRedshiftにアクセスするためには,いくつかjarをパスに追加してあげる必要があります.このあたりの手順は公式ドキュメントにまとまっているので,参考にして順に以下のjarを追加していきます.

  • /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
  • minimal-json-0.9.4.jar
  • spark-redshift_2.10-2.0.0.jar
  • spark-avro_2.11-3.0.0.jar

一番上のJDBCは最初からクラスタにあるので,そのままパスを追加してあげればOKです.2番目のjsonは,Maven Centralのレポジトリを追加してあげればOK.ですが3,4番目のjarはMaven Centralにあるはずなのですがなぜか追加してもジョブ実行時にエラーが出てしまいます.仕方がないので,直接jarをダウンロードして適当な場所に配置してあげます.最終的に以下のようにDependenciesが設定できました.

f:id:SAM:20170122120334p:plain

ML Pipelineの実行

ML Pipelineはパイプラインのようにパラメタ変換処理をつなげて,機械学習モデルの処理を記述するものです.ML Pipelineには大きく分けてTransformerとEstimatorの2つのコンポーネントがあります.それぞれの引数と戻り値の型は以下の通り.

  • val df2 = new Transformer().transform(df1)
  • val model = new Estimator().fit(df2)

Transformerはパラメタの変換処理を行います.扱える処理は結構多く,tokenaizer, TF-IDFなんてのも実施できたりします.Estimatorはいわゆる機械学習モデルです.Estimatorは,fit() メソッドを使って訓練データを学習することで,予測が可能な機械学習modelとなります.そして,このmodelを使って,model.transform(testDf) とすることで,予測結果を得ることができます.もちろんCVなども完備されているため,これら一連の流れをまとめて綺麗に記述することができます.

MLモデルの例

では,以下に実例を書いていきます.まずはPipelineを使わない簡単な例から.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)
  .setElasticNetParam(0.1)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
test.printSchema()

実行すると,以下のように結果が得られます.

predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]
([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...|       1.0|
+-----+--------------+--------------------+--------------------+----------+

Pipelineを使ったCV

次はPipelineを使って変数の変換と,CVを含めたモデルを記述します.で,最終的な結果としては,重相関係数0.62という形で結果が得られました.元の値と予測値のプロットをとると以下の通りです.

f:id:SAM:20170123133755p:plain

実行コードはこちら.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql._

import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;

// Provide the jdbc url for Amazon Redshift
val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX"

// Create and declare an S3 bucket where the temporary files are written
val s3TempDir = "s3://XXXXXXXX/"
val abaloneQuery = """
                    select * from abalone
                   """
// Create a Dataframe to hold the results of the above query
val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift")
                                          .option("url", jdbcURL)
                                          .option("tempdir", s3TempDir)
                                          .option("query", abaloneQuery)
                                          .option("temporary_aws_access_key_id", awsAccessKey)
                                          .option("temporary_aws_secret_access_key", awsSecretKey)
                                          .option("temporary_aws_session_token", token).load()  

// create pipeline
val stringIndexer = new StringIndexer()
    .setInputCol("sex")
    .setOutputCol("sex_index")
val vectorAsembler = new VectorAssembler()
    .setInputCols(Array("sex_index", "length", "diameter", "height",
        "whole_weight", "shucked_weight", "viscera_weight", "shell_weight"))
    .setOutputCol("features")
val linearRegression = new LinearRegression()
    .setLabelCol("rings")
    .setMaxIter(1)
    .setRegParam(0.3)
    .setElasticNetParam(0.5)
val paramGrid = new ParamGridBuilder()
    .addGrid(linearRegression.regParam, Array(0.1, 0.3))
    .addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9))
    .build()
val crossvalidator = new CrossValidator()
  .setEstimator(linearRegression)
  .setEvaluator(new RegressionEvaluator().setLabelCol("rings"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)
val pipeline = new Pipeline()
    .setStages(Array(stringIndexer, vectorAsembler, crossvalidator))
    
// fit model
val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234)
val model = pipeline.fit(training)
val predictions = model.transform(test)

// visualize the result
predictions.registerTempTable("predictions")
predictions.stat.corr("rings", "prediction")
||< 

* まとめ

*1:今回の実施環境は,EMR Release 5.2.1 で Spark version は 2.0.2でした.

*2:今回のお試しプロセスは,マネジメントコンソールから作っちゃったので,gistサンプルとは構成が異なります.

*3:クレデンシャルとかS3バケットとかは,全部XXXXXXXXでマスクしてあるので,自分の環境に合わせて適宜入れ替えてください

Spark2.0でジョブのアウトプットを高速にS3に書き出す

ここのところEMRでSparkを触ってます.まぁやってるのは,主にデータのparquet+snappyへの変換処理なんですけどね.EMRといえばHDFSではなく,EMRFS経由でS3に書き出すのがモダンなやり方,ということでそれを試してます.で,いろいろ試してて以下の2点の問題に気づきました.

  • S3に書き出す処理が遅い
  • 謎の _$folder$ というファイルができてしまう

今回はこれについて調べたことと,現状の対策法についてまとめておきます.検証環境はEMR Release 5.2.0で,Sparkバージョンは2.0.2になります.クラスタはマスターがm3.xlargeでスレーブがr3.2xlarge x 5台でした.

S3に書き出す処理が遅い

ジョブの挙動をみてると,Sparkジョブが終わっているはずなのに,結果ファイルがすべてでてきていない,という現象が起きていました.調べてみると,どうもデフォルト設定では,一旦出力をテンポラリファイルに書き出してから,最終的な出力先ディレクトリに再配置する,という挙動を取っているようです.これを回避するために,Spark2.0以前のバージョンでは,"spark.sql.parquet.output.committer.class" に "org.apache.spark.sql.parquet.DirectParquetOutputCommitter" を使用することで,書き出しを高速化することができました.このあたりについては,以下のエントリに詳しくまとまっています.

dev.sortable.com

ここでSpark2.0以前と書いたのには理由があって,2.0でこの DirectParquetOutputCommitter は削除されてしまったのです.理由は
SPARK-10063 に書かれていますが,要するにDirectParquetOutputCommitter は結果の整合性チェックをバイパスすることで高速な書き出しを行なっており,結果としてアウトプットが一部ロストする可能性がある,ということのようです.じゃあ対策はないのかというとそんなことはなくて,以下のドンピシャstackoverflowエントリをみつけました.

stackoverflow.com

この OutputCommitter のアルゴリズムってなんぞやというのは,以下の鯵坂さんの説明を読んでください.EMR5.2.0はHadoop2.7系なので,バージョンの2が選択可能というわけです.この記事だとリトライの際のロスが少なくなる,ということみたいですけど,たぶん同時に入ったMAPREDUCE-4815 のほうで,高速化が達成されていることだと思います.

qiita.com

ということで,結論としてはS3に高速に書き出すなら DirectParquetOutputCommitter を2にしましょう.サンプルコードは以下のようになります.Hiveテーブルからデータを読み込んで,parquet+snappyに変換してS3に保存するというやつです.ポイントは14行目,ここでバージョンの指定をしています.手元のデータで試してみたところ,gzip圧縮で10GB程度のファイルを変換するのに,何もしないと15分くらいかかってたのが,この指定で2分くらいになりました.実に7倍近い速度向上です*1

gist.github.com

謎の _$folder$ というファイルができてしまう

実はこっちがもともとのモチベーションだったんですが,parquet に変換して結果をS3に吐き出すと,必ずフォルダの横に,フォルダ名_$folder$ というファイルができてしまうのです.AWSのサイトにも害はないって書いてあるんですけど,普通に邪魔だし嫌じゃないですか.不要なのであれば消したいもんです.実は,これも以下の通り2.0以前であれば DirectParquetOutputCommitter を使えば普通に出力しないようにできたみたいです.

stackoverflow.com

ということで,当然2.0系ではこの方法は使えない,ということになります.そして調べた限り,これの出力を抑制する方法はみつかっていません.誰かご存知であれば,教えてください.

*1:もちろん,手元で適当にやった結果なので,ちゃんとしたベンチマークではない点に留意してください.