一、数据的加载
1.1、数据的字段
"url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"
下面是部分字段说明
1.2、数据的加载
val data = spark.sparkContext.textFile("C:\\Users\\12285\\Downloads\\train.tsv")
.filter(!_.contains("alchemy_category_score"))
.map(line => line.split("\t"))
二、数据预处理
2.1、数据清洗
val dataVecs = data.map(t => {
// 去掉引号
val fields = t.map(_.replaceAll("\"", ""))
// 将最后一列(标签列)转为整数
val label = fields(fields.size - 1).toInt
// 把第四列中的"?"转为0.0
val features = fields.slice(4, fields.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
// 打标签, 将标签及特征转换为LabeledPoint
LabeledPoint(label, Vectors.dense(features))
})
2.2、考虑特殊情况, 贝叶斯算法中数据不小于0,所以需要做些处理
// 朴素贝叶斯数据集
val nbDataVecs = data.map(t => {
// 去掉引号
val fields = t.map(_.replaceAll("\"", ""))
// 将最后一列(标签列)转为整数
val label = fields(fields.size - 1).toInt
val features = fields.slice(4, fields.size - 1)
.map(d => if (d == "?") 0.0 else d.toDouble)
.map(d => if (d < 0.0) 0.0 else d) // 朴素贝叶斯算法时,数据需不小于0
LabeledPoint(label, Vectors.dense(features))
})
2.3、创建DataFrame
// 创建DataFrame
val df = spark.createDataFrame(dataVecs)
val nbDf = spark.createDataFrame(nbDataVecs)
2.4、划分训练集, 测试集
// 训练集, 测试集
val Array(trainingDf, testDf) = df.randomSplit(Array(0.8, 0.2))
val Array(nbTrainingDf, nbTestDf) = nbDf.randomSplit(Array(0.8, 0.2))
// 由于后续使用网格参数和交叉验证的时候,需要多次使用到训练集和测试集,所以将这两者载入内存,可大大提高性能。
trainingDf.persist()
testDf.persist()
nbTrainingDf.persist()
nbTestDf.persist()
三、模型
3.1、贝叶斯模型
3.1.1、创建贝叶斯模型
// Estimator
val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")
3.1.2、训练模型
// 训练数据
val nbModel = nb.fit(nbTrainingDf)
3.1.2、使用测试数据预测
// 预测数据
val nbPrediction = nbModel.transform(nbTestDf)
3.1.3、统计朴素贝叶斯准确性
//t1 存放预测值的数组,t2存放测试数据标签值 t3存放测试数据总行数
val (t1, t2, t3) = (nbPrediction.select("prediction").collect(),
nbTestDf.select("label").collect(),
nbTestDf.count().toInt)
var t4 = 0
for (i <- 0 until t3) {
if (t1(i) == t2(i)) {
t4 += 1
}
}
val nbAccuracy = 1.0 * t4 / t3
println("朴素贝叶斯 预测准确值=" + nbAccuracy)
3.2、逻辑回归模型
注: 该案例中使用交叉验证, 与上面贝叶斯模型中使用的的训练-测试验证不同
3.2.1、建立特征索引
// 建立特征索引
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.fit(df)
3.2.2、创建逻辑回归模型
// 创建逻辑回归模型
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
.setRegParam(0.001)
3.2.3、创建流水线
// 逻辑回归的流水线,包含2个stages(featureIndexer和lr)
val lrPipeline = new Pipeline().setStages(Array(featureIndexer,lr))
3.2.4、配置网格参数
// 配置网格参数,使用ParamGridBuilder构造一个parameter grid
val lrParamGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.3, 0.5))
.addGrid(lr.maxIter, Array(10, 20, 30))
.build()
3.2.5、实例化交叉验证模型
// 实例化交叉验证模型
val evaluator = new BinaryClassificationEvaluator()
val lrCV = new CrossValidator()
.setEstimator(lrPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(lrParamGrid)
.setNumFolds(2)
3.2.6、通过交叉验证模型,获取最优参数集
// 通过交叉验证模型,获取最优参数集,
val lrCvModel = lrCV.fit(trainingDf)
3.2.7、测试模型
// 并测试模型
val lrPrediction = lrCvModel.transform(testDf)
// 查看数据
lrPrediction.select("label", "prediction").show(10)
3.2.8、查看逻辑回归匹配模型的最优参数
// 查看逻辑回归匹配模型的最优参数
val lrBestModel = lrCvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = lrBestModel.stages(1).asInstanceOf[LogisticRegressionModel]
println("lrBestModel.getRegParam = " + lrModel.getRegParam)
println("lrBestModel.getMaxIter = " + lrModel.getMaxIter)
3.2.9、统计逻辑回归的预测正确率
// 统计逻辑回归的预测正确率
// t_lr 为逻辑回归预测值的数组,
// t_label 为测试集的标签值的数组
val (t_lr, t_label, t_count) = (lrPrediction.select("prediction").collect,
testDf.select("label").collect,
testDf.count.toInt)
// c_lr 为统计逻辑回归预测正确个数的累加器
var Array(c_lr) = Array(0)
// 遍历循环,统计逻辑回归正确预测的次数
for (i <- 0 to t_count - 1) {
if (t_lr(i) == t_label(i)) c_lr += 1
}
// 统计逻辑回归正确率
println("统计逻辑回归正确率 = " + 1.0 * c_lr / t_count)
3.3、决策树模型
def testDt() = {
// 组装
// 建立特征索引
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.fit(df)
// 创建决策树模型
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setImpurity("entropy")
.setMaxBins(100)
.setMaxDepth(5)
.setMinInfoGain(0.01)
// 一个是决策树的流水线,包含2个stages(featureIndexer 和 dt)
val dtPipeline = new Pipeline().setStages(Array(featureIndexer, dt))
// 分别配置网格参数,使用ParamGridBuilder构造一个parameter grid
val dtParamGrid = new ParamGridBuilder()
.addGrid(dt.maxDepth, Array(3, 5, 7))
.build()
// 分别实例化交叉验证模型
val evaluator = new BinaryClassificationEvaluator()
val dtCV = new CrossValidator()
.setEstimator(dtPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(dtParamGrid)
.setNumFolds(2)
// 通过交叉验证模型,获取最优参数集,
val dtCvModel = dtCV.fit(trainingDf)
// 并测试模型
val dtPrediction = dtCvModel.transform(testDf)
// 查看数据
dtPrediction.select("label", "prediction").show(10)
// 查看决策树匹配模型的参数
val dtBestModel = dtCvModel.bestModel.asInstanceOf[PipelineModel]
val dtModel = dtBestModel.stages(1).asInstanceOf[DecisionTreeClassificationModel]
println("dtModel.getMaxDepth = " + dtModel.getMaxDepth)
println("dtModel.getMaxIter = " + dtModel.numFeatures)
// 统计决策树的预测正确率
// t_dt 为决策树预测值的数组
// t_label 为测试集的标签值的数组
val (t_dt, t_label, t_count) = (dtPrediction.select("prediction").collect,
testDf.select("label").collect,
testDf.count.toInt)
//c_dt 为统计决策树预测正确个数的累加器
var Array(c_dt) = Array(0)
//遍历循环,统计逻辑回归正确预测的次数
for (i <- 0 to t_count - 1) {
if (t_dt(i) == t_label(i)) c_dt += 1
}
// 统计决策树正确率
println("统计决策树正确率 = " + 1.0 * c_dt / t_count)
}
完整代码
package com.chb.test
import org.apache.spark.ml.classification._
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.{LabeledPoint, VectorIndexer}
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.SparkSession
/**
* Created by chb on 2019/5/25.
*/
object TestClassifer {
val spark = SparkSession.builder()
.appName("test")
.master("local[*]")
.config("spark.some.config.option", "some-value")
.getOrCreate()
def main(args: Array[String]): Unit = {
val data = spark.sparkContext.textFile("C:\\Users\\12285\\Downloads\\train.tsv")
.filter(!_.contains("alchemy_category_score"))
.map(line => line.split("\t"))
val dataVecs = data.map(t => {
// 去掉引号
val fields = t.map(_.replaceAll("\"", ""))
// 将最后一列(标签列)转为整数
val label = fields(fields.size - 1).toInt
// 把第四列中的"?"转为0.0
val features = fields.slice(4, fields.size - 1).map(d => if (d == "?") 0.0 else d.toDouble)
// 打标签, 将标签及特征转换为LabeledPoint
LabeledPoint(label, Vectors.dense(features))
})
// 朴素贝叶斯数据集
val nbDataVecs = data.map(t => {
// 去掉引号
val fields = t.map(_.replaceAll("\"", ""))
// 将最后一列(标签列)转为整数
val label = fields(fields.size - 1).toInt
val features = fields.slice(4, fields.size - 1)
.map(d => if (d == "?") 0.0 else d.toDouble)
.map(d => if (d < 0.0) 0.0 else d) // 朴素贝叶斯算法时,数据需不小于0
LabeledPoint(label, Vectors.dense(features))
})
// 创建DataFrame
val df = spark.createDataFrame(dataVecs)
val nbDf = spark.createDataFrame(nbDataVecs)
df.show(10)
// 训练集, 测试集
val Array(trainingDf, testDf) = df.randomSplit(Array(0.8, 0.2))
val Array(nbTrainingDf, nbTestDf) = nbDf.randomSplit(Array(0.8, 0.2))
// 由于后续使用网格参数和交叉验证的时候,需要多次使用到训练集和测试集,所以将这两者载入内存,可大大提高性能。
trainingDf.persist()
testDf.persist()
nbTrainingDf.persist()
nbTestDf.persist()
// testBayes()
testLr()
// testDt()
def testBayes() = {
// Estimator
val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")
// 训练数据
val nbModel = nb.fit(nbTrainingDf)
// 预测数据
val nbPrediction = nbModel.transform(nbTestDf)
nbPrediction.show(10)
//t1 存放预测值的数组,t2存放测试数据标签值 t3存放测试数据总行数
val (t1, t2, t3) = (nbPrediction.select("prediction").collect(),
nbTestDf.select("label").collect(),
nbTestDf.count().toInt)
var t4 = 0
for (i <- 0 until t3) {
if (t1(i) == t2(i)) {
t4 += 1
}
}
val nbAccuracy = 1.0 * t4 / t3
println("朴素贝叶斯 预测准确值=" + nbAccuracy)
}
def testLr() = {
// 组装
// 建立特征索引
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.fit(df)
// 创建逻辑回归模型
val lr = new LogisticRegression()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setMaxIter(10)
.setRegParam(0.001)
// 逻辑回归的流水线,包含2个stages(featureIndexer和lr)
val lrPipeline = new Pipeline().setStages(Array(featureIndexer, lr))
// 配置网格参数,使用ParamGridBuilder构造一个parameter grid
val lrParamGrid = new ParamGridBuilder()
.addGrid(lr.regParam, Array(0.1, 0.3, 0.5))
.addGrid(lr.maxIter, Array(10, 20, 30))
.build()
// 实例化交叉验证模型
val evaluator = new BinaryClassificationEvaluator()
val lrCV = new CrossValidator()
.setEstimator(lrPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(lrParamGrid)
.setNumFolds(2)
// 通过交叉验证模型,获取最优参数集,
val lrCvModel = lrCV.fit(trainingDf)
// 并测试模型
val lrPrediction = lrCvModel.transform(testDf)
// 查看数据
lrPrediction.select("label", "prediction").show(10)
// 查看逻辑回归匹配模型的最优参数
val lrBestModel = lrCvModel.bestModel.asInstanceOf[PipelineModel]
val lrModel = lrBestModel.stages(1).asInstanceOf[LogisticRegressionModel]
println("lrBestModel.getRegParam = " + lrModel.getRegParam)
println("lrBestModel.getMaxIter = " + lrModel.getMaxIter)
// 统计逻辑回归的预测正确率
// t_lr 为逻辑回归预测值的数组,
// t_label 为测试集的标签值的数组
val (t_lr, t_label, t_count) = (lrPrediction.select("prediction").collect,
testDf.select("label").collect,
testDf.count.toInt)
// c_lr 为统计逻辑回归预测正确个数的累加器
var Array(c_lr) = Array(0)
// 遍历循环,统计逻辑回归正确预测的次数
for (i <- 0 to t_count - 1) {
if (t_lr(i) == t_label(i)) c_lr += 1
}
// 统计逻辑回归正确率
println("统计逻辑回归正确率 = " + 1.0 * c_lr / t_count)
}
def testDt() = {
// 组装
// 建立特征索引
val featureIndexer = new VectorIndexer()
.setInputCol("features")
.setOutputCol("indexedFeatures")
.fit(df)
// 创建决策树模型
val dt = new DecisionTreeClassifier()
.setLabelCol("label")
.setFeaturesCol("indexedFeatures")
.setImpurity("entropy")
.setMaxBins(100)
.setMaxDepth(5)
.setMinInfoGain(0.01)
// 一个是决策树的流水线,包含2个stages(featureIndexer 和 dt)
val dtPipeline = new Pipeline().setStages(Array(featureIndexer, dt))
// 分别配置网格参数,使用ParamGridBuilder构造一个parameter grid
val dtParamGrid = new ParamGridBuilder()
.addGrid(dt.maxDepth, Array(3, 5, 7))
.build()
// 分别实例化交叉验证模型
val evaluator = new BinaryClassificationEvaluator()
val dtCV = new CrossValidator()
.setEstimator(dtPipeline)
.setEvaluator(evaluator)
.setEstimatorParamMaps(dtParamGrid)
.setNumFolds(2)
// 通过交叉验证模型,获取最优参数集,
val dtCvModel = dtCV.fit(trainingDf)
// 并测试模型
val dtPrediction = dtCvModel.transform(testDf)
// 查看数据
dtPrediction.select("label", "prediction").show(10)
// 查看决策树匹配模型的参数
val dtBestModel = dtCvModel.bestModel.asInstanceOf[PipelineModel]
val dtModel = dtBestModel.stages(1).asInstanceOf[DecisionTreeClassificationModel]
println("dtModel.getMaxDepth = " + dtModel.getMaxDepth)
println("dtModel.getMaxIter = " + dtModel.numFeatures)
// 统计决策树的预测正确率
// t_dt 为决策树预测值的数组
// t_label 为测试集的标签值的数组
val (t_dt, t_label, t_count) = (dtPrediction.select("prediction").collect,
testDf.select("label").collect,
testDf.count.toInt)
//c_dt 为统计决策树预测正确个数的累加器
var Array(c_dt) = Array(0)
//遍历循环,统计逻辑回归正确预测的次数
for (i <- 0 to t_count - 1) {
if (t_dt(i) == t_label(i)) c_dt += 1
}
// 统计决策树正确率
println("统计决策树正确率 = " + 1.0 * c_dt / t_count)
}
}
}