読者です 読者をやめる 読者になる 読者になる

About connecting the dots.

statistics/machine learning adversaria.

RStudio + sparklyr on EMRでスケーラブル機械学習

前回に引き続き分析環境ネタ第2弾*1.今回はEMRでRStudioを立ててみます.

RStudioの構築

やり方自体は,AWS Big Data Blogにまとまっているので,別に難しくはなかったり.RStudioとか関連コンポーネントは,例のごとくS3に便利スクリプトがあるので,これをブートストラップで読み込んであげればOK.

s3://aws-bigdata-blog/artifacts/aws-blog-emr-rstudio-sparklyr/rstudio_sparklyr_emr5.sh --sparklyr --rstudio --sparkr --rexamples --plyrmr

あとはSSHトンネルを掘って,RStudioのWebページにつないであげればOK.8787ポートでつながります.

sparklyrでSparkと連携

sparklyrは,RStudioの人たちが作っている,RからSparkを扱うためのパッケージになります.ちょうどタイムリーにyamano357さんが,sparklyrの記事をあげていたりしますね*2

qiita.com

基本的にリファレンスページが非常にまとまっているので,それを読むだけで一通りの操作ができますが,とりあえず自分でも試してみましょう.sparklyrは read_from_csv() を使ってS3から直接ファイルを読み込むことができます.便利ですね.試してはいないのですが,以下のパッケージを併せて使えば,RでRedshiftからもSparkにデータを読み出せるっぽいです.

github.com

あとは,リファレンスに従って,定番処理を試してみましょう.読み込むデータは前回と一緒のabaloneです.

library(sparklyr)
library(dplyr)

# create Spark Context
sc <- spark_connect(master = "yarn-client", version = "2.0.2")

# create dataset in Spark
abalone <- spark_read_csv(sc, "abalone", path="s3://XXXXXXXX/abalone.csv")
names(abalone) = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height", "rings")
head(abalone)
tbl_cache(sc, "abalone")

# execute query
abalone %>%
  dplyr::group_by(sex) %>%
  dplyr::count()

# separate dataset
partitions <- abalone %>%
  sdf_partition(training = 0.9, test = 0.1, seed = 123)

# execute regression model
fit <- partitions$training %>%
  ml_linear_regression(
    response = "rings",
    features = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height")
  )
summary(fit)
original_value <- partitions$test %>%
    dplyr::select(rings)
predicted_value <- predict(fit, partitions$test)

ということで,慣れると非常に簡単に処理できますね.すばらしい.Sparklyrはチートシートもあるので,慣れるのも割とすぐな感じです.これをみる感じ,ビジュアライズはさすがにRのメモリ側にデータを持ってこないとダメみたいですね.あとは,ML Pipelineの各Transformerに対応するメソッドが揃っていたり,かなり力入れて開発されてるなぁ感がすごいですね.

所感

と,ささっと触って思ったのは,同じことやるならZeppelinで直接DataFrameいじったほうが早い気がするなぁということでした.結局sparklyrでできるのは,RのインタフェースでML Pipelineを扱えるというだけのことで,ロジック自体は実質的にML Pipelineを知らないと書けないし,結果の可視化もZeppelinで十分に対応可能です.

その上で,じゃあどういう使い方をすると良いかなぁと考えると,やっぱりRを使うメリットは高速なプロトタイピングってところに尽きるのかなぁと思ったり.つまり,大規模なデータを前処理して,Rの機械学習モデルに落とし込んでいろいろ試行錯誤する,という使い方かなぁと.なので,有賀さんが前にibisの話ししてたのと同じイメージで使うと,色々捗りそうな気がしています.

www.slideshare.net

2017/1/31追記

sparklyr0.5に合わせて,公式ページに Deploying sparklyr という形で基本的なセットアップと使い方がまとめられてます.そのサンプル読んでると,実は ml_linear_regression() とか x ~ y のR式の記述ができたということに気がつきました.便利だ...

*1:第3弾があるかは謎

*2:Athenaが東京リージョンにくるの待たなくても,オレゴンで立ち上げて,東京リージョンのS3バケットにクエリしちゃえばいいのに,とか思ったりしますが

Spark on EMRでZeppelinを使ってML Pipelineを試してみる

Sparkの最新状況をアップデートする意味も含めて,EMRで一通りの挙動を試してみたので,備忘録的にまとめておきます.慣れると簡単で便利なんですけど,それまでは結構ハマりどころが多いんですよねぇ,このあたり.

Zeppelinにアクセスするまで

AWS Big Dataブログにまとまっている通り,EMRに便利コンポーネントをいろいろ含めて起動するのは簡単です.AWSが用意しているスクリプトをbootstrapに指定して,必要なコンポーネントをオプションで引き渡してあげれば,RとかPythonとかの必須パッケージを含んだ形でEMRを起動できます*1

ちなみに,EMRの起動自体は cli でも実施できます.軽くサンプルを作ってみましたがブートストラップアクションとかインストールコンポーネントとかは適当に変えられますので変えてみてください*2

起動が終わったら,EMRクラスタGUIにアクセスしたいので,SSHトンネルを掘ります.この辺りがEMRの不便なところではあるんですよね... 手順についてはクラスメソッドさんの記事によくまとまっているかと思います.コマンド的には以下のような感じで.

ssh -i ~/.ssh/XXXXXXXX.pem -N -D 8157 hadoop@ec2-XXXXXXXX.compute-1.amazonaws.com

dev.classmethod.jp

ここまで終わったら,ようやっとZeppelinにアクセスできるようになります.こんな感じ.

f:id:SAM:20170122114710p:plain

Redshiftにデータセットをロード

今回使ったデータセットこちらUCIのMachine Learning DatasetからAbaloneを使わせてもらいました.これは4177行x8列のデータセットで,アワビの大きさや重さから,年齢を当てるという,回帰分析用のデータセットになります.CSVから読み込んであげてもいいんですけれども,せっかくなので今回はRedshiftからデータを読み込んでみます.こちらもAWS Big Dataブログにやり方がまとまっています.

まずはS3にアップロードしますけど,これはcliでちょちょっとやればいいだけです.XXXXXXXXになってるところはバケットネームなので,自身のものを入れてください.

aws s3 cp ~/Downloads/abalone.data.txt s3://XXXXXXXX/abalone.csv

続いて,Redshift側ではDDLでテーブル作ってあげて,COPYコマンドでロードします.コマンドは以下の通り*3

create table abalone (sex varchar(2), length real, diameter real, height real, whole_weight real, shucked_weight real, viscera_weight real, shell_weight real, rings integer);
copy abalone from 's3://XXXXXXXX/abalone.csv' credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXX';

ZeppelinからRedshiftにアクセス

ZeppelinからRedshiftにアクセスするためには,いくつかjarをパスに追加してあげる必要があります.このあたりの手順は公式ドキュメントにまとまっているので,参考にして順に以下のjarを追加していきます.

  • /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
  • minimal-json-0.9.4.jar
  • spark-redshift_2.10-2.0.0.jar
  • spark-avro_2.11-3.0.0.jar

一番上のJDBCは最初からクラスタにあるので,そのままパスを追加してあげればOKです.2番目のjsonは,Maven Centralのレポジトリを追加してあげればOK.ですが3,4番目のjarはMaven Centralにあるはずなのですがなぜか追加してもジョブ実行時にエラーが出てしまいます.仕方がないので,直接jarをダウンロードして適当な場所に配置してあげます.最終的に以下のようにDependenciesが設定できました.

f:id:SAM:20170122120334p:plain

ML Pipelineの実行

ML Pipelineはパイプラインのようにパラメタ変換処理をつなげて,機械学習モデルの処理を記述するものです.ML Pipelineには大きく分けてTransformerとEstimatorの2つのコンポーネントがあります.それぞれの引数と戻り値の型は以下の通り.

  • val df2 = new Transformer().transform(df1)
  • val model = new Estimator().fit(df2)

Transformerはパラメタの変換処理を行います.扱える処理は結構多く,tokenaizer, TF-IDFなんてのも実施できたりします.Estimatorはいわゆる機械学習モデルです.Estimatorは,fit() メソッドを使って訓練データを学習することで,予測が可能な機械学習modelとなります.そして,このmodelを使って,model.transform(testDf) とすることで,予測結果を得ることができます.もちろんCVなども完備されているため,これら一連の流れをまとめて綺麗に記述することができます.

MLモデルの例

では,以下に実例を書いていきます.まずはPipelineを使わない簡単な例から.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)
  .setElasticNetParam(0.1)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
test.printSchema()

実行すると,以下のように結果が得られます.

predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]
([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...|       1.0|
+-----+--------------+--------------------+--------------------+----------+

Pipelineを使ったCV

次はPipelineを使って変数の変換と,CVを含めたモデルを記述します.で,最終的な結果としては,重相関係数0.62という形で結果が得られました.元の値と予測値のプロットをとると以下の通りです.

f:id:SAM:20170123133755p:plain

実行コードはこちら.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql._

import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;

// Provide the jdbc url for Amazon Redshift
val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX"

// Create and declare an S3 bucket where the temporary files are written
val s3TempDir = "s3://XXXXXXXX/"
val abaloneQuery = """
                    select * from abalone
                   """
// Create a Dataframe to hold the results of the above query
val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift")
                                          .option("url", jdbcURL)
                                          .option("tempdir", s3TempDir)
                                          .option("query", abaloneQuery)
                                          .option("temporary_aws_access_key_id", awsAccessKey)
                                          .option("temporary_aws_secret_access_key", awsSecretKey)
                                          .option("temporary_aws_session_token", token).load()  

// create pipeline
val stringIndexer = new StringIndexer()
    .setInputCol("sex")
    .setOutputCol("sex_index")
val vectorAsembler = new VectorAssembler()
    .setInputCols(Array("sex_index", "length", "diameter", "height",
        "whole_weight", "shucked_weight", "viscera_weight", "shell_weight"))
    .setOutputCol("features")
val linearRegression = new LinearRegression()
    .setLabelCol("rings")
    .setMaxIter(1)
    .setRegParam(0.3)
    .setElasticNetParam(0.5)
val paramGrid = new ParamGridBuilder()
    .addGrid(linearRegression.regParam, Array(0.1, 0.3))
    .addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9))
    .build()
val crossvalidator = new CrossValidator()
  .setEstimator(linearRegression)
  .setEvaluator(new RegressionEvaluator().setLabelCol("rings"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)
val pipeline = new Pipeline()
    .setStages(Array(stringIndexer, vectorAsembler, crossvalidator))
    
// fit model
val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234)
val model = pipeline.fit(training)
val predictions = model.transform(test)

// visualize the result
predictions.registerTempTable("predictions")
predictions.stat.corr("rings", "prediction")
||< 

* まとめ

*1:今回の実施環境は,EMR Release 5.2.1 で Spark version は 2.0.2でした.

*2:今回のお試しプロセスは,マネジメントコンソールから作っちゃったので,gistサンプルとは構成が異なります.

*3:クレデンシャルとかS3バケットとかは,全部XXXXXXXXでマスクしてあるので,自分の環境に合わせて適宜入れ替えてください

Spark2.0でジョブのアウトプットを高速にS3に書き出す

ここのところEMRでSparkを触ってます.まぁやってるのは,主にデータのparquet+snappyへの変換処理なんですけどね.EMRといえばHDFSではなく,EMRFS経由でS3に書き出すのがモダンなやり方,ということでそれを試してます.で,いろいろ試してて以下の2点の問題に気づきました.

  • S3に書き出す処理が遅い
  • 謎の _$folder$ というファイルができてしまう

今回はこれについて調べたことと,現状の対策法についてまとめておきます.検証環境はEMR Release 5.2.0で,Sparkバージョンは2.0.2になります.クラスタはマスターがm3.xlargeでスレーブがr3.2xlarge x 5台でした.

S3に書き出す処理が遅い

ジョブの挙動をみてると,Sparkジョブが終わっているはずなのに,結果ファイルがすべてでてきていない,という現象が起きていました.調べてみると,どうもデフォルト設定では,一旦出力をテンポラリファイルに書き出してから,最終的な出力先ディレクトリに再配置する,という挙動を取っているようです.これを回避するために,Spark2.0以前のバージョンでは,"spark.sql.parquet.output.committer.class" に "org.apache.spark.sql.parquet.DirectParquetOutputCommitter" を使用することで,書き出しを高速化することができました.このあたりについては,以下のエントリに詳しくまとまっています.

dev.sortable.com

ここでSpark2.0以前と書いたのには理由があって,2.0でこの DirectParquetOutputCommitter は削除されてしまったのです.理由は
SPARK-10063 に書かれていますが,要するにDirectParquetOutputCommitter は結果の整合性チェックをバイパスすることで高速な書き出しを行なっており,結果としてアウトプットが一部ロストする可能性がある,ということのようです.じゃあ対策はないのかというとそんなことはなくて,以下のドンピシャstackoverflowエントリをみつけました.

stackoverflow.com

この OutputCommitter のアルゴリズムってなんぞやというのは,以下の鯵坂さんの説明を読んでください.EMR5.2.0はHadoop2.7系なので,バージョンの2が選択可能というわけです.この記事だとリトライの際のロスが少なくなる,ということみたいですけど,たぶん同時に入ったMAPREDUCE-4815 のほうで,高速化が達成されていることだと思います.

qiita.com

ということで,結論としてはS3に高速に書き出すなら DirectParquetOutputCommitter を2にしましょう.サンプルコードは以下のようになります.Hiveテーブルからデータを読み込んで,parquet+snappyに変換してS3に保存するというやつです.ポイントは14行目,ここでバージョンの指定をしています.手元のデータで試してみたところ,gzip圧縮で10GB程度のファイルを変換するのに,何もしないと15分くらいかかってたのが,この指定で2分くらいになりました.実に7倍近い速度向上です*1

gist.github.com

謎の _$folder$ というファイルができてしまう

実はこっちがもともとのモチベーションだったんですが,parquet に変換して結果をS3に吐き出すと,必ずフォルダの横に,フォルダ名_$folder$ というファイルができてしまうのです.AWSのサイトにも害はないって書いてあるんですけど,普通に邪魔だし嫌じゃないですか.不要なのであれば消したいもんです.実は,これも以下の通り2.0以前であれば DirectParquetOutputCommitter を使えば普通に出力しないようにできたみたいです.

stackoverflow.com

ということで,当然2.0系ではこの方法は使えない,ということになります.そして調べた限り,これの出力を抑制する方法はみつかっていません.誰かご存知であれば,教えてください.

*1:もちろん,手元で適当にやった結果なので,ちゃんとしたベンチマークではない点に留意してください.

書評: StanとRでベイズ統計モデリング

今回は書評エントリです.日本のStan界隈の顔である @berobero11 さんが統計モデリング本を出版し,ありがたいことに献本いただきました*1.ようやっと一通り読み終えた*2ので,感想がてら本の魅力について述べていきたいと思います*3

StanとRでベイズ統計モデリング (Wonderful R)

StanとRでベイズ統計モデリング (Wonderful R)

この本を読んで得られるもの

「統計モデリング」とはどのようなものか,ということについての実践的な知識および心構え

本書には,冒頭で,統計モデリングについて以下のように書かれています.

モデルというのは不必要な性質を大胆に無視して,必要なエッセンスだけを取り上げたものだ....(中略)... 確率モデルをデータにあてはめて現象の理解と予測をうながす営みを統計モデリング (statistical modeling) と呼ぶ....(中略)... すべての確率モデルはあくまでも仮定や仮説であることに注意したい.

まさに,複雑な現実世界を,得られたデータを確率モデルにあてはめることで,モデル化しようという試みこそが統計モデリングなわけです.この本は,実装としてはStanにほぼベッタリになっていますが,本を貫く骨子は,複雑な現実世界をどう記述するか,という姿勢にあると思います.私自身は,統計モデリングにおいて最も大事なのは「自分はこういうモデルで現実世界を表現しよう,切り取ろうと思っているんだよ」,という姿勢なのかな,と個人的に思っていたりします.やりたいことはあくまで複雑な現実世界の理解であって,それを実現するための手段として,統計理論やソフトウェアがあるという位置付けなのかなと.

このあたりは,久保先生の通称みどりほんや,TJOさんのエントリなんかでも述べられており,hoxo_mさんも連載エントリを書かれていますね*4

tjo.hatenablog.com
d.hatena.ne.jp

「統計モデリング」を実現するための手段である「Stan」についての実践的な知識およびリファレンスガイド

Stan自体には,かなりしっかりしたマニュアルがあり,かつその日本語翻訳も行われていたりします.ただ,これらを読むにしても,あまりに膨大なので心が挫ける人も多いのではないでしょうか.これに対して,この本では使いこなすために必要な最低限の知識,および応用的な事項の調べ方(=マニュアルの使い方),について,統計モデリングの実践例をベースにして述べられており,入門書として最適ではないかと思います.

個人的にためになった点

何点かあるのですが,まずは統計モデリングについて,あまり類書ではないような試行錯誤プロセスが載っているのが良いです.まず最初は簡単なモデルで試して,その上で少しずつ仮定を積み上げて精緻なモデルにしていく,といったプロセスが書かれているのは,自分が実際に分析をするときにもこうやれないいのか,というのがイメージしやすいです.

また Chapter 7 の見出しが以下のようになっているのですが,こうした良い統計モデルを作るための実践的な工夫,対処法がしっかり述べられているのって,これも他の本であまりみない(かつとても有用性が高い)です. それも単純に情報量基準の高いモデルを選びましょう,というのではなくて解釈可能性やデータの背景知識と合致する形でモデルを選択しましょう,というスタンスは全くその通りだと思います*5

Chapter 7 回帰分析の悩みどころ
7.1 交互作用
7.2 対数をとるか否か
7.3 非線形の関係
7.4 多重共線形
7.5 交絡
7.6 説明変数が多すぎる
7.7 説明変数にノイズを含む
7.8 打ち切り
7.9 外れ値

同様に,Chapter 10の収束しないときの対処法,も実用上ためになる点です.私自身も,このチャプターに載っているハマりどころに,まさに引っかかったことがあります.そのときの経緯は以下のエントリに詳しく述べてあります.

smrmkt.hatenablog.jp

あとは,Stanで扱えない離散パラメタの対処法について,とても詳しく述べられているのは嬉しいです.そして気がついたら increment_log_prob() はdeprecatedになっていたんですね.それにChapter 12 で述べられている,時間構造と空間構造の等価性については,岩波データサイエンスの vol. 1 の伊庭先生の執筆文を読んでも,完全には腑に落ちきってなかったんですが,事後確率の数式をみたことで疑問が氷解しました*6

そのほか

この本,基本的な想定読者は実務or大学で統計モデリングを日常的に行っている人,もしくはその予備軍の人という感じだと思います.理想的にあると良い知識は,大学教養レベルの線形代数微積分,初等統計学あたりでしょうか.

ただ個人的には,そのような知識がなくてもある程度は読み進められるし,回帰分析が統計パッケージで行える,くらいのレベルであればChapter 1. 3. 5. 7. あたりから得られるものは多いと思います.特に,たとえば心理学や計量社会学を専攻しているような,上記数学知識が十分ではない学生にとってこそ,得るものが大きいように思います.細かい理論がわからないと,統計パッケージをブラックボックスにして,なんでも正規分布を仮定して回帰分析しちゃえばいいや,よくわからないけどパッケージがなんとかしてくれるよ,みたいになりがちです.ですが,そういった姿勢は当然,非常によろしくないわけです.少なくともイメージとして何をしているかは理解して,「自分はこういう仮説を立てて,それを検証するためにこのような統計モデルで現実世界のエッセンスを抜き出しているんだ」というスタンスを持って欲しいなぁと思います.

*1:この場を借りて厚く御礼申し上げます.

*2:Stanでベーシックなモデリング経験のある,統計中級者(Ph. D. クラスのガチ勢ではないというくらいのニュアンスです)の自分で,通読に要した時間が10時間くらいでした.

*3:献本はしてもらいましたが,出版社や著者の回し者ではなく,過剰なPRをしているわけでもありません.あくまで自身の感想を述べております.

*4:そして,ここを掘り下げていくといろいろ宗教論争になりそうなので,このくらいにしておきます.

*5:個人的には,ここが機械学習と統計モデリングの典型的なスタンスの違いなのかなぁと思ったりもします.

*6:時系列の1階差分は,式的には過去から未来にのみ影響がある有向モデルにみえるけど,実際にモデルを推定することを考えると,現在がわかっていれば必然的に過去はその値に影響されるという,双方向の影響が起こるものなんですね.

Fitbitから取得した心拍データで時系列の異常検知を試してみる

井出先生の「異常検知と変化検知」を読んで,自分でも試してみたいと思ったんですが,あいにくちょうどいい時系列データが手元にないなーと思ってました.そんな折,データサイエンスLT祭りの発表の中に,Fitbitデータを可視化するものがあって*1,これはちょうどいいということで試してみましたよというていのエントリになります.

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

Fitbitってなによ

Fitbitが何かしらない人のために一応説明しておくと,最近はやりの活動量計です.私が持っているのは,心拍が取得できるタイプのやつです.風呂に入るとき以外は一日中つけっぱなしで,睡眠とか運動とかを自動で判定してくれるので,手間がかからず便利です.自転車に乗っている時間もちゃんと判定してくれるのは,判定アルゴリズムよくできているなーという感じですね*2.ちなみに運動や睡眠の判定は,データをスマホに転送するタイミングで一気に行われるので,このときに時系列の状態推定アルゴリズムを走らせてるんだろうなぁと思います.その一方で歩数に関しては,手を振ったりしてもほとんどカウントされず,ちゃんと歩いている場合にのみ歩数が増えるところをみると,バンドパスフィルタでも使って地面に足が着地する際のパルス応答的な変化を取ってるんじゃないかなーと推測します*3

Fitbitのデータを取得する

もちろん,Fitbitの各種センサから得られる生データを取得することはできません.ですがFitbitは開発者向けのAPIを提供しており,ここからデータを取得できるようです.また,もっとベタにWebサイトをスクレイピングしちゃうやり方もあるみたいですね.この辺りをサポートしてくれるパッケージをざっと調べたところ,Rなら fitbitScraperPythonなら python-fitbit あたりでしょうか.あとはRなら {httr} パッケージとか使って,OAuthからのAPI直叩きという方法もありそうです.

今回は,普通に {fitbitScraper} を使って,心拍データを取得することにしました.得られるデータは5分間隔のため,1時間で12点,1日で288点になります.以下のようなコードで,適当な日付の心拍データ変動を取得してグラフ化してみました.(fitbitScraper) だけでなく,あとで2軸グラフを記述するために {devtools} 経由で {plotflow} パッケージも入れておきます.(fitbitScraper) は www.fitbit.com へのログインメアドとパスワードが必要なので,持っていない人は取得しておいてください.

install.packages("fitbitScraper")
devtools::install_github("trinker/plotflow")
library(fitbitScraper)
library(plotflow)
library(ggplot2)
library(dplyr)

# Fitbitからのデータ取得
cookie = login(email="www.fitbit.com ログイン用メールアドレス", password="www.fitbit.com ログイン用パスワード")
hr_data <- get_intraday_data(cookie, what="heart-rate", date="2016-10-06")
names(hr_data) <- c("time", "heart_rate") # 変数名にハイフンが入っているのでリネーム
hr_data <- hr_data %>% filter(heart_rate > 0) # Fitbitを取り外しているときは,値が0になるため取り除く

# プロット
hr_plot <- ggplot(data=hr_data, aes(x=time, y=heart_rate)) +
  geom_line(size=0.8) +
  ylim(min=0, max=120) +
  labs(x="Time", y="Heart rate") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
plot(hr_plot)

明け方の寝ている時間は心拍数が50弱/分で推移しており,起きてから犬の散歩にいったので100くらいまで上がっています.そのあとは通常の動きで,夕方前くらいにサイクリングにいったため,やはり100をコンスタントに超えています.そして24時に近づくにつれ心拍が落ち着いていくのが見て取れます.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201225_original.png

こんな感じで,Fitbitのデータは割と簡単に取得できます.

特異スペクトル変換法による時系列データの異常検知

今回使うデータは,時系列の心拍データになります.ここで目的としたいのは,それまでの時間と比べて明らかにおかしな動きがみられたとき,つまり変化点を検知したいということです.井出先生の本にもいくつかの手法が解説されていますが,今回はその中で特異スペクトル変換法を取り上げたいと思います.

この手法は,ざっくりいうとある時点tを起点として,その前のw時点分のデータ(これを部分時系列データと呼びます)を1時点ずつ前にずらしていって,それをk個たばねたもの(仮にX_1と呼びます)と,そこからL時点分後ろにずらして同じくk個たばねたもの(X_2と呼びます)を比較します.両者が似ていれば,t時点で変化は起こっておらず,逆に両者が大きく異なっていたら,t時点付近で時系列データに変化が起こったと考えましょう,ということになります.

このX_1は,1行につきw時点ぶんのデータが並び,それをk行分たばねたk x w行列になります.X_2も同様です.知りたいのはX_1X_2がどれだけ似ているか,ですが,その前段として,X_1X_2 のそれぞれについて,特異値分解で要約し,左特異値ベクトルをm次元分だけ取り出します*4.これにより,時系列の変化を考慮した上で,X_1X_2のそれぞれを要約することができました.

そして得られた2つの特異値について,以下の計算をすることで,最終的な異常値を得ることができます.左辺の第2項は,両者が類似しているほど1に近い値になるので,これを1から引くことで非類似度,つまりこの場合はそれ以前からの変化の度合いを表す値が得られるわけです.

 a(t) = 1 - {\| U_m^{(t)\mathrm{T}} Q_m^{(t)} \|_2}^2

Rで特異スペクトル変換法を実装

さて,それでは実際に計算してみましょう.このあたりの式は,同じく井出先生の「入門機械学習による異常検知」を下敷きにしています.

入門 機械学習による異常検知―Rによる実践ガイド

入門 機械学習による異常検知―Rによる実践ガイド

# 定数
w <- 16 # 部分時系列の要素数
m <- 2 # 類似度算出に使用するSVDの次元数
k <- w / 2 # SVD算出に用いる部分時系列数
L <- k / 2 # 類似度を比較する2つの部分時系列群間のラグ

# 異常値のスコアを算出するメソッド
get_anomaly_score <- function(d, w, m, k, L) {
  anomaly_score <- rep(0, length(d))
  for(t in (w+k):(length(d)-L+1)) {
    start <- t - w - k + 1
    end <- t - 1
    X1 <- t(embed(d[start:end], w))[w:1,] # t以前の部分時系列群
    X2 <- t(embed(d[(start+L):(end+L)], w))[w:1,] # 異常度算出対象の部分時系列群
    
    U1 <- svd(X1)$u[,1:m] # X1にSVDを行い上位m次元を抜き出す
    U2 <- svd(X2)$u[,1:m] # X2にSVDを行い上位m次元を抜き出す
    sig1 <- svd(t(U1) %*% U2)$d[1] # U1とU2の最大特異値を取得
    anomaly_score[t] <- 1 - sig1^2 # 最大特異値の2ノルムを1から引くことで異常値を得る
  }
  anomaly_score
}

# スコアを算出して,心拍の時系列と重ねてプロット
hr <- hr_data$heart_rate
anomaly_score <- get_anomaly_score(hr, w, m, k, L)
as_data <- data.frame(time=hr_data$time, anomaly_score=anomaly_score)
as_plot <- ggplot(as_data, aes(x=time, y=anomaly_score)) +
  geom_line(color="red", size=0.8) +
  labs(title="", x="Time index", y="Anomary score") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
combined_plot <- ggdual_axis(lhs=hr_plot, rhs=as_plot)
plot(combined_plot)

全部で288時点しないので*5,部分時系列の時点数は16くらいに絞っておきます.結果は以下の通りです.おおむね想定したとおりに,変化点が検知できているように思います,ただ後半のサイクリングの変化点が,微妙に後ろにずれちゃっているようにも思います.このあたり,細かいスパイクが悪い影響与えているかもしれないので,スムージングしてからスコアを出したほうがいいのかなぁとも思います.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201224_original.png

パラメタのチューニング

上の例では,すべての定数を決めうちにしちゃっているので,いくつかパラメタを変えてどうスコアが変わるかを見ていきたいと思います.ここではwとmについて試してみます.

まずはwからみていきましょう.wを8, 16, 32の3パターンに変えて,スコアを出してみたのが下の図になります.青が8,赤が16,緑が32になります.みるとわかるように,wが増えるほどスコアが減少して,変化を検出しづらくなっています.部分時系列が増えるんだから,当然そうなりますよね.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201223_original.png

as_data_w08 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 8, m, k, L))
as_data_w16 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 16, m, k, L))
as_data_w32 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 32, m, k, L))
as_plot_w <- ggplot(as_data_w08, aes(x=time, y=anomaly_score)) +
  geom_line(color="blue", size=0.8) +
  geom_line(data=as_data_w16, aes(x=time, y=anomaly_score), color="red", size=0.8) +
  geom_line(data=as_data_w32, aes(x=time, y=anomaly_score), color="green", size=0.8) +
  labs(title="", x="Time index", y="Anomary score") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
combined_plot_w <- ggdual_axis(lhs=hr_plot, rhs=as_plot_w)
plot(combined_plot_w)

同様に,mについても1, 2, 3と変えてみてみたのが,以下の図になります.青が1,赤が2,緑が3になります.こちらも同様に,次元数が少ないほど結果がピーキーになってますね.このあたり,元の時系列データの特性に合わせて,適切な次元数を決めてあげる必要がありそうです.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201222_original.png

そんなわけで,ライフログデータでいろいろ試してみることができますね.面白い.コードはgistにあげてあるので,よろしければどうぞ.

*1:@millionsmileさんの「チャップが山粕を一撃する方法」という発表なんですが,特に資料は公開されてないみたいですね...

*2:とはいえ,感覚的には15分以上連続で運転するぐらいでないと,サイクリングと判定をしてくれないので,誤判定を防ぐためなんでしょうけど,割と保守的だなーとは思います.

*3:そもそも歩数カウントはオンラインでちゃんとカウントされていくため,時間&空間計算量をそこそこ必要とする時系列の状態推定は厳しそうに思います.なので単純なフィルタリングをしてるだけなんじゃないかなー

*4:ここで特異値分解で得られる左特異値ベクトルのL2ノルムをとると1になります

*5:この日のデータは欠損があるっぽくて,286時点しかありませんでしたが.

Ansible経由でDockerイメージを作ってみる

これまで仮想化とかクラウドとか,そんなにお仕事で触ってなかったこともあって割と放置気味だったのだけど,さすがに少しは使えないとねということでちょいと試してみましたというお話.

以前に依頼を受けてWebアプリを作ったことがあって,これを1年くらい前にAnsible化してました.で,これをDockerに乗せてElastic BeanstalkにDokcerコンテナとしてデプロイしようと思ったわけです.

DockerfileとAnsible playbook

最初はplaybookの内容を,全部Dokcerfileに書きかえようと思ったんですけど,これが思った以上に面倒くさい.ていうかDockerfileってAnsibleみたいな丁寧なセットアッププロセスや便利なラッパーがなくて,割とベタに RUN コマンドでスクリプトそのまま書かなきゃいけないの,かなり嫌です.このあたりって,他の人たちはどうやって解決しているんでしょう.細かい粒度でプロセス切り分けて,Dockerfileが肥大化しないようにしているから,そんなに問題にならないってことなんですかね.Ansibleのほうがラッパーなぶん手軽だし,依存関係とかも

ということで,前の資産を生かす形で,Ansible playbookを拡張することにしました.手順としては,まずDockerコンテナを立ちあげるタスクを追加します.そして立ち上げたコンテナに従来のplaybookを流して,あとはイメージを保存,そしてElastic Beanstalkにコンテナをデプロイ,です.

Ansible playbookのタスク追加

sites.yml を以下のように記述します.docker_host に対するタスクとして,docker タスクを追加しています.

---
- hosts: docker_host
  become: yes
  remote_user: docker
  tasks:
    - name: deploy centos container
      docker:
        image: centos:centos6
        name: iroha
        ports: 80:80
        expose: 80
        tty: yes

- hosts: container
  connection: docker
  roles:
    - nginx
    - php
    - iroha
    - mysql

このあたり,Dokcerとの連携については以下のエントリに詳しいです*1

dev.classmethod.jp

Docker machineのセットアップ

Dokcer for Macが出たので,これをインストールしておきましょう*2.いくつかセットアップしておかなければいけないことがありますが,このあたりは以下のエントリをみて適当にやってください.docker-machineでちゃんとホストVMをセットアップしておかないと,ちゃんとDockerコンテナを立てることはできません.

qiita.com

playbookの実行とイメージの保存

ここまで準備できたら,あとはplaybookを流せば,Webアプリの載ったDockerコンテナが立ち上がります.

ansible-playbook -i hosts site.yml

docker psで実際にコンテナができてるか確認して,imageを作成します.

$ docker ps
CONTAINER ID        IMAGE                          COMMAND                  CREATED             STATUS              PORTS                              NAMES
3985cf72cf58        centos:centos6                 "/bin/bash"              13 days ago         Up 13 days          0.0.0.0:80->80/tcp                 iroha
b6ca8a876c82        jupyter/datascience-notebook   "tini -- start-notebo"   13 days ago         Up 13 days          0.0.0.0:8888->8888/tcp             compassionate_joliot
3879d9ab028a        tokyor/rstudio                 "/init"                  2 weeks ago         Up 2 weeks          1410/tcp, 0.0.0.0:8787->8787/tcp   awesome_noether
$ docker commit -m"initial commit for irohajirui-sho container" 3985cf72cf58 iroha:latest
sha256:1455b4022f1977361d78d96a23aaca3a0c287d0f6a6ae14bc7c5607d04945e4a

このあとは

できあがったイメージをDocker hubにでもあげて,Elastic Beanstalkあたりでデプロイすれば,すらばしい継続的デリバリー体制の完成,めでたしめでたし.というところなんですが,現状まだElastic Beanstalkでちゃんと動作していない次第.うまくいったら別エントリにでもしますかね... ほんとうはMySQLをRDSに分離して,アプリケーションはDockerfileに切り出して,DockerfileでのElastic Beanstalkへのデプロイにしたほうが,より洗練されている感はあります.

dev.classmethod.jp

*1:だいたいこのエントリみながら試してみた次第です.

*2:WindowsもDocker for Winが出てるので,同じようにできるはずだと思います.

Fundamental Models for Forecasting Elections から考える,日本の選挙における当選者予測方法

参議院選挙が終わったと思ったら今度は都知事選と,まさに選挙の夏ですね.選挙といえば,20時の投票〆切と同時に発表される,マスメディア各社の当選者予測が風物詩です.開票率0%で当確が続々打たれる様は,まさに統計学+社会調査の面目躍如という感じがしますね.各社の当選者予測については,詳細は必ずしも明らかにされていないものの,大枠については様々なところで述べられています.読みやすい記事としては,以下の記事なんかがよくまとまっているのではないでしょうか.

go2senkyo.com

基本としては,RDDベースの社会調査をもとに,過去データに基づいて値の補正や当選確率予測モデルの当てはめを行うことで,高い精度の当確予測を行っている訳ですね.各社は定期的に政党支持率等の調査を行うとともに,選挙期間は頻度を上げて調査を行います.その中で,明確な差があると判断され,かつ出口調査でも傾向が変わらなければ,基本的には20時の時点で当確を打つことが可能になります.こうした選挙調査の手法については,日経リサーチの鈴木さんが書いた「選挙における調査と予測報道」や,朝日新聞社の松田さんが書いた「調査手法転換時の対応と判断 : 2000年総選挙と2001年参院選挙の事例(選挙とOR)」,さらにNHKの仁平さんの「選挙と出口調査」あたりにまとまっています.いずれも結構細かく述べられていて,興味深いです.

さて,こうした社会調査ベースの手法も興味深くはあるのですが,それ以外に当選者予測を行う方法はないのでしょうか.とりあえずCiNiiで日本の論文を探した限り,ほとんど存在しないというのが実情のようです*1 *2.唯一に近く引っかかったのは,「Twitterにおける候補者の情報拡散に着目した国政選挙当選者予測」くらいですが,流石にこれは高い精度を求めた手法という訳でもないので,参考にはならなさそうです.

ということで,もう少し広く探してみたところ,以下の論文が引っかかりました.ということで今回はこの論文の紹介をしたいと思います.

Fundamental Models for Forecasting Elections

論文概要

この論文,正確にはアメリカ経済学会のカンファレンスペーパーとして2013年に発表されているものになります*3.著者はGoogleMicrosoft Research勤務の経済学者で,基本的には計量経済学の手法が使われています*4.手法としては,アメリカの大統領選,上院選,州知事選などに対して,3ヶ月以上前に手に入るようなマクロデータ等を用いてprobit回帰を行っています.

この論文内の先行研究レビューでは,主に選挙直近に手に入るデータを用いた計量分析や,予測市場に基づいた予測の手法等があげられています.これら手法と比べて,選挙の3ヶ月以上前に当選者を予測することができる,州ごとの予測を既存研究よりも高精度で予測することができる,というのがこの論文の主張になります.

手法

予測する変数は,候補者ごとの得票率と勝利確率の2つになります.これらを,過去の選挙データや経済・政治に関するマクロデータを用いて予測します.得票率については線形回帰,勝率予測はprobit回帰を用いています.各選挙について,州ごとに結果を予測することで,選挙回数に比してサンプル数を増やすことが可能になり,かつ各州の個別要因を考慮することが可能になります.

またこのモデルの特徴は,基本的に民主党と共和党のみを重視したものになっていることです.アメリカの場合は,2大政党制が長く続いており,この形でモデルにしてしまうのが妥当といえます.具体的には,大統領選を予測するモデルの中で,現職大統領が民主党か否かという独立変数を用いていたります(民主党なら1, 共和党なら-1となるダミー変数です).

また,民主党共和党以外の候補については,そのまま独立変数としてモデルに組み込むような荒技を使っています.例えば1968年の大統領選では,アメリカ独立党のジョージ・ウォレス候補ジョージア州アラバマ州等で勝利を収めていますが,これを「Wallace's vote share in state in 1968 if Southern state and year is 1972」という独立変数として用いています.こうしたことが可能なのも,これらの事象があくまで例外的で,2大政党制が基本的には崩れていないからでしょう.

結果

ここでは,一番精度のよかった大統領選のモデルを示します.線形回帰による得票率予測モデルは,以下の通りです.変数には納得間のあるものがたくさん並んでおり,過去選挙の州ごとの投票率の偏差などは,日本でもそのまま当てはめることができそうにも思います.Wallace, Anderson, Perotあたりは,特別な選挙の影響排除の項目ですね.これらを含んだ結果の調整済決定係数は R^2=0.836 とそれなりに高い値を示しています.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20160718/20160718193540_original.png

また,結果について,横軸に民主党候補者の予測当選確率/得票率を,縦軸に実際の得票率を示したのが下の図です.たしかに,それなりに綺麗に予測できているといえそうですね.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20160718/20160718193539_original.png

同様に,州知事選や上院選についても,同様の結果がありますが,そちらは論文本体を参照してくださいということで省略します.

雑感

割と綺麗にまとめられた論文でした.さて当然ここで気になるのは,この結果が日本の選挙予測に応用可能なのか,という点です.私自身の感想としては,これだけだと結構厳しいなぁというものです.というのは,日本の選挙は政党の統合や新設が非常に多くあり,かつ政党間の細かい駆け引きや個別事情が入り組んでいます.今回の参院選でも,民主党と維新の会が合流して民進党になりましたが,この時点で既存の予測モデルをそのまま当てはめるのが困難になります.もちろん独立変数に頑張ってパラメタを入れ込むことも可能かもしれませんが,この形を取ると独立変数がそれだけで50個とかできちゃうように思います.

また時代とともに結構党の性格やあり方も変わっていくように思え,stableなモデルをどこまで当てはめてよいものか,結構悩ましいようにも感じます.かといって,動的線形モデルみたいなものを考えれば良いかというと,そもそも選挙という事象は数年に1回しか行われない,非常にサンプル数の限られるドメインです.そのため複雑なモデルを設定することは,それ自体が大きな不確実性を生む要因となってしまいます.そのためちまたで流行のディープラーニングなんかも,選挙に関しては使う機会がなさそうだなぁと思っていたりします.

ということで,詰まるところ既存のマスメディア各社の予測スタイルは,それはそれで最適解なのかもしれないなぁと感じる次第です.でもアカデミアの人たちには,もう少し予測モデル頑張ってほしいなぁと思ったり思わなかったり.

*1:「選挙 予測」みたいなキーワードで検索しても,ほとんど何も引っかからない状態です.

*2:なぜまず日本で探したかというと,選挙制度は各国で非常に大きく異なるため,海外の事例がそのまま日本に当てはまるとは考えにくいからです.

*3:2014年に Fundamental models for forecasting elections at the state level としてElectoral Studies誌にまとめられていますが,例のごとくElsevierの雑誌は有料なので,こちらは呼んでいません.

*4:全然関係ないですけど,アメリカのIT企業が経済学者を研究部門に抱えていたりするの,知を大事にしている感じがしてよいなーと個人的には思っています.