About connecting the dots.

data science related trivial things

PySpark で書き出しデータのパーティション数を調節する

小ネタなんですが,なかなかググっても見つからず,あれこれと試行錯誤してしまったので,メモがわりに.

要するに,gzip 圧縮してあるデータを読み出して,年月ごとにデータをパーティション分けして,結果を parquet 形式の 1 ファイルで書き出す,みたいな処理がしたいということです.結局 repartition() を使えばよかったので,以下のように yyyymm カラムを一時的に作って,パーティションを切りなおしてからそのカラムを落とすというテクを使いました.普通なら repartition(int) で直接パーティション数を指定すれば良いんでしょうけど,複数年月が分けられておらず固まったデータを一気に読み込んで,一気にパーティショニングしたいときには,こんな感じで無理やり動かすしかないのかなーという感じです.ちなみに 8-9 行目の処理は,S3 への書き出しを高速化するための設定になります*1

ちなみに repartition() ではなく coalesce() を使うやり方もありますが,こちらの記事によるとファイルサイズが均一にそろわないっぽいです.また今回のとは別で,データサイズをみて適切なファイルサイズごとにパーティションを分けたい,という場合であれば,以下の記事にあるように df.inputFiles.size で取得できるんですね.

stackoverflow.com

from pyspark.context import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import year, month, date_format

in_path = "s3://XXX/gz/*.gz"
out_path = "s3://XXX/parquet/"

sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")
sc._jsc.hadoopConfiguration().set("spark.speculation", "false")
sqlContext = SQLContext(sc)
sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")

originalDf = sqlContext.read \
    .format("com.databricks.spark.csv") \
    .option("delimiter", "\t") \
    .load(in_path)
renamedDf = originalDf \
    .withColumnRenamed("_c0", "prod_id") \
    .withColumnRenamed("_c1", "cust_id") \
    .withColumnRenamed("_c2", "time_id") \
    .withColumnRenamed("_c3", "amount_sold")
yearAddedDf = renamedDf.withColumn("year", year(renamedDf.time_id))
monthAddedDf = yearAddedDf.withColumn("month", month(yearAddedDf.time_id))
yyyymmAddedDf = monthAddedDf.withColumn("yyyymm", date_format(monthAddedDf.time_id, 'yyyyMM'))

repartitionedDf = yyyymmAddedDf.repartition("yyyymm")
dropedDf = repartitionedDf.drop("yyyymm")

castedDf = dropedDf.withColumn("prod_id", dropedDf.prod_id.cast("decimal(38,0)")) \
    .withColumn("cust_id", dropedDf.cust_id.cast("decimal(38,0)")) \
    .withColumn("time_id", dropedDf.time_id.cast("timestamp")) \
    .withColumn("amount_sold", dropedDf.amount_sold.cast("decimal(38,2)")) \
    .withColumn("year", dropedDf.year.cast("int")) \
    .withColumn("month", dropedDf.month.cast("int"))
castedDf.write.partitionBy(["year", "month"]).mode("overwrite").parquet(out_path)

*1:このあたりは,昨年の記事にまとめてあります.

AWS の Deep Learning AMI で MXNet のバージョンをあげる

最近 AWSDeep Learning 周りのモデルを試してみたりしています.Deep Learning AMI があるので,自分で CUDA や cuDNN や各種フレームワークを入れる必要がないというのが一番大きいです*1.10 分もあれば,P2 インスタンス上で Jupyter notebook 使って開発が開始できるのはとても楽です.もうローカルマシンに環境構築する時代には戻りたくないものです.

とはいえ,この界隈は非常に進歩が早くていろいろ追いついていないことが多いです.現在の Deep Learning AMI では MXNet の 0.9.3 がインストールされているのですが,MXNet 自体はすでに 0.10.0 になっており,ドキュメントもそれに合わせてバージョンアップしています.このバージョンアップで結構 API が変わっており*2,ドキュメントみながらいろいろ触ってても,エラーが出て結構詰まってしまうのですね...

なので MXNet のバージョンをあげましたよというのが今回のお話.やることはそんなに多くなくて,コマンドを以下のように順に打っていけばよいです.今後も同じようなことがあったときのための備忘録がわりに*3

cd ~/src/mxnet
git pull origin master
git submodule update
sudo make clean && make # 2h くらいかかります
python -c "import mxnet; print(mxnet.__veriosn__)" # 0.9.3 と表示されるはず
cd python
sudo python setup.py install
python -c "import mxnet; print(mxnet.__veriosn__)" # 0.10.0 と表示されればOK

*1:Amazon Linux だけではなく,Ubuntu の AMI もあります.

*2:mod や test_util あたりが特に変わっている感じがします.

*3:こちらとかこちらを参考にしました.

RStudio + sparklyr on EMRでスケーラブル機械学習

前回に引き続き分析環境ネタ第2弾*1.今回はEMRでRStudioを立ててみます.

RStudioの構築

やり方自体は,AWS Big Data Blogにまとまっているので,別に難しくはなかったり.RStudioとか関連コンポーネントは,例のごとくS3に便利スクリプトがあるので,これをブートストラップで読み込んであげればOK.

s3://aws-bigdata-blog/artifacts/aws-blog-emr-rstudio-sparklyr/rstudio_sparklyr_emr5.sh --sparklyr --rstudio --sparkr --rexamples --plyrmr

あとはSSHトンネルを掘って,RStudioのWebページにつないであげればOK.8787ポートでつながります.

sparklyrでSparkと連携

sparklyrは,RStudioの人たちが作っている,RからSparkを扱うためのパッケージになります.ちょうどタイムリーにyamano357さんが,sparklyrの記事をあげていたりしますね*2

qiita.com

基本的にリファレンスページが非常にまとまっているので,それを読むだけで一通りの操作ができますが,とりあえず自分でも試してみましょう.sparklyrは read_from_csv() を使ってS3から直接ファイルを読み込むことができます.便利ですね.試してはいないのですが,以下のパッケージを併せて使えば,RでRedshiftからもSparkにデータを読み出せるっぽいです.

github.com

あとは,リファレンスに従って,定番処理を試してみましょう.読み込むデータは前回と一緒のabaloneです.

library(sparklyr)
library(dplyr)

# create Spark Context
sc <- spark_connect(master = "yarn-client", version = "2.0.2")

# create dataset in Spark
abalone <- spark_read_csv(sc, "abalone", path="s3://XXXXXXXX/abalone.csv")
names(abalone) = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height", "rings")
head(abalone)
tbl_cache(sc, "abalone")

# execute query
abalone %>%
  dplyr::group_by(sex) %>%
  dplyr::count()

# separate dataset
partitions <- abalone %>%
  sdf_partition(training = 0.9, test = 0.1, seed = 123)

# execute regression model
fit <- partitions$training %>%
  ml_linear_regression(
    response = "rings",
    features = c("sex", "length", "diameter", "height", "whole_height", "shucked_height", "viscera_height", "shell_height")
  )
summary(fit)
original_value <- partitions$test %>%
    dplyr::select(rings)
predicted_value <- predict(fit, partitions$test)

ということで,慣れると非常に簡単に処理できますね.すばらしい.Sparklyrはチートシートもあるので,慣れるのも割とすぐな感じです.これをみる感じ,ビジュアライズはさすがにRのメモリ側にデータを持ってこないとダメみたいですね.あとは,ML Pipelineの各Transformerに対応するメソッドが揃っていたり,かなり力入れて開発されてるなぁ感がすごいですね.

所感

と,ささっと触って思ったのは,同じことやるならZeppelinで直接DataFrameいじったほうが早い気がするなぁということでした.結局sparklyrでできるのは,RのインタフェースでML Pipelineを扱えるというだけのことで,ロジック自体は実質的にML Pipelineを知らないと書けないし,結果の可視化もZeppelinで十分に対応可能です.

その上で,じゃあどういう使い方をすると良いかなぁと考えると,やっぱりRを使うメリットは高速なプロトタイピングってところに尽きるのかなぁと思ったり.つまり,大規模なデータを前処理して,Rの機械学習モデルに落とし込んでいろいろ試行錯誤する,という使い方かなぁと.なので,有賀さんが前にibisの話ししてたのと同じイメージで使うと,色々捗りそうな気がしています.

www.slideshare.net

2017/1/31追記

sparklyr0.5に合わせて,公式ページに Deploying sparklyr という形で基本的なセットアップと使い方がまとめられてます.そのサンプル読んでると,実は ml_linear_regression() とか x ~ y のR式の記述ができたということに気がつきました.便利だ...

*1:第3弾があるかは謎

*2:Athenaが東京リージョンにくるの待たなくても,オレゴンで立ち上げて,東京リージョンのS3バケットにクエリしちゃえばいいのに,とか思ったりしますが

Spark on EMRでZeppelinを使ってML Pipelineを試してみる

Sparkの最新状況をアップデートする意味も含めて,EMRで一通りの挙動を試してみたので,備忘録的にまとめておきます.慣れると簡単で便利なんですけど,それまでは結構ハマりどころが多いんですよねぇ,このあたり.

Zeppelinにアクセスするまで

AWS Big Dataブログにまとまっている通り,EMRに便利コンポーネントをいろいろ含めて起動するのは簡単です.AWSが用意しているスクリプトをbootstrapに指定して,必要なコンポーネントをオプションで引き渡してあげれば,RとかPythonとかの必須パッケージを含んだ形でEMRを起動できます*1

ちなみに,EMRの起動自体は cli でも実施できます.軽くサンプルを作ってみましたがブートストラップアクションとかインストールコンポーネントとかは適当に変えられますので変えてみてください*2

起動が終わったら,EMRクラスタGUIにアクセスしたいので,SSHトンネルを掘ります.この辺りがEMRの不便なところではあるんですよね... 手順についてはクラスメソッドさんの記事によくまとまっているかと思います.コマンド的には以下のような感じで.

ssh -i ~/.ssh/XXXXXXXX.pem -N -D 8157 hadoop@ec2-XXXXXXXX.compute-1.amazonaws.com

dev.classmethod.jp

ここまで終わったら,ようやっとZeppelinにアクセスできるようになります.こんな感じ.

f:id:SAM:20170122114710p:plain

Redshiftにデータセットをロード

今回使ったデータセットこちらUCIのMachine Learning DatasetからAbaloneを使わせてもらいました.これは4177行x8列のデータセットで,アワビの大きさや重さから,年齢を当てるという,回帰分析用のデータセットになります.CSVから読み込んであげてもいいんですけれども,せっかくなので今回はRedshiftからデータを読み込んでみます.こちらもAWS Big Dataブログにやり方がまとまっています.

まずはS3にアップロードしますけど,これはcliでちょちょっとやればいいだけです.XXXXXXXXになってるところはバケットネームなので,自身のものを入れてください.

aws s3 cp ~/Downloads/abalone.data.txt s3://XXXXXXXX/abalone.csv

続いて,Redshift側ではDDLでテーブル作ってあげて,COPYコマンドでロードします.コマンドは以下の通り*3

create table abalone (sex varchar(2), length real, diameter real, height real, whole_weight real, shucked_weight real, viscera_weight real, shell_weight real, rings integer);
copy abalone from 's3://XXXXXXXX/abalone.csv' credentials 'aws_access_key_id=XXXXXXXX;aws_secret_access_key=XXXXXXXX';

ZeppelinからRedshiftにアクセス

ZeppelinからRedshiftにアクセスするためには,いくつかjarをパスに追加してあげる必要があります.このあたりの手順は公式ドキュメントにまとまっているので,参考にして順に以下のjarを追加していきます.

  • /usr/share/aws/redshift/jdbc/RedshiftJDBC41.jar
  • minimal-json-0.9.4.jar
  • spark-redshift_2.10-2.0.0.jar
  • spark-avro_2.11-3.0.0.jar

一番上のJDBCは最初からクラスタにあるので,そのままパスを追加してあげればOKです.2番目のjsonは,Maven Centralのレポジトリを追加してあげればOK.ですが3,4番目のjarはMaven Centralにあるはずなのですがなぜか追加してもジョブ実行時にエラーが出てしまいます.仕方がないので,直接jarをダウンロードして適当な場所に配置してあげます.最終的に以下のようにDependenciesが設定できました.

f:id:SAM:20170122120334p:plain

ML Pipelineの実行

ML Pipelineはパイプラインのようにパラメタ変換処理をつなげて,機械学習モデルの処理を記述するものです.ML Pipelineには大きく分けてTransformerとEstimatorの2つのコンポーネントがあります.それぞれの引数と戻り値の型は以下の通り.

  • val df2 = new Transformer().transform(df1)
  • val model = new Estimator().fit(df2)

Transformerはパラメタの変換処理を行います.扱える処理は結構多く,tokenaizer, TF-IDFなんてのも実施できたりします.Estimatorはいわゆる機械学習モデルです.Estimatorは,fit() メソッドを使って訓練データを学習することで,予測が可能な機械学習modelとなります.そして,このmodelを使って,model.transform(testDf) とすることで,予測結果を得ることができます.もちろんCVなども完備されているため,これら一連の流れをまとめて綺麗に記述することができます.

MLモデルの例

では,以下に実例を書いていきます.まずはPipelineを使わない簡単な例から.

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)
  .setElasticNetParam(0.1)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")
test.printSchema()

実行すると,以下のように結果が得られます.

predicted: org.apache.spark.sql.DataFrame = [label: double, features: vector ... 3 more fields]
([-1.0,1.5,1.3], 1.0) -> prob=[0.0011845390472119933,0.9988154609527881], prediction=1.0
([3.0,2.0,-0.1], 0.0) -> prob=[0.9829127480824043,0.017087251917595875], prediction=0.0
([0.0,2.2,-1.5], 1.0) -> prob=[0.0014877175926165148,0.9985122824073834], prediction=1.0
root
 |-- label: double (nullable = false)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
+-----+--------------+--------------------+--------------------+----------+
|label|      features|       rawPrediction|         probability|prediction|
+-----+--------------+--------------------+--------------------+----------+
|  1.0|[-1.0,1.5,1.3]|[-6.7372163285975...|[0.00118453904721...|       1.0|
|  0.0|[3.0,2.0,-0.1]|[4.05218767176669...|[0.98291274808240...|       0.0|
|  1.0|[0.0,2.2,-1.5]|[-6.5090233251486...|[0.00148771759261...|       1.0|
+-----+--------------+--------------------+--------------------+----------+

Pipelineを使ったCV

次はPipelineを使って変数の変換と,CVを含めたモデルを記述します.で,最終的な結果としては,重相関係数0.62という形で結果が得られました.元の値と予測値のプロットをとると以下の通りです.

f:id:SAM:20170123133755p:plain

実行コードはこちら.

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.regression.LinearRegression
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.sql._

import com.amazonaws.auth._
import com.amazonaws.auth.AWSCredentialsProvider
import com.amazonaws.auth.AWSSessionCredentials
import com.amazonaws.auth.InstanceProfileCredentialsProvider
import com.amazonaws.services.redshift.AmazonRedshiftClient
import _root_.com.amazon.redshift.jdbc41.Driver
import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.SQLContext

// Instance Profile for authentication to AWS resources
val provider = new InstanceProfileCredentialsProvider();
val credentials: AWSSessionCredentials = provider.getCredentials.asInstanceOf[AWSSessionCredentials];
val token = credentials.getSessionToken;
val awsAccessKey = credentials.getAWSAccessKeyId;
val awsSecretKey = credentials.getAWSSecretKey

val sqlContext = new SQLContext(sc)
import sqlContext.implicits._;

// Provide the jdbc url for Amazon Redshift
val jdbcURL = "jdbc:redshift://XXXXXXXXXX.us-east-1.redshift.amazonaws.com:5439/mldataset?user=XXXXXX&password=XXXXXXX"

// Create and declare an S3 bucket where the temporary files are written
val s3TempDir = "s3://XXXXXXXX/"
val abaloneQuery = """
                    select * from abalone
                   """
// Create a Dataframe to hold the results of the above query
val abaloneDF = sqlContext.read.format("com.databricks.spark.redshift")
                                          .option("url", jdbcURL)
                                          .option("tempdir", s3TempDir)
                                          .option("query", abaloneQuery)
                                          .option("temporary_aws_access_key_id", awsAccessKey)
                                          .option("temporary_aws_secret_access_key", awsSecretKey)
                                          .option("temporary_aws_session_token", token).load()  

// create pipeline
val stringIndexer = new StringIndexer()
    .setInputCol("sex")
    .setOutputCol("sex_index")
val vectorAsembler = new VectorAssembler()
    .setInputCols(Array("sex_index", "length", "diameter", "height",
        "whole_weight", "shucked_weight", "viscera_weight", "shell_weight"))
    .setOutputCol("features")
val linearRegression = new LinearRegression()
    .setLabelCol("rings")
    .setMaxIter(1)
    .setRegParam(0.3)
    .setElasticNetParam(0.5)
val paramGrid = new ParamGridBuilder()
    .addGrid(linearRegression.regParam, Array(0.1, 0.3))
    .addGrid(linearRegression.elasticNetParam, Array(0.1, 0.3, 0.5, 0.7, 0.9))
    .build()
val crossvalidator = new CrossValidator()
  .setEstimator(linearRegression)
  .setEvaluator(new RegressionEvaluator().setLabelCol("rings"))
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(2)
val pipeline = new Pipeline()
    .setStages(Array(stringIndexer, vectorAsembler, crossvalidator))
    
// fit model
val Array(training, test) = abaloneDF.randomSplit(Array(0.9, 0.1), seed = 1234)
val model = pipeline.fit(training)
val predictions = model.transform(test)

// visualize the result
predictions.registerTempTable("predictions")
predictions.stat.corr("rings", "prediction")
||< 

* まとめ

*1:今回の実施環境は,EMR Release 5.2.1 で Spark version は 2.0.2でした.

*2:今回のお試しプロセスは,マネジメントコンソールから作っちゃったので,gistサンプルとは構成が異なります.

*3:クレデンシャルとかS3バケットとかは,全部XXXXXXXXでマスクしてあるので,自分の環境に合わせて適宜入れ替えてください

Spark2.0でジョブのアウトプットを高速にS3に書き出す

ここのところEMRでSparkを触ってます.まぁやってるのは,主にデータのparquet+snappyへの変換処理なんですけどね.EMRといえばHDFSではなく,EMRFS経由でS3に書き出すのがモダンなやり方,ということでそれを試してます.で,いろいろ試してて以下の2点の問題に気づきました.

  • S3に書き出す処理が遅い
  • 謎の _$folder$ というファイルができてしまう

今回はこれについて調べたことと,現状の対策法についてまとめておきます.検証環境はEMR Release 5.2.0で,Sparkバージョンは2.0.2になります.クラスタはマスターがm3.xlargeでスレーブがr3.2xlarge x 5台でした.

S3に書き出す処理が遅い

ジョブの挙動をみてると,Sparkジョブが終わっているはずなのに,結果ファイルがすべてでてきていない,という現象が起きていました.調べてみると,どうもデフォルト設定では,一旦出力をテンポラリファイルに書き出してから,最終的な出力先ディレクトリに再配置する,という挙動を取っているようです.これを回避するために,Spark2.0以前のバージョンでは,"spark.sql.parquet.output.committer.class" に "org.apache.spark.sql.parquet.DirectParquetOutputCommitter" を使用することで,書き出しを高速化することができました.このあたりについては,以下のエントリに詳しくまとまっています.

dev.sortable.com

ここでSpark2.0以前と書いたのには理由があって,2.0でこの DirectParquetOutputCommitter は削除されてしまったのです.理由は
SPARK-10063 に書かれていますが,要するにDirectParquetOutputCommitter は結果の整合性チェックをバイパスすることで高速な書き出しを行なっており,結果としてアウトプットが一部ロストする可能性がある,ということのようです.じゃあ対策はないのかというとそんなことはなくて,以下のドンピシャstackoverflowエントリをみつけました.

stackoverflow.com

この OutputCommitter のアルゴリズムってなんぞやというのは,以下の鯵坂さんの説明を読んでください.EMR5.2.0はHadoop2.7系なので,バージョンの2が選択可能というわけです.この記事だとリトライの際のロスが少なくなる,ということみたいですけど,たぶん同時に入ったMAPREDUCE-4815 のほうで,高速化が達成されていることだと思います.

qiita.com

ということで,結論としてはS3に高速に書き出すなら DirectParquetOutputCommitter を2にしましょう.サンプルコードは以下のようになります.Hiveテーブルからデータを読み込んで,parquet+snappyに変換してS3に保存するというやつです.ポイントは14行目,ここでバージョンの指定をしています.手元のデータで試してみたところ,gzip圧縮で10GB程度のファイルを変換するのに,何もしないと15分くらいかかってたのが,この指定で2分くらいになりました.実に7倍近い速度向上です*1

gist.github.com

謎の _$folder$ というファイルができてしまう

実はこっちがもともとのモチベーションだったんですが,parquet に変換して結果をS3に吐き出すと,必ずフォルダの横に,フォルダ名_$folder$ というファイルができてしまうのです.AWSのサイトにも害はないって書いてあるんですけど,普通に邪魔だし嫌じゃないですか.不要なのであれば消したいもんです.実は,これも以下の通り2.0以前であれば DirectParquetOutputCommitter を使えば普通に出力しないようにできたみたいです.

stackoverflow.com

ということで,当然2.0系ではこの方法は使えない,ということになります.そして調べた限り,これの出力を抑制する方法はみつかっていません.誰かご存知であれば,教えてください.

*1:もちろん,手元で適当にやった結果なので,ちゃんとしたベンチマークではない点に留意してください.

書評: StanとRでベイズ統計モデリング

今回は書評エントリです.日本のStan界隈の顔である @berobero11 さんが統計モデリング本を出版し,ありがたいことに献本いただきました*1.ようやっと一通り読み終えた*2ので,感想がてら本の魅力について述べていきたいと思います*3

StanとRでベイズ統計モデリング (Wonderful R)

StanとRでベイズ統計モデリング (Wonderful R)

この本を読んで得られるもの

「統計モデリング」とはどのようなものか,ということについての実践的な知識および心構え

本書には,冒頭で,統計モデリングについて以下のように書かれています.

モデルというのは不必要な性質を大胆に無視して,必要なエッセンスだけを取り上げたものだ....(中略)... 確率モデルをデータにあてはめて現象の理解と予測をうながす営みを統計モデリング (statistical modeling) と呼ぶ....(中略)... すべての確率モデルはあくまでも仮定や仮説であることに注意したい.

まさに,複雑な現実世界を,得られたデータを確率モデルにあてはめることで,モデル化しようという試みこそが統計モデリングなわけです.この本は,実装としてはStanにほぼベッタリになっていますが,本を貫く骨子は,複雑な現実世界をどう記述するか,という姿勢にあると思います.私自身は,統計モデリングにおいて最も大事なのは「自分はこういうモデルで現実世界を表現しよう,切り取ろうと思っているんだよ」,という姿勢なのかな,と個人的に思っていたりします.やりたいことはあくまで複雑な現実世界の理解であって,それを実現するための手段として,統計理論やソフトウェアがあるという位置付けなのかなと.

このあたりは,久保先生の通称みどりほんや,TJOさんのエントリなんかでも述べられており,hoxo_mさんも連載エントリを書かれていますね*4

tjo.hatenablog.com
d.hatena.ne.jp

「統計モデリング」を実現するための手段である「Stan」についての実践的な知識およびリファレンスガイド

Stan自体には,かなりしっかりしたマニュアルがあり,かつその日本語翻訳も行われていたりします.ただ,これらを読むにしても,あまりに膨大なので心が挫ける人も多いのではないでしょうか.これに対して,この本では使いこなすために必要な最低限の知識,および応用的な事項の調べ方(=マニュアルの使い方),について,統計モデリングの実践例をベースにして述べられており,入門書として最適ではないかと思います.

個人的にためになった点

何点かあるのですが,まずは統計モデリングについて,あまり類書ではないような試行錯誤プロセスが載っているのが良いです.まず最初は簡単なモデルで試して,その上で少しずつ仮定を積み上げて精緻なモデルにしていく,といったプロセスが書かれているのは,自分が実際に分析をするときにもこうやれないいのか,というのがイメージしやすいです.

また Chapter 7 の見出しが以下のようになっているのですが,こうした良い統計モデルを作るための実践的な工夫,対処法がしっかり述べられているのって,これも他の本であまりみない(かつとても有用性が高い)です. それも単純に情報量基準の高いモデルを選びましょう,というのではなくて解釈可能性やデータの背景知識と合致する形でモデルを選択しましょう,というスタンスは全くその通りだと思います*5

Chapter 7 回帰分析の悩みどころ
7.1 交互作用
7.2 対数をとるか否か
7.3 非線形の関係
7.4 多重共線形
7.5 交絡
7.6 説明変数が多すぎる
7.7 説明変数にノイズを含む
7.8 打ち切り
7.9 外れ値

同様に,Chapter 10の収束しないときの対処法,も実用上ためになる点です.私自身も,このチャプターに載っているハマりどころに,まさに引っかかったことがあります.そのときの経緯は以下のエントリに詳しく述べてあります.

smrmkt.hatenablog.jp

あとは,Stanで扱えない離散パラメタの対処法について,とても詳しく述べられているのは嬉しいです.そして気がついたら increment_log_prob() はdeprecatedになっていたんですね.それにChapter 12 で述べられている,時間構造と空間構造の等価性については,岩波データサイエンスの vol. 1 の伊庭先生の執筆文を読んでも,完全には腑に落ちきってなかったんですが,事後確率の数式をみたことで疑問が氷解しました*6

そのほか

この本,基本的な想定読者は実務or大学で統計モデリングを日常的に行っている人,もしくはその予備軍の人という感じだと思います.理想的にあると良い知識は,大学教養レベルの線形代数微積分,初等統計学あたりでしょうか.

ただ個人的には,そのような知識がなくてもある程度は読み進められるし,回帰分析が統計パッケージで行える,くらいのレベルであればChapter 1. 3. 5. 7. あたりから得られるものは多いと思います.特に,たとえば心理学や計量社会学を専攻しているような,上記数学知識が十分ではない学生にとってこそ,得るものが大きいように思います.細かい理論がわからないと,統計パッケージをブラックボックスにして,なんでも正規分布を仮定して回帰分析しちゃえばいいや,よくわからないけどパッケージがなんとかしてくれるよ,みたいになりがちです.ですが,そういった姿勢は当然,非常によろしくないわけです.少なくともイメージとして何をしているかは理解して,「自分はこういう仮説を立てて,それを検証するためにこのような統計モデルで現実世界のエッセンスを抜き出しているんだ」というスタンスを持って欲しいなぁと思います.

*1:この場を借りて厚く御礼申し上げます.

*2:Stanでベーシックなモデリング経験のある,統計中級者(Ph. D. クラスのガチ勢ではないというくらいのニュアンスです)の自分で,通読に要した時間が10時間くらいでした.

*3:献本はしてもらいましたが,出版社や著者の回し者ではなく,過剰なPRをしているわけでもありません.あくまで自身の感想を述べております.

*4:そして,ここを掘り下げていくといろいろ宗教論争になりそうなので,このくらいにしておきます.

*5:個人的には,ここが機械学習と統計モデリングの典型的なスタンスの違いなのかなぁと思ったりもします.

*6:時系列の1階差分は,式的には過去から未来にのみ影響がある有向モデルにみえるけど,実際にモデルを推定することを考えると,現在がわかっていれば必然的に過去はその値に影響されるという,双方向の影響が起こるものなんですね.

Fitbitから取得した心拍データで時系列の異常検知を試してみる

井出先生の「異常検知と変化検知」を読んで,自分でも試してみたいと思ったんですが,あいにくちょうどいい時系列データが手元にないなーと思ってました.そんな折,データサイエンスLT祭りの発表の中に,Fitbitデータを可視化するものがあって*1,これはちょうどいいということで試してみましたよというていのエントリになります.

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

異常検知と変化検知 (機械学習プロフェッショナルシリーズ)

Fitbitってなによ

Fitbitが何かしらない人のために一応説明しておくと,最近はやりの活動量計です.私が持っているのは,心拍が取得できるタイプのやつです.風呂に入るとき以外は一日中つけっぱなしで,睡眠とか運動とかを自動で判定してくれるので,手間がかからず便利です.自転車に乗っている時間もちゃんと判定してくれるのは,判定アルゴリズムよくできているなーという感じですね*2.ちなみに運動や睡眠の判定は,データをスマホに転送するタイミングで一気に行われるので,このときに時系列の状態推定アルゴリズムを走らせてるんだろうなぁと思います.その一方で歩数に関しては,手を振ったりしてもほとんどカウントされず,ちゃんと歩いている場合にのみ歩数が増えるところをみると,バンドパスフィルタでも使って地面に足が着地する際のパルス応答的な変化を取ってるんじゃないかなーと推測します*3

Fitbitのデータを取得する

もちろん,Fitbitの各種センサから得られる生データを取得することはできません.ですがFitbitは開発者向けのAPIを提供しており,ここからデータを取得できるようです.また,もっとベタにWebサイトをスクレイピングしちゃうやり方もあるみたいですね.この辺りをサポートしてくれるパッケージをざっと調べたところ,Rなら fitbitScraperPythonなら python-fitbit あたりでしょうか.あとはRなら {httr} パッケージとか使って,OAuthからのAPI直叩きという方法もありそうです.

今回は,普通に {fitbitScraper} を使って,心拍データを取得することにしました.得られるデータは5分間隔のため,1時間で12点,1日で288点になります.以下のようなコードで,適当な日付の心拍データ変動を取得してグラフ化してみました.(fitbitScraper) だけでなく,あとで2軸グラフを記述するために {devtools} 経由で {plotflow} パッケージも入れておきます.(fitbitScraper) は www.fitbit.com へのログインメアドとパスワードが必要なので,持っていない人は取得しておいてください.

install.packages("fitbitScraper")
devtools::install_github("trinker/plotflow")
library(fitbitScraper)
library(plotflow)
library(ggplot2)
library(dplyr)

# Fitbitからのデータ取得
cookie = login(email="www.fitbit.com ログイン用メールアドレス", password="www.fitbit.com ログイン用パスワード")
hr_data <- get_intraday_data(cookie, what="heart-rate", date="2016-10-06")
names(hr_data) <- c("time", "heart_rate") # 変数名にハイフンが入っているのでリネーム
hr_data <- hr_data %>% filter(heart_rate > 0) # Fitbitを取り外しているときは,値が0になるため取り除く

# プロット
hr_plot <- ggplot(data=hr_data, aes(x=time, y=heart_rate)) +
  geom_line(size=0.8) +
  ylim(min=0, max=120) +
  labs(x="Time", y="Heart rate") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
plot(hr_plot)

明け方の寝ている時間は心拍数が50弱/分で推移しており,起きてから犬の散歩にいったので100くらいまで上がっています.そのあとは通常の動きで,夕方前くらいにサイクリングにいったため,やはり100をコンスタントに超えています.そして24時に近づくにつれ心拍が落ち着いていくのが見て取れます.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201225_original.png

こんな感じで,Fitbitのデータは割と簡単に取得できます.

特異スペクトル変換法による時系列データの異常検知

今回使うデータは,時系列の心拍データになります.ここで目的としたいのは,それまでの時間と比べて明らかにおかしな動きがみられたとき,つまり変化点を検知したいということです.井出先生の本にもいくつかの手法が解説されていますが,今回はその中で特異スペクトル変換法を取り上げたいと思います.

この手法は,ざっくりいうとある時点tを起点として,その前のw時点分のデータ(これを部分時系列データと呼びます)を1時点ずつ前にずらしていって,それをk個たばねたもの(仮にX_1と呼びます)と,そこからL時点分後ろにずらして同じくk個たばねたもの(X_2と呼びます)を比較します.両者が似ていれば,t時点で変化は起こっておらず,逆に両者が大きく異なっていたら,t時点付近で時系列データに変化が起こったと考えましょう,ということになります.

このX_1は,1行につきw時点ぶんのデータが並び,それをk行分たばねたk x w行列になります.X_2も同様です.知りたいのはX_1X_2がどれだけ似ているか,ですが,その前段として,X_1X_2 のそれぞれについて,特異値分解で要約し,左特異値ベクトルをm次元分だけ取り出します*4.これにより,時系列の変化を考慮した上で,X_1X_2のそれぞれを要約することができました.

そして得られた2つの特異値について,以下の計算をすることで,最終的な異常値を得ることができます.左辺の第2項は,両者が類似しているほど1に近い値になるので,これを1から引くことで非類似度,つまりこの場合はそれ以前からの変化の度合いを表す値が得られるわけです.

 a(t) = 1 - {\| U_m^{(t)\mathrm{T}} Q_m^{(t)} \|_2}^2

Rで特異スペクトル変換法を実装

さて,それでは実際に計算してみましょう.このあたりの式は,同じく井出先生の「入門機械学習による異常検知」を下敷きにしています.

入門 機械学習による異常検知―Rによる実践ガイド

入門 機械学習による異常検知―Rによる実践ガイド

# 定数
w <- 16 # 部分時系列の要素数
m <- 2 # 類似度算出に使用するSVDの次元数
k <- w / 2 # SVD算出に用いる部分時系列数
L <- k / 2 # 類似度を比較する2つの部分時系列群間のラグ

# 異常値のスコアを算出するメソッド
get_anomaly_score <- function(d, w, m, k, L) {
  anomaly_score <- rep(0, length(d))
  for(t in (w+k):(length(d)-L+1)) {
    start <- t - w - k + 1
    end <- t - 1
    X1 <- t(embed(d[start:end], w))[w:1,] # t以前の部分時系列群
    X2 <- t(embed(d[(start+L):(end+L)], w))[w:1,] # 異常度算出対象の部分時系列群
    
    U1 <- svd(X1)$u[,1:m] # X1にSVDを行い上位m次元を抜き出す
    U2 <- svd(X2)$u[,1:m] # X2にSVDを行い上位m次元を抜き出す
    sig1 <- svd(t(U1) %*% U2)$d[1] # U1とU2の最大特異値を取得
    anomaly_score[t] <- 1 - sig1^2 # 最大特異値の2ノルムを1から引くことで異常値を得る
  }
  anomaly_score
}

# スコアを算出して,心拍の時系列と重ねてプロット
hr <- hr_data$heart_rate
anomaly_score <- get_anomaly_score(hr, w, m, k, L)
as_data <- data.frame(time=hr_data$time, anomaly_score=anomaly_score)
as_plot <- ggplot(as_data, aes(x=time, y=anomaly_score)) +
  geom_line(color="red", size=0.8) +
  labs(title="", x="Time index", y="Anomary score") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
combined_plot <- ggdual_axis(lhs=hr_plot, rhs=as_plot)
plot(combined_plot)

全部で288時点しないので*5,部分時系列の時点数は16くらいに絞っておきます.結果は以下の通りです.おおむね想定したとおりに,変化点が検知できているように思います,ただ後半のサイクリングの変化点が,微妙に後ろにずれちゃっているようにも思います.このあたり,細かいスパイクが悪い影響与えているかもしれないので,スムージングしてからスコアを出したほうがいいのかなぁとも思います.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201224_original.png

パラメタのチューニング

上の例では,すべての定数を決めうちにしちゃっているので,いくつかパラメタを変えてどうスコアが変わるかを見ていきたいと思います.ここではwとmについて試してみます.

まずはwからみていきましょう.wを8, 16, 32の3パターンに変えて,スコアを出してみたのが下の図になります.青が8,赤が16,緑が32になります.みるとわかるように,wが増えるほどスコアが減少して,変化を検出しづらくなっています.部分時系列が増えるんだから,当然そうなりますよね.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201223_original.png

as_data_w08 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 8, m, k, L))
as_data_w16 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 16, m, k, L))
as_data_w32 <- data.frame(time=hr_data$time, anomaly_score=get_anomaly_score(hr, 32, m, k, L))
as_plot_w <- ggplot(as_data_w08, aes(x=time, y=anomaly_score)) +
  geom_line(color="blue", size=0.8) +
  geom_line(data=as_data_w16, aes(x=time, y=anomaly_score), color="red", size=0.8) +
  geom_line(data=as_data_w32, aes(x=time, y=anomaly_score), color="green", size=0.8) +
  labs(title="", x="Time index", y="Anomary score") +
  theme(axis.text=element_text(size=14),
        axis.title=element_text(size=16, face="bold"))
combined_plot_w <- ggdual_axis(lhs=hr_plot, rhs=as_plot_w)
plot(combined_plot_w)

同様に,mについても1, 2, 3と変えてみてみたのが,以下の図になります.青が1,赤が2,緑が3になります.こちらも同様に,次元数が少ないほど結果がピーキーになってますね.このあたり,元の時系列データの特性に合わせて,適切な次元数を決めてあげる必要がありそうです.

http://cdn-ak.f.st-hatena.com/images/fotolife/S/SAM/20161019/20161019201222_original.png

そんなわけで,ライフログデータでいろいろ試してみることができますね.面白い.コードはgistにあげてあるので,よろしければどうぞ.

*1:@millionsmileさんの「チャップが山粕を一撃する方法」という発表なんですが,特に資料は公開されてないみたいですね...

*2:とはいえ,感覚的には15分以上連続で運転するぐらいでないと,サイクリングと判定をしてくれないので,誤判定を防ぐためなんでしょうけど,割と保守的だなーとは思います.

*3:そもそも歩数カウントはオンラインでちゃんとカウントされていくため,時間&空間計算量をそこそこ必要とする時系列の状態推定は厳しそうに思います.なので単純なフィルタリングをしてるだけなんじゃないかなー

*4:ここで特異値分解で得られる左特異値ベクトルのL2ノルムをとると1になります

*5:この日のデータは欠損があるっぽくて,286時点しかありませんでしたが.