読者です 読者をやめる 読者になる 読者になる

About connecting the dots.

statistics/machine learning adversaria.

Spark on EMRでZeppelinを使ってML Pipelineを試してみる

Sparkの最新状況をアップデートする意味も含めて,EMRで一通りの挙動を試してみたので,備忘録的にまとめておきます.慣れると簡単で便利なんですけど,それまでは結構ハマりどころが多いんですよねぇ,このあたり.

Zeppelinにアクセスするまで

AWS Big Dataブログにまとまっている通り,EMRに便利コンポーネントをいろいろ含めて起動するのは簡単です.AWSが用意しているスクリプトをbootstrapに指定して,必要なコンポーネントをオプションで引き渡してあげれば,RとかPythonとかの必須パッケージを含んだ形でEMRを起動できます*1

ちなみに,EMRの起動自体は cli でも実施できます.軽くサンプルを作ってみましたがブートストラップアクションとかインストールコンポーネントとかは適当に変えられますので変えてみてください*2

起動が終わったら,EMRクラスタGUIにアクセスしたいので,SSHトンネルを掘ります.この辺りがEMRの不便なところではあるんですよね... 手順についてはクラスメソッドさんの記事によくまとまっているかと思います.コマンド的には以下のような感じで.

ssh -i ~/.ssh/XXXXXXXX.pem -N -D 8157 hadoop@ec2-XXXXXXXX.compute-1.amazonaws.com

dev.classmethod.jp

ここまで終わったら,ようやっとZeppelinにアクセスできるようになります.こんな感じ.

f:id:SAM:20170122114710p:plain

Redshiftにデータセットをロード

今回使ったデータセットこちらUCIのMachine Learning DatasetからAbaloneを使わせてもらいました.これは4177行x8列のデータセットで,アワビの大きさや重さから,年齢を当てるという,回帰分析用のデータセットになります.CSVから読み込んであげてもいいんですけれども,せっかくなので今回はRedshiftからデータを読み込んでみます.こちらもAWS Big Dataブログにやり方がまとまっています.

まずはS3にアップロードしますけど,これはcliでちょちょっとやればいいだけです.XXXXXXXXになってるところはバケットネームなので,自身のものを入れてください.

aws s3 cp ~/Downloads/abalone.data.txt s3://XXXXXXXX/abalone.csv

続いて,Redshift側ではDDLでテーブル作ってあげて,COPYコマンドでロードします.コマンドは以下の通り*3

create table abalone (sex varchar(2), length real, diameter real, height real, whole_weight real, shucked_weight real, viscera_weight real, shell_weight real, rings integer);
copy abalone from 's3://XXXXXXXX/abalone.csv' credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXX';

ZeppelinからRedshiftにアクセス

ZeppelinからRedshiftにアクセスするためには,いくつかjarをパスに追加してあげる必要があります.このあたりの手順は公式ドキュメントにまとまっているので,参考にして順に以下のjarを追加していきます.

  • /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
  • minimal-json-0.9.4.jar
  • spark-redshift_2.10-2.0.0.jar
  • spark-avro_2.11-3.0.0.jar

一番上のJDBCは最初からクラスタにあるので,そのままパスを追加してあげればOKです.2番目のjsonは,Maven Centralのレポジトリを追加してあげればOK.ですが3,4番目のjarはMaven Centralにあるはずなのですがなぜか追加してもジョブ実行時にエラーが出てしまいます.仕方がないので,直接jarをダウンロードして適当な場所に配置してあげます.最終的に以下のようにDependenciesが設定できました.

f:id:SAM:20170122120334p:plain

ML Pipelineの実行

ML Pipelineはパイプラインのようにパラメタ変換処理をつなげて,機械学習モデルの処理を記述するものです.ML Pipelineには大きく分けてTransformerとEstimatorの2つのコンポーネントがあります.それぞれの引数と戻り値の型は以下の通り.

  • val df2 = new Transformer().transform(df1)
  • val model = new Estimator().fit(df2)

Transformerはパラメタの変換処理を行います.扱える処理は結構多く,tokenaizer, TF-IDFなんてのも実施できたりします.Estimatorはいわゆる機械学習モデルです.Estimatorは,fit() メソッドを使って訓練データを学習することで,予測が可能な機械学習modelとなります.そして,このmodelを使って,model.transform(testDf) とすることで,予測結果を得ることができます.もちろんCVなども完備されているため,これら一連の流れをまとめて綺麗に記述することができます.

MLモデルの例

では,以下に実例を書いていきます.まずはPipelineを使わない簡単な例から.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)
  .setElasticNetParam(0.1)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
test.printSchema()

実行すると,以下のように結果が得られます.

predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]
([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...|       1.0|
+-----+--------------+--------------------+--------------------+----------+

Pipelineを使ったCV

次はPipelineを使って変数の変換と,CVを含めたモデルを記述します.で,最終的な結果としては,重相関係数0.62という形で結果が得られました.元の値と予測値のプロットをとると以下の通りです.

f:id:SAM:20170123133755p:plain

実行コードはこちら.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql._

import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;

// Provide the jdbc url for Amazon Redshift
val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX"

// Create and declare an S3 bucket where the temporary files are written
val s3TempDir = "s3://XXXXXXXX/"
val abaloneQuery = """
                    select * from abalone
                   """
// Create a Dataframe to hold the results of the above query
val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift")
                                          .option("url", jdbcURL)
                                          .option("tempdir", s3TempDir)
                                          .option("query", abaloneQuery)
                                          .option("temporary_aws_access_key_id", awsAccessKey)
                                          .option("temporary_aws_secret_access_key", awsSecretKey)
                                          .option("temporary_aws_session_token", token).load()  

// create pipeline
val stringIndexer = new StringIndexer()
    .setInputCol("sex")
    .setOutputCol("sex_index")
val vectorAsembler = new VectorAssembler()
    .setInputCols(Array("sex_index", "length", "diameter", "height",
        "whole_weight", "shucked_weight", "viscera_weight", "shell_weight"))
    .setOutputCol("features")
val linearRegression = new LinearRegression()
    .setLabelCol("rings")
    .setMaxIter(1)
    .setRegParam(0.3)
    .setElasticNetParam(0.5)
val paramGrid = new ParamGridBuilder()
    .addGrid(linearRegression.regParam, Array(0.1, 0.3))
    .addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9))
    .build()
val crossvalidator = new CrossValidator()
  .setEstimator(linearRegression)
  .setEvaluator(new RegressionEvaluator().setLabelCol("rings"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)
val pipeline = new Pipeline()
    .setStages(Array(stringIndexer, vectorAsembler, crossvalidator))
    
// fit model
val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234)
val model = pipeline.fit(training)
val predictions = model.transform(test)

// visualize the result
predictions.registerTempTable("predictions")
predictions.stat.corr("rings", "prediction")
||< 

* まとめ

*1:今回の実施環境は,EMR Release 5.2.1 で Spark version は 2.0.2でした.

*2:今回のお試しプロセスは,マネジメントコンソールから作っちゃったので,gistサンプルとは構成が異なります.

*3:クレデンシャルとかS3バケットとかは,全部XXXXXXXXでマスクしてあるので,自分の環境に合わせて適宜入れ替えてください