Spark on EMRでZeppelinを使ってML Pipelineを試してみる
Sparkの最新状況をアップデートする意味も含めて,EMRで一通りの挙動を試してみたので,備忘録的にまとめておきます.慣れると簡単で便利なんですけど,それまでは結構ハマりどころが多いんですよねぇ,このあたり.
Zeppelinにアクセスするまで
AWS Big Dataブログにまとまっている通り,EMRに便利コンポーネントをいろいろ含めて起動するのは簡単です.AWSが用意しているスクリプトをbootstrapに指定して,必要なコンポーネントをオプションで引き渡してあげれば,RとかPythonとかの必須パッケージを含んだ形でEMRを起動できます*1.
ちなみに,EMRの起動自体は cli でも実施できます.軽くサンプルを作ってみましたがブートストラップアクションとかインストールコンポーネントとかは適当に変えられますので変えてみてください*2.
起動が終わったら,EMRクラスタのGUIにアクセスしたいので,SSHトンネルを掘ります.この辺りがEMRの不便なところではあるんですよね... 手順についてはクラスメソッドさんの記事によくまとまっているかと思います.コマンド的には以下のような感じで.
ssh -i ~/.ssh/XXXXXXXX.pem -N -D 8157 hadoop@ec2-XXXXXXXX.compute-1.amazonaws.com
ここまで終わったら,ようやっとZeppelinにアクセスできるようになります.こんな感じ.
Redshiftにデータセットをロード
今回使ったデータセットはこちら. UCIのMachine Learning DatasetからAbaloneを使わせてもらいました.これは4177行x8列のデータセットで,アワビの大きさや重さから,年齢を当てるという,回帰分析用のデータセットになります.CSVから読み込んであげてもいいんですけれども,せっかくなので今回はRedshiftからデータを読み込んでみます.こちらもAWS Big Dataブログにやり方がまとまっています.
まずはS3にアップロードしますけど,これはcliでちょちょっとやればいいだけです.XXXXXXXXになってるところはバケットネームなので,自身のものを入れてください.
aws s3 cp ~/Downloads/abalone.data.txt s3://XXXXXXXX/abalone.csv
続いて,Redshift側ではDDLでテーブル作ってあげて,COPYコマンドでロードします.コマンドは以下の通り*3.
create table abalone (sex varchar(2), length real, diameter real, height real, whole_weight real, shucked_weight real, viscera_weight real, shell_weight real, rings integer); copy abalone from 's3://XXXXXXXX/abalone.csv' credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXX';
ZeppelinからRedshiftにアクセス
ZeppelinからRedshiftにアクセスするためには,いくつかjarをパスに追加してあげる必要があります.このあたりの手順は公式ドキュメントにまとまっているので,参考にして順に以下のjarを追加していきます.
- /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
- minimal-json-0.9.4.jar
- spark-redshift_2.10-2.0.0.jar
- spark-avro_2.11-3.0.0.jar
一番上のJDBCは最初からクラスタにあるので,そのままパスを追加してあげればOKです.2番目のjsonは,Maven Centralのレポジトリを追加してあげればOK.ですが3,4番目のjarはMaven Centralにあるはずなのですがなぜか追加してもジョブ実行時にエラーが出てしまいます.仕方がないので,直接jarをダウンロードして適当な場所に配置してあげます.最終的に以下のようにDependenciesが設定できました.
ML Pipelineの実行
ML Pipelineはパイプラインのようにパラメタ変換処理をつなげて,機械学習モデルの処理を記述するものです.ML Pipelineには大きく分けてTransformerとEstimatorの2つのコンポーネントがあります.それぞれの引数と戻り値の型は以下の通り.
- val df2 = new Transformer().transform(df1)
- val model = new Estimator().fit(df2)
Transformerはパラメタの変換処理を行います.扱える処理は結構多く,tokenaizer, TF-IDFなんてのも実施できたりします.Estimatorはいわゆる機械学習モデルです.Estimatorは,fit() メソッドを使って訓練データを学習することで,予測が可能な機械学習modelとなります.そして,このmodelを使って,model.transform(testDf) とすることで,予測結果を得ることができます.もちろんCVなども完備されているため,これら一連の流れをまとめて綺麗に記述することができます.
MLモデルの例
では,以下に実例を書いていきます.まずはPipelineを使わない簡単な例から.
import org.apache.spark.ml.classification.LogisticRegression import org.apache.spark.ml.linalg.{Vector, Vectors} import org.apache.spark.ml.param.ParamMap import org.apache.spark.sql.Row // Prepare training data from a list of (label, features) tuples. val training = spark.createDataFrame(Seq( (1.0, Vectors.dense(0.0, 1.1, 0.1)), (0.0, Vectors.dense(2.0, 1.0, -1.0)), (0.0, Vectors.dense(2.0, 1.3, 1.0)), (1.0, Vectors.dense(0.0, 1.2, -0.5)) )).toDF("label", "features") val lr = new LogisticRegression() // Print out the parameters, documentation, and any default values. // We may set parameters using setter methods. lr.setMaxIter(10) .setRegParam(0.01) .setElasticNetParam(0.1) // Learn a LogisticRegression model. This uses the parameters stored in lr. val model1 = lr.fit(training) // Prepare test data. val test = spark.createDataFrame(Seq( (1.0, Vectors.dense(-1.0, 1.5, 1.3)), (0.0, Vectors.dense(3.0, 2.0, -0.1)), (1.0, Vectors.dense(0.0, 2.2, -1.5)) )).toDF("label", "features") test.printSchema()
実行すると,以下のように結果が得られます.
predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields] ([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0 ([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0 ([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0 root |-- label: double (nullable = false) |-- features: vector (nullable = true) |-- rawPrediction: vector (nullable = true) |-- probability: vector (nullable = true) |-- prediction: double (nullable = true) +-----+--------------+--------------------+--------------------+----------+ |label| features| rawPrediction| probability|prediction| +-----+--------------+--------------------+--------------------+----------+ | 1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...| 1.0| | 0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...| 0.0| | 1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...| 1.0| +-----+--------------+--------------------+--------------------+----------+
Pipelineを使ったCV
次はPipelineを使って変数の変換と,CVを含めたモデルを記述します.で,最終的な結果としては,重相関係数0.62という形で結果が得られました.元の値と予測値のプロットをとると以下の通りです.
実行コードはこちら.
import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.regression.LinearRegression import org.apache.spark.ml.evaluation.RegressionEvaluator import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler} import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.mllib.stat.Statistics import org.apache.spark.sql._ import com.amazonaws.auth._ import com.amazonaws.auth.AWSCredentialsProvider import com.amazonaws.auth.AWSSessionCredentials import com.amazonaws.auth.InstanceProfileCredentialsProvider import com.amazonaws.services.redshift.AmazonRedshiftClient import _root_.com.amazon.redshift.jdbc41.Driver import org.apache.spark.sql.SaveMode import org.apache.spark.sql.SQLContext // Instance Profile for authentication to AWS resources val provider = new InstanceProfileCredentialsProvider(); val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials]; val token = credentials.getSessionToken; val awsAccessKey = credentials.getAWSAccessKeyId; val awsSecretKey = credentials.getAWSSecretKey val sqlContext = new SQLContext(sc) import sqlContext.implicits._; // Provide the jdbc url for Amazon Redshift val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX" // Create and declare an S3 bucket where the temporary files are written val s3TempDir = "s3://XXXXXXXX/" val abaloneQuery = """ select * from abalone """ // Create a Dataframe to hold the results of the above query val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift") .option("url", jdbcURL) .option("tempdir", s3TempDir) .option("query", abaloneQuery) .option("temporary_aws_access_key_id", awsAccessKey) .option("temporary_aws_secret_access_key", awsSecretKey) .option("temporary_aws_session_token", token).load() // create pipeline val stringIndexer = new StringIndexer() .setInputCol("sex") .setOutputCol("sex_index") val vectorAsembler = new VectorAssembler() .setInputCols(Array("sex_index", "length", "diameter", "height", "whole_weight", "shucked_weight", "viscera_weight", "shell_weight")) .setOutputCol("features") val linearRegression = new LinearRegression() .setLabelCol("rings") .setMaxIter(1) .setRegParam(0.3) .setElasticNetParam(0.5) val paramGrid = new ParamGridBuilder() .addGrid(linearRegression.regParam, Array(0.1, 0.3)) .addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9)) .build() val crossvalidator = new CrossValidator() .setEstimator(linearRegression) .setEvaluator(new RegressionEvaluator().setLabelCol("rings")) .setEstimatorParamMaps(paramGrid) .setNumFolds(2) val pipeline = new Pipeline() .setStages(Array(stringIndexer, vectorAsembler, crossvalidator)) // fit model val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234) val model = pipeline.fit(training) val predictions = model.transform(test) // visualize the result predictions.registerTempTable("predictions") predictions.stat.corr("rings", "prediction") ||< * まとめ