Spark机器学习
线性模型
逻辑回归 -- 逻辑损失(logistic loss)；线性支持向量机(Support Vector Machine, SVM) -- 合页损失(hinge loss)；朴素贝叶斯(Naive Bayes)；决策树
0 准备数据
kaggle2.blob.core.windows.net/competitions-data/kaggle/3526/train.tsv
# Drop the first (header) line so that every remaining line is a data record.
tail -n +2 train.tsv > train_noheader.tsv
0 运行环境
# Launch the Spark shell from the Spark install directory, one option per line.
cd /Users/erichan/Garden/spark-1.5.1-bin-cdh4
bin/spark-shell \
  --name my_mlib \
  --packages org.jblas:jblas:1.2.4-SNAPSHOT \
  --driver-memory 4G \
  --executor-memory 4G \
  --driver-cores 2
import org.apache.spark.mllib.feature._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
import org.apache.spark.mllib.optimization._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.evaluation._
import org.apache.spark.mllib.classification._
import org.apache.spark.mllib.tree.DecisionTree
import org.apache.spark.mllib.tree.configuration.Algo
import org.apache.spark.mllib.tree.impurity._
1 提取特征
// Directory that holds the header-less training file.
val PATH = "/Users/erichan/sourcecode/book/Spark机器学习"
// Read the file as lines of text, then break every line into its tab-separated columns.
val rawData = sc.textFile(s"$PATH/train_noheader.tsv")
val records = rawData.map(_.split("\t"))
// Peek at the first parsed row.
records.first()
Array[String] = Array("http://www.bloomberg.com/news/2010-12-23/ibm-predicts-holographic-calls-air-breathing-batteries-by-2015.html", "4042", "{""title"":""IBM Sees Holographic Calls Air Breathing Batteries ibm sees holographic calls, air-breathing batteries"",""body"":""A sign stands outside the International Business Machines Corp IBM Almaden Research Center campus in San Jose California Photographer Tony Avelar Bloomberg Buildings stand at the International Business Machines Corp IBM Almaden Research Center campus in the Santa Teresa Hills of San Jose California Photographer Tony Avelar Bloomberg By 2015 your mobile phone will project a 3 D image of anyone who calls and your laptop will be powered by kinetic energy At least that s what International Business Machines Corp sees ...
// Turn every raw row into a LabeledPoint: the last column is the class label, columns 4 up to
// (but excluding) the last are the numeric features, and "?" is read as 0.0.
val data = records.map { fields =>
  val unquoted = fields.map(_.replaceAll("\"", ""))
  val lastIdx = fields.size - 1
  val label = unquoted(lastIdx).toInt
  val features = unquoted.slice(4, lastIdx).map {
    case "?" => 0.0
    case value => value.toDouble
  }
  LabeledPoint(label, Vectors.dense(features))
}
// Keep the parsed points in memory; they are read again by the training and evaluation steps.
data.cache()
val numData = data.count()
numData: Long = 7395
// Note that some of our data contains negative feature values. For naive Bayes we convert these to zeros.
// The result is cached, like `data`, because it is read again by training and by each evaluation pass.
val nbData = records.map { r =>
  val trimmed = r.map(_.replaceAll("\"", ""))
  val label = trimmed(r.size - 1).toInt
  // "?" is read as 0.0, and any negative value is replaced by 0.0.
  val features = trimmed.slice(4, r.size - 1)
    .map(d => if (d == "?") 0.0 else d.toDouble)
    .map(d => if (d < 0) 0.0 else d)
  LabeledPoint(label, Vectors.dense(features))
}.cache()
2 训练分类模型
2.1 逻辑回归模型
// Hyper-parameters: iteration count for the SGD-trained models and depth limit for the tree.
val numIterations = 10
val maxTreeDepth = 5
// Fit a logistic regression classifier on the labeled points.
val lrModel = LogisticRegressionWithSGD.train(input = data, numIterations = numIterations)
2.2 SVM模型
// Fit a linear SVM on the labeled points, running numIterations iterations.
val svmModel = SVMWithSGD.train(input = data, numIterations = numIterations)
2.3 朴素贝叶斯
// Fit naive Bayes on nbData, the copy of the data whose negative features were set to zero.
val nbModel = NaiveBayes.train(input = nbData)
2.4 决策树
// Fit a classification tree using entropy as the impurity measure, at most maxTreeDepth deep.
val dtModel = DecisionTree.train(
  input = data,
  algo = Algo.Classification,
  impurity = Entropy,
  maxDepth = maxTreeDepth)
3 使用分类模型
3.1 预测
以逻辑回归模型为例
// Classify a single example: take the first labeled point and predict from its feature vector.
val dataPoint = data.first()
val prediction = lrModel.predict(dataPoint.features)
prediction: Double = 1.0
// Pull the actual label out of the same point so it can be compared with the prediction.
val LabeledPoint(trueLabel, _) = dataPoint
trueLabel: Double = 0.0
// Classify the whole data set at once by passing an RDD of feature vectors.
val predictions = lrModel.predict(data.map(_.features))
// Look at the first five results.
predictions.take(5)
Array[Double] = Array(1.0, 1.0, 1.0, 1.0, 1.0)
4 评估性能
4.1 逻辑回归模型的正确率
// Accuracy = number of points whose predicted label equals the true label / number of points.
val lrTotalCorrect = data
  .filter(point => lrModel.predict(point.features) == point.label)
  .count()
  .toDouble
val lrAccuracy = lrTotalCorrect / numData
lrAccuracy: Double = 0.5146720757268425
4.2 SVM模型的正确率
// Accuracy of the SVM: correctly classified points divided by the number of points.
val svmTotalCorrect = data
  .filter(point => svmModel.predict(point.features) == point.label)
  .count()
  .toDouble
val svmAccuracy = svmTotalCorrect / numData
svmAccuracy: Double = 0.5146720757268425
4.3 朴素贝叶斯的正确率
// Accuracy of naive Bayes, evaluated on nbData (the features it was trained on).
val nbTotalCorrect = nbData
  .filter(point => nbModel.predict(point.features) == point.label)
  .count()
  .toDouble
val nbAccuracy = nbTotalCorrect / numData
nbAccuracy: Double = 0.5803921568627451
4.4 决策树的正确率
// The tree's output is thresholded at 0.5 to obtain a 0/1 class before it is compared with the label.
val dtTotalCorrect = data
  .filter { point =>
    val predictedClass = if (dtModel.predict(point.features) > 0.5) 1 else 0
    predictedClass == point.label
  }
  .count()
  .toDouble
val dtAccuracy = dtTotalCorrect / numData
dtAccuracy: Double = 0.6482758620689655
4.5 ROC曲线和AUC
// For each linear model, pair its prediction with the true label of every point and report
// (model class name, area under the PR curve, area under the ROC curve).
// NOTE(review): predict is compared directly with the label in the accuracy code, so these
// curves appear to be built from class predictions rather than raw scores — confirm.
val metrics = for (model <- Seq(lrModel, svmModel)) yield {
  val scoreAndLabels = data.map(point => (model.predict(point.features), point.label))
  val evaluator = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, evaluator.areaUnderPR, evaluator.areaUnderROC)
}
// Same report for naive Bayes, computed on nbData; the prediction is mapped to 1.0 when it
// exceeds 0.5 and to 0.0 otherwise before being paired with the label.
val nbMetrics = for (model <- Seq(nbModel)) yield {
  val scoreAndLabels = nbData.map { point =>
    val binarized = if (model.predict(point.features) > 0.5) 1.0 else 0.0
    (binarized, point.label)
  }
  val evaluator = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, evaluator.areaUnderPR, evaluator.areaUnderROC)
}
// Same report for the decision tree; the prediction is mapped to 1.0 when it exceeds 0.5
// and to 0.0 otherwise before being paired with the label.
val dtMetrics = for (model <- Seq(dtModel)) yield {
  val scoreAndLabels = data.map { point =>
    val binarized = if (model.predict(point.features) > 0.5) 1.0 else 0.0
    (binarized, point.label)
  }
  val evaluator = new BinaryClassificationMetrics(scoreAndLabels)
  (model.getClass.getSimpleName, evaluator.areaUnderPR, evaluator.areaUnderROC)
}
// Combine the per-model results and print both areas as percentages, one line per model.
val allMetrics = metrics ++ nbMetrics ++ dtMetrics
for ((m, pr, roc) <- allMetrics) {
  println(f"$m, Area under PR: ${pr * 100.0}%2.4f%%, Area under ROC: ${roc * 100.0}%2.4f%%")
}
LogisticRegressionModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
SVMModel, Area under PR: 75.6759%, Area under ROC: 50.1418%
NaiveBayesModel, Area under PR: 68.0851%, Area under ROC: 58.3559%