A Transformer transforms a DataFrame into another DataFrame. Quite a lot of processing is available in this form, including things like tokenization and TF-IDF. An Estimator is what you would normally call a machine learning model: training it on data with the fit() method yields a fitted model that can make predictions, and calling model.transform(testDf) on that model gives you the predictions. Cross-validation and the like are also provided, so the whole flow can be written together quite cleanly.
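To make the Transformer/Estimator distinction concrete, here is a minimal sketch (not from the original post; the toy data and column names are made up) using a tokenization + TF-IDF flow: Tokenizer and HashingTF only need transform(), while IDF is an Estimator that must be fit() first.
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}

val docs = spark.createDataFrame(Seq(
  (0, "spark ml has transformers and estimators"),
  (1, "tf idf is easy with spark ml")
)).toDF("id", "text")

// Transformer: transform() just maps the DataFrame to a new one (text -> words)
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val words = tokenizer.transform(docs)

// Another Transformer: words -> term-frequency vectors
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures")
val tf = hashingTF.transform(words)

// Estimator: fit() learns document frequencies and returns an IDFModel (a Transformer)
val idfModel = new IDF().setInputCol("rawFeatures").setOutputCol("features").fit(tf)
idfModel.transform(tf).select("id", "features").show(false)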
Example of an ML model
Let's look at some concrete examples. First, a simple one that does not use a Pipeline.
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
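// Training data as (label, features) rows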
val training = spark.createDataFrame(Seq(
(1.0, Vectors.dense(0.0, 1.1, 0.1)),
(0.0, Vectors.dense(2.0, 1.0, -1.0)),
(0.0, Vectors.dense(2.0, 1.3, 1.0)),
(1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")
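// LogisticRegression is an Estimator; configure its hyperparameters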
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.01)
  .setElasticNetParam(0.1)
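// fit() trains the Estimator and returns a LogisticRegressionModel (a Transformer)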
val model1 = lr.fit(training)
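// Test data in the same (label, features) format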
val test = spark.createDataFrame(Seq(
(1.0, Vectors.dense(-1.0, 1.5, 1.3)),
(0.0, Vectors.dense(3.0, 2.0, -0.1)),
(1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
// Apply the fitted model to the test data; transform() appends the prediction columns
val predicted = model1.transform(test)
predicted.select("features", "label", "probability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
predicted.printSchema()
predicted.show()
Running this produces results like the following.
predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]
([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0
root
|-- label: double (nullable = false)
|-- features: vector (nullable = true)
|-- rawPrediction: vector (nullable = true)
|-- probability: vector (nullable = true)
|-- prediction: double (nullable = true)
+-----+--------------+--------------------+--------------------+----------+
|label| features| rawPrediction| probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
| 1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...| 1.0|
| 0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...| 0.0|
| 1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...| 1.0|
+-----+--------------+--------------------+--------------------+----------+
Cross-validation with a Pipeline
Next, we write a model that uses a Pipeline to combine feature transformation with cross-validation. The final result was a multiple correlation coefficient of 0.62 between the actual and predicted values. Plotting the actual values against the predictions gives the figure below.
[Figure: actual vs. predicted values]
Here is the code.
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql._
import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext
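// Obtain temporary AWS credentials from the EC2 instance profile
// (passed to spark-redshift for access to the S3 temp dir)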
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;
val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX"
val s3TempDir = "s3://XXXXXXXX/"
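// Query the abalone table and load it from Redshift via the spark-redshift data source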
val abaloneQuery = """
select * from abalone
"""
val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift")
.option("url", jdbcURL)
.option("tempdir", s3TempDir)
.option("query", abaloneQuery)
.option("temporary_aws_access_key_id", awsAccessKey)
.option("temporary_aws_secret_access_key", awsSecretKey)
.option("temporary_aws_session_token", token).load()
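// StringIndexer encodes the categorical "sex" column as a numeric index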
val stringIndexer = new StringIndexer()
.setInputCol("sex")
.setOutputCol("sex_index")
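// VectorAssembler combines the feature columns into a single "features" vector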
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("sex_index", "length", "diameter", "height",
"whole_weight", "shucked_weight", "viscera_weight", "shell_weight"))
.setOutputCol("features")
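// Elastic-net linear regression; "rings" is the regression target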
val linearRegression = new LinearRegression()
.setLabelCol("rings")
.setMaxIter(1)
.setRegParam(0.3)
.setElasticNetParam(0.5)
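// Hyperparameter grid to be searched by cross-validation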
val paramGrid = new ParamGridBuilder()
.addGrid(linearRegression.regParam, Array(0.1, 0.3))
.addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9))
.build()
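// 2-fold cross-validation over the grid; RegressionEvaluator uses RMSE by default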
val crossvalidator = new CrossValidator()
.setEstimator(linearRegression)
.setEvaluator(new RegressionEvaluator().setLabelCol("rings"))
.setEstimatorParamMaps(paramGrid)
.setNumFolds(2)
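// Pipeline stages: string indexing -> vector assembly -> cross-validated regression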
val pipeline = new Pipeline()
.setStages(Array(stringIndexer, vectorAssembler, crossvalidator))
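// 90/10 train/test split with a fixed seed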
val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234)
val model = pipeline.fit(training)
val predictions = model.transform(test)
predictions.registerTempTable("predictions")
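// Pearson correlation between the actual and predicted ring counts (~0.62 here)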
predictions.stat.corr("rings", "prediction")
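If you want to see which hyperparameters the cross-validation actually picked, or pull out the data behind the plot above, something like the following should work. This is a sketch I added (not in the original post); it assumes, as in the code above, that the CrossValidatorModel is the last stage of the fitted PipelineModel.
import org.apache.spark.ml.regression.LinearRegressionModel
import org.apache.spark.ml.tuning.CrossValidatorModel

// The fitted CrossValidator is the last stage of the PipelineModel
val cvModel = model.stages.last.asInstanceOf[CrossValidatorModel]
val bestLr = cvModel.bestModel.asInstanceOf[LinearRegressionModel]
println(s"best regParam=${bestLr.getRegParam}, best elasticNetParam=${bestLr.getElasticNetParam}")

// Average metric (RMSE) for each parameter combination in the grid
cvModel.avgMetrics.zip(cvModel.getEstimatorParamMaps).foreach(println)

// Actual vs. predicted values, e.g. for the scatter plot above
predictions.select("rings", "prediction").show(20)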
* Summary