Spark ML 构建分类模型

    xiaoxiao2025-01-11  9

    一、数据的加载

    1.1、数据的字段

    "url" "urlid" "boilerplate" "alchemy_category" "alchemy_category_score" "avglinksize" "commonlinkratio_1" "commonlinkratio_2" "commonlinkratio_3" "commonlinkratio_4" "compression_ratio" "embed_ratio" "framebased" "frameTagRatio" "hasDomainLink" "html_ratio" "image_ratio" "is_news" "lengthyLinkDomain" "linkwordscore" "news_front_page" "non_markup_alphanum_characters" "numberOfLinks" "numwords_in_url" "parametrizedLinkRatio" "spelling_errors_ratio" "label"

    下面是部分字段说明

    1.2、数据的加载

    val data = spark.sparkContext.textFile("C:\\Users\\12285\\Downloads\\train.tsv") .filter(!_.contains("alchemy_category_score")) .map(line => line.split("\t"))

    二、数据预处理

    2.1、数据清洗

    val dataVecs = data.map(t => { // 去掉引号 val fields = t.map(_.replaceAll("\"", "")) // 将最后一列(标签列)转为整数 val label = fields(fields.size - 1).toInt // 把第四列中的"?"转为0.0 val features = fields.slice(4, fields.size - 1).map(d => if (d == "?") 0.0 else d.toDouble) // 打标签, 将标签及特征转换为LabeledPoint LabeledPoint(label, Vectors.dense(features)) })

    2.2、考虑特殊情况, 贝叶斯算法中数据不小于0,所以需要做些处理

    // 朴素贝叶斯数据集 val nbDataVecs = data.map(t => { // 去掉引号 val fields = t.map(_.replaceAll("\"", "")) // 将最后一列(标签列)转为整数 val label = fields(fields.size - 1).toInt val features = fields.slice(4, fields.size - 1) .map(d => if (d == "?") 0.0 else d.toDouble) .map(d => if (d < 0.0) 0.0 else d) // 朴素贝叶斯算法时,数据需不小于0 LabeledPoint(label, Vectors.dense(features)) })

    2.3、创建DataFrame

    // 创建DataFrame val df = spark.createDataFrame(dataVecs) val nbDf = spark.createDataFrame(nbDataVecs)

    2.4、划分训练集, 测试集

    // 训练集, 测试集 val Array(trainingDf, testDf) = df.randomSplit(Array(0.8, 0.2)) val Array(nbTrainingDf, nbTestDf) = nbDf.randomSplit(Array(0.8, 0.2)) // 由于后续使用网格参数和交叉验证的时候,需要多次使用到训练集和测试集,所以将这两者载入内存,可大大提高性能。 trainingDf.persist() testDf.persist() nbTrainingDf.persist() nbTestDf.persist()

    三、模型

    3.1、贝叶斯模型

    3.1.1、创建贝叶斯模型

    // Estimator val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features")

    3.1.2、训练模型

    // 训练数据 val nbModel = nb.fit(nbTrainingDf)

    3.1.2、使用测试数据预测

    // 预测数据 val nbPrediction = nbModel.transform(nbTestDf)

    3.1.3、统计朴素贝叶斯准确性

    //t1 存放预测值的数组,t2存放测试数据标签值 t3存放测试数据总行数 val (t1, t2, t3) = (nbPrediction.select("prediction").collect(), nbTestDf.select("label").collect(), nbTestDf.count().toInt) var t4 = 0 for (i <- 0 until t3) { if (t1(i) == t2(i)) { t4 += 1 } } val nbAccuracy = 1.0 * t4 / t3 println("朴素贝叶斯 预测准确值=" + nbAccuracy)

    3.2、逻辑回归模型

       注: 该案例中使用交叉验证, 与上面贝叶斯模型中使用的的训练-测试验证不同

    3.2.1、建立特征索引

    // 建立特征索引 val featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") .fit(df)

    3.2.2、创建逻辑回归模型

    // 创建逻辑回归模型 val lr = new LogisticRegression() .setLabelCol("label") .setFeaturesCol("indexedFeatures") .setMaxIter(10) .setRegParam(0.001)

    3.2.3、创建流水线

    // 逻辑回归的流水线,包含2个stages(featureIndexer和lr) val lrPipeline = new Pipeline().setStages(Array(featureIndexer,lr))

    3.2.4、配置网格参数

    // 配置网格参数,使用ParamGridBuilder构造一个parameter grid val lrParamGrid = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.1, 0.3, 0.5)) .addGrid(lr.maxIter, Array(10, 20, 30)) .build()

    3.2.5、实例化交叉验证模型

    // 实例化交叉验证模型 val evaluator = new BinaryClassificationEvaluator() val lrCV = new CrossValidator() .setEstimator(lrPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(lrParamGrid) .setNumFolds(2)

    3.2.6、通过交叉验证模型,获取最优参数集

    // 通过交叉验证模型,获取最优参数集, val lrCvModel = lrCV.fit(trainingDf)

    3.2.7、测试模型

    // 并测试模型 val lrPrediction = lrCvModel.transform(testDf) // 查看数据 lrPrediction.select("label", "prediction").show(10)

    3.2.8、查看逻辑回归匹配模型的最优参数

    // 查看逻辑回归匹配模型的最优参数 val lrBestModel = lrCvModel.bestModel.asInstanceOf[PipelineModel] val lrModel = lrBestModel.stages(1).asInstanceOf[LogisticRegressionModel] println("lrBestModel.getRegParam = " + lrModel.getRegParam) println("lrBestModel.getMaxIter = " + lrModel.getMaxIter)

    3.2.9、统计逻辑回归的预测正确率

    // 统计逻辑回归的预测正确率 // t_lr 为逻辑回归预测值的数组, // t_label 为测试集的标签值的数组 val (t_lr, t_label, t_count) = (lrPrediction.select("prediction").collect, testDf.select("label").collect, testDf.count.toInt) // c_lr 为统计逻辑回归预测正确个数的累加器 var Array(c_lr) = Array(0) // 遍历循环,统计逻辑回归正确预测的次数 for (i <- 0 to t_count - 1) { if (t_lr(i) == t_label(i)) c_lr += 1 } // 统计逻辑回归正确率 println("统计逻辑回归正确率 = " + 1.0 * c_lr / t_count)

    3.3、决策树模型

    def testDt() = { // 组装 // 建立特征索引 val featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") .fit(df) // 创建决策树模型 val dt = new DecisionTreeClassifier() .setLabelCol("label") .setFeaturesCol("indexedFeatures") .setImpurity("entropy") .setMaxBins(100) .setMaxDepth(5) .setMinInfoGain(0.01) // 一个是决策树的流水线,包含2个stages(featureIndexer 和 dt) val dtPipeline = new Pipeline().setStages(Array(featureIndexer, dt)) // 分别配置网格参数,使用ParamGridBuilder构造一个parameter grid val dtParamGrid = new ParamGridBuilder() .addGrid(dt.maxDepth, Array(3, 5, 7)) .build() // 分别实例化交叉验证模型 val evaluator = new BinaryClassificationEvaluator() val dtCV = new CrossValidator() .setEstimator(dtPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(dtParamGrid) .setNumFolds(2) // 通过交叉验证模型,获取最优参数集, val dtCvModel = dtCV.fit(trainingDf) // 并测试模型 val dtPrediction = dtCvModel.transform(testDf) // 查看数据 dtPrediction.select("label", "prediction").show(10) // 查看决策树匹配模型的参数 val dtBestModel = dtCvModel.bestModel.asInstanceOf[PipelineModel] val dtModel = dtBestModel.stages(1).asInstanceOf[DecisionTreeClassificationModel] println("dtModel.getMaxDepth = " + dtModel.getMaxDepth) println("dtModel.getMaxIter = " + dtModel.numFeatures) // 统计决策树的预测正确率 // t_dt 为决策树预测值的数组 // t_label 为测试集的标签值的数组 val (t_dt, t_label, t_count) = (dtPrediction.select("prediction").collect, testDf.select("label").collect, testDf.count.toInt) //c_dt 为统计决策树预测正确个数的累加器 var Array(c_dt) = Array(0) //遍历循环,统计逻辑回归正确预测的次数 for (i <- 0 to t_count - 1) { if (t_dt(i) == t_label(i)) c_dt += 1 } // 统计决策树正确率 println("统计决策树正确率 = " + 1.0 * c_dt / t_count) }

    完整代码

    package com.chb.test import org.apache.spark.ml.classification._ import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator import org.apache.spark.ml.feature.{LabeledPoint, VectorIndexer} import org.apache.spark.ml.linalg.Vectors import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder} import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.sql.SparkSession /** * Created by chb on 2019/5/25. */ object TestClassifer { val spark = SparkSession.builder() .appName("test") .master("local[*]") .config("spark.some.config.option", "some-value") .getOrCreate() def main(args: Array[String]): Unit = { val data = spark.sparkContext.textFile("C:\\Users\\12285\\Downloads\\train.tsv") .filter(!_.contains("alchemy_category_score")) .map(line => line.split("\t")) val dataVecs = data.map(t => { // 去掉引号 val fields = t.map(_.replaceAll("\"", "")) // 将最后一列(标签列)转为整数 val label = fields(fields.size - 1).toInt // 把第四列中的"?"转为0.0 val features = fields.slice(4, fields.size - 1).map(d => if (d == "?") 0.0 else d.toDouble) // 打标签, 将标签及特征转换为LabeledPoint LabeledPoint(label, Vectors.dense(features)) }) // 朴素贝叶斯数据集 val nbDataVecs = data.map(t => { // 去掉引号 val fields = t.map(_.replaceAll("\"", "")) // 将最后一列(标签列)转为整数 val label = fields(fields.size - 1).toInt val features = fields.slice(4, fields.size - 1) .map(d => if (d == "?") 0.0 else d.toDouble) .map(d => if (d < 0.0) 0.0 else d) // 朴素贝叶斯算法时,数据需不小于0 LabeledPoint(label, Vectors.dense(features)) }) // 创建DataFrame val df = spark.createDataFrame(dataVecs) val nbDf = spark.createDataFrame(nbDataVecs) df.show(10) // 训练集, 测试集 val Array(trainingDf, testDf) = df.randomSplit(Array(0.8, 0.2)) val Array(nbTrainingDf, nbTestDf) = nbDf.randomSplit(Array(0.8, 0.2)) // 由于后续使用网格参数和交叉验证的时候,需要多次使用到训练集和测试集,所以将这两者载入内存,可大大提高性能。 trainingDf.persist() testDf.persist() nbTrainingDf.persist() nbTestDf.persist() // testBayes() testLr() // testDt() def testBayes() = { // Estimator val nb = new NaiveBayes().setLabelCol("label").setFeaturesCol("features") // 训练数据 val nbModel = nb.fit(nbTrainingDf) // 预测数据 val nbPrediction = nbModel.transform(nbTestDf) nbPrediction.show(10) //t1 存放预测值的数组,t2存放测试数据标签值 t3存放测试数据总行数 val (t1, t2, t3) = (nbPrediction.select("prediction").collect(), nbTestDf.select("label").collect(), nbTestDf.count().toInt) var t4 = 0 for (i <- 0 until t3) { if (t1(i) == t2(i)) { t4 += 1 } } val nbAccuracy = 1.0 * t4 / t3 println("朴素贝叶斯 预测准确值=" + nbAccuracy) } def testLr() = { // 组装 // 建立特征索引 val featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") .fit(df) // 创建逻辑回归模型 val lr = new LogisticRegression() .setLabelCol("label") .setFeaturesCol("indexedFeatures") .setMaxIter(10) .setRegParam(0.001) // 逻辑回归的流水线,包含2个stages(featureIndexer和lr) val lrPipeline = new Pipeline().setStages(Array(featureIndexer, lr)) // 配置网格参数,使用ParamGridBuilder构造一个parameter grid val lrParamGrid = new ParamGridBuilder() .addGrid(lr.regParam, Array(0.1, 0.3, 0.5)) .addGrid(lr.maxIter, Array(10, 20, 30)) .build() // 实例化交叉验证模型 val evaluator = new BinaryClassificationEvaluator() val lrCV = new CrossValidator() .setEstimator(lrPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(lrParamGrid) .setNumFolds(2) // 通过交叉验证模型,获取最优参数集, val lrCvModel = lrCV.fit(trainingDf) // 并测试模型 val lrPrediction = lrCvModel.transform(testDf) // 查看数据 lrPrediction.select("label", "prediction").show(10) // 查看逻辑回归匹配模型的最优参数 val lrBestModel = lrCvModel.bestModel.asInstanceOf[PipelineModel] val lrModel = lrBestModel.stages(1).asInstanceOf[LogisticRegressionModel] println("lrBestModel.getRegParam = " + lrModel.getRegParam) println("lrBestModel.getMaxIter = " + lrModel.getMaxIter) // 统计逻辑回归的预测正确率 // t_lr 为逻辑回归预测值的数组, // t_label 为测试集的标签值的数组 val (t_lr, t_label, t_count) = (lrPrediction.select("prediction").collect, testDf.select("label").collect, testDf.count.toInt) // c_lr 为统计逻辑回归预测正确个数的累加器 var Array(c_lr) = Array(0) // 遍历循环,统计逻辑回归正确预测的次数 for (i <- 0 to t_count - 1) { if (t_lr(i) == t_label(i)) c_lr += 1 } // 统计逻辑回归正确率 println("统计逻辑回归正确率 = " + 1.0 * c_lr / t_count) } def testDt() = { // 组装 // 建立特征索引 val featureIndexer = new VectorIndexer() .setInputCol("features") .setOutputCol("indexedFeatures") .fit(df) // 创建决策树模型 val dt = new DecisionTreeClassifier() .setLabelCol("label") .setFeaturesCol("indexedFeatures") .setImpurity("entropy") .setMaxBins(100) .setMaxDepth(5) .setMinInfoGain(0.01) // 一个是决策树的流水线,包含2个stages(featureIndexer 和 dt) val dtPipeline = new Pipeline().setStages(Array(featureIndexer, dt)) // 分别配置网格参数,使用ParamGridBuilder构造一个parameter grid val dtParamGrid = new ParamGridBuilder() .addGrid(dt.maxDepth, Array(3, 5, 7)) .build() // 分别实例化交叉验证模型 val evaluator = new BinaryClassificationEvaluator() val dtCV = new CrossValidator() .setEstimator(dtPipeline) .setEvaluator(evaluator) .setEstimatorParamMaps(dtParamGrid) .setNumFolds(2) // 通过交叉验证模型,获取最优参数集, val dtCvModel = dtCV.fit(trainingDf) // 并测试模型 val dtPrediction = dtCvModel.transform(testDf) // 查看数据 dtPrediction.select("label", "prediction").show(10) // 查看决策树匹配模型的参数 val dtBestModel = dtCvModel.bestModel.asInstanceOf[PipelineModel] val dtModel = dtBestModel.stages(1).asInstanceOf[DecisionTreeClassificationModel] println("dtModel.getMaxDepth = " + dtModel.getMaxDepth) println("dtModel.getMaxIter = " + dtModel.numFeatures) // 统计决策树的预测正确率 // t_dt 为决策树预测值的数组 // t_label 为测试集的标签值的数组 val (t_dt, t_label, t_count) = (dtPrediction.select("prediction").collect, testDf.select("label").collect, testDf.count.toInt) //c_dt 为统计决策树预测正确个数的累加器 var Array(c_dt) = Array(0) //遍历循环,统计逻辑回归正确预测的次数 for (i <- 0 to t_count - 1) { if (t_dt(i) == t_label(i)) c_dt += 1 } // 统计决策树正确率 println("统计决策树正确率 = " + 1.0 * c_dt / t_count) } } }
    最新回复(0)