Spark机器学习
自然语言处理(NLP,Natural Language Processing)
提取特征建模机器学习
TF-IDF(词频 term frequency–逆向文件频率 inverse document frequency)
短语加权:根据词频,为单词赋予权值特征哈希:使用哈希方程对特征赋予向量下标
0 运行环境
tar xfvz 20news-bydate.tar.gz
export SPARK_HOME=/Users/erichan/Garden/spark-1.5.1-bin-hadoop2.6
cd $SPARK_HOME
bin/spark-shell --name my_mlib --packages org.jblas:jblas:1.2.4-SNAPSHOT --driver-memory 4G --executor-memory 4G --driver-cores 2
1 提取特征
val PATH = "/Users/erichan/sourcecode/book/Spark机器学习/20news-bydate"
val path = PATH+"/20news-bydate-train/*"
val rdd = sc.wholeTextFiles(path)
println(rdd.count)
11314
查看新闻组主题
val newsgroups = rdd.map { case (file, text) => file.split("/").takeRight(2).head }
val countByGroup = newsgroups.map(n => (n, 1)).reduceByKey(_ + _).collect.sortBy(-_._2).mkString("\n")
println(countByGroup)
(rec.sport.hockey,600)
(soc.religion.christian,599)(rec.motorcycles,598)(rec.sport.baseball,597)(sci.crypt,595)(rec.autos,594)(sci.med,594)(comp.windows.x,593)(sci.space,593)(sci.electronics,591)(comp.os.ms-windows.misc,591)(comp.sys.ibm.pc.hardware,590)(misc.forsale,585)(comp.graphics,584)(comp.sys.mac.hardware,578)(talk.politics.mideast,564)(talk.politics.guns,546)(alt.atheism,480)(talk.politics.misc,465)(talk.religion.misc,377)
2 建模
2.1 分词
val text = rdd.map { case (file, text) => text }
val whiteSpaceSplit = text.flatMap(t => t.split(" ").map(_.toLowerCase))
println(whiteSpaceSplit.distinct.count)
println(whiteSpaceSplit.sample(true, 0.3, 42).take(100).mkString(","))
402978from:,mathew,mathew,faq:,faq:,atheist,resources
summary:,music,--,fiction,,mantis,consultants,,uk.supersedes:,290
archive-name:,1.0
,,,,,,,,,,,,,,,,,,,organizations
,organizations
,,,,,,,,,,,,,,,,stickers,and,and,the,from,from,in,to:,to:,ffrf,,256-8900
evolution,designs
evolution,a,stick,cars,,writteninside.,fish,us.
write,evolution,,,,,,,bay,can,get,get,,to,theprice,is,of,the,the,so,on.,and,foote.,,atheist,pp.,0-910309-26-4,,,atrocities,,foote:,aap.,,the
2.2 改进分词
val nonWordSplit = text.flatMap(t => t.split("""\W+""").map(_.toLowerCase))
println(nonWordSplit.distinct.count)
println(nonWordSplit.distinct.sample(true, 0.3, 42).take(100).mkString(","))
val regex = """[^0-9]*""".r
val filterNumbers = nonWordSplit.filter(token => regex.pattern.matcher(token).matches)
println(filterNumbers.distinct.count)
println(filterNumbers.distinct.sample(true, 0.3, 42).take(100).mkString(","))
2.3 移除停用词
val tokenCounts = filterNumbers.map(t => (t, 1)).reduceByKey(_ + _)
val oreringDesc = Ordering.by[(String, Int), Int](_._2)
//println(tokenCounts.top(20)(oreringDesc).mkString("\n"))
val stopwords = Set(
"the","a","an","of","or","in","for","by","on","but", "is", "not", "with", "as", "was", "if",
"they", "are", "this", "and", "it", "have", "from", "at", "my", "be", "that", "to"
)
val tokenCountsFilteredStopwords = tokenCounts.filter { case (k, v) => !stopwords.contains(k) }
//println(tokenCountsFilteredStopwords.top(20)(oreringDesc).mkString("\n"))
val tokenCountsFilteredSize = tokenCountsFilteredStopwords.filter { case (k, v) => k.size >= 2 }
println(tokenCountsFilteredSize.top(20)(oreringDesc).mkString("\n"))
2.4 移除低频词
val oreringAsc = Ordering.by[(String, Int), Int](-_._2)
//println(tokenCountsFilteredSize.top(20)(oreringAsc).mkString("\n"))
val rareTokens = tokenCounts.filter{ case (k, v) => v < 2 }.map { case (k, v) => k }.collect.toSet
val tokenCountsFilteredAll = tokenCountsFilteredSize.filter { case (k, v) => !rareTokens.contains(k) }
println(tokenCountsFilteredAll.top(20)(oreringAsc).mkString("\n"))
def tokenize(line: String): Seq[String] = {
line.split("""\W+""")
.map(_.toLowerCase)
.filter(token => regex.pattern.matcher(token).matches)
.filterNot(token => stopwords.contains(token))
.filterNot(token => rareTokens.contains(token))
.filter(token => token.size >= 2)
.toSeq
}
//println(text.flatMap(doc => tokenize(doc)).distinct.count)
val tokens = text.map(doc => tokenize(doc))
println(tokens.first.take(20))
2.5 提取词干
标准NLP方法
搜索引擎
NLTKOpenNLPLucene
3 训练模型
3.1 HashingTF 特征哈希
import org.apache.spark.mllib.linalg.{ SparseVector => SV }
import org.apache.spark.mllib.feature.HashingTF
import org.apache.spark.mllib.feature.IDF
// set the dimensionality of TF-IDF vectors to 2^18
val dim = math.pow(2, 18).toInt
val hashingTF = new HashingTF(dim)
val tf = hashingTF.transform(tokens)
tf.cache
val v = tf.first.asInstanceOf[SV]
println(v.size)
println(v.values.size)
println(v.values.take(10).toSeq)
println(v.indices.take(10).toSeq)
262144
706WrappedArray(1.0, 1.0, 1.0, 1.0, 2.0, 1.0, 1.0, 2.0, 1.0, 1.0)WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)
fit & transform
val idf = new IDF().fit(tf)
val tfidf = idf.transform(tf)
val v2 = tfidf.first.asInstanceOf[SV]
println(v2.values.size)
println(v2.values.take(10).toSeq)
println(v2.indices.take(10).toSeq)
706WrappedArray(2.3869085659322193, 4.670445463955571, 6.561295835827856, 4.597686109673142, 8.932700215224111, 5.750365619611528, 2.1871123786150006, 5.520408782213984, 3.4312512246662714, 1.7430324343790569)WrappedArray(313, 713, 871, 1202, 1203, 1209, 1795, 1862, 3115, 3166)
3.2 分析权重
val minMaxVals = tfidf.map { v =>
val sv = v.asInstanceOf[SV]
(sv.values.min, sv.values.max)
}
val globalMinMax = minMaxVals.reduce { case ((min1, max1), (min2, max2)) =>
(math.min(min1, min2), math.max(max1, max2))
}
println(globalMinMax)
globalMinMax: (Double, Double) = (0.0,66155.39470409753)
常用词
val common = sc.parallelize(Seq(Seq("you", "do", "we")))
val tfCommon = hashingTF.transform(common)
val tfidfCommon = idf.transform(tfCommon)
val commonVector = tfidfCommon.first.asInstanceOf[SV]
println(commonVector.values.toSeq)
WrappedArray(0.9965359935704624, 1.3348773448236835, 0.5457486182039175)
不常出现的单词
val uncommon = sc.parallelize(Seq(Seq("telescope", "legislation", "investment")))
val tfUncommon = hashingTF.transform(uncommon)
val tfidfUncommon = idf.transform(tfUncommon)
val uncommonVector = tfidfUncommon.first.asInstanceOf[SV]
println(uncommonVector.values.toSeq)
WrappedArray(5.3265513728351666, 5.308532867332488, 5.483736956357579)
4 使用模型
4.1 余弦相似度
import breeze.linalg._
val hockeyText = rdd.filter { case (file, text) => file.contains("hockey") }
val hockeyTF = hockeyText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val hockeyTfIdf = idf.transform(hockeyTF.map(_._2))
val hockey1 = hockeyTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breeze1 = new SparseVector(hockey1.indices, hockey1.values, hockey1.size)
val hockey2 = hockeyTfIdf.sample(true, 0.1, 43).first.asInstanceOf[SV]
val breeze2 = new SparseVector(hockey2.indices, hockey2.values, hockey2.size)
val cosineSim = breeze1.dot(breeze2) / (norm(breeze1) * norm(breeze2))
println(cosineSim)
cosineSim: Double = 0.060250114361164626
val graphicsText = rdd.filter { case (file, text) => file.contains("comp.graphics") }
val graphicsTF = graphicsText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val graphicsTfIdf = idf.transform(graphicsTF.map(_._2))
val graphics = graphicsTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeGraphics = new SparseVector(graphics.indices, graphics.values, graphics.size)
val cosineSim2 = breeze1.dot(breezeGraphics) / (norm(breeze1) * norm(breezeGraphics))
println(cosineSim2)
cosineSim2: Double = 0.004664850323792852
val baseballText = rdd.filter { case (file, text) => file.contains("baseball") }
val baseballTF = baseballText.mapValues(doc => hashingTF.transform(tokenize(doc)))
val baseballTfIdf = idf.transform(baseballTF.map(_._2))
val baseball = baseballTfIdf.sample(true, 0.1, 42).first.asInstanceOf[SV]
val breezeBaseball = new SparseVector(baseball.indices, baseball.values, baseball.size)
val cosineSim3 = breeze1.dot(breezeBaseball) / (norm(breeze1) * norm(breezeBaseball))
println(cosineSim3)
0.05047395039466008
4.2 学习单词与主题的映射关系
多分类映射
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.classification.NaiveBayes
import org.apache.spark.mllib.evaluation.MulticlassMetrics
val newsgroupsMap = newsgroups.distinct.collect().zipWithIndex.toMap
val zipped = newsgroups.zip(tfidf)
val train = zipped.map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
train.cache
朴素贝叶斯训练
val model = NaiveBayes.train(train, lambda = 0.1)
加载测试数据集
val testPath = PATH+"/20news-bydate-test/*"
val testRDD = sc.wholeTextFiles(testPath)
val testLabels = testRDD.map { case (file, text) =>
val topic = file.split("/").takeRight(2).head
newsgroupsMap(topic)
}
val testTf = testRDD.map { case (file, text) => hashingTF.transform(tokenize(text)) }
val testTfIdf = idf.transform(testTf)
val zippedTest = testLabels.zip(testTfIdf)
val test = zippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
计算准确度和多分类加权F-指标
val predictionAndLabel = test.map(p => (model.predict(p.features), p.label))
val accuracy = 1.0 * predictionAndLabel.filter(x => x._1 == x._2).count() / test.count()
println(accuracy)
0.7915560276155071
val metrics = new MulticlassMetrics(predictionAndLabel)
println(metrics.weightedFMeasure)
0.7810675969031116
5 评估
val rawTokens = rdd.map { case (file, text) => text.split(" ") }
val rawTF = rawTokens.map(doc => hashingTF.transform(doc))
val rawTrain = newsgroups.zip(rawTF).map { case (topic, vector) => LabeledPoint(newsgroupsMap(topic), vector) }
val rawModel = NaiveBayes.train(rawTrain, lambda = 0.1)
val rawTestTF = testRDD.map { case (file, text) => hashingTF.transform(text.split(" ")) }
val rawZippedTest = testLabels.zip(rawTestTF)
val rawTest = rawZippedTest.map { case (topic, vector) => LabeledPoint(topic, vector) }
val rawPredictionAndLabel = rawTest.map(p => (rawModel.predict(p.features), p.label))
val rawAccuracy = 1.0 * rawPredictionAndLabel.filter(x => x._1 == x._2).count() / rawTest.count()
println(rawAccuracy)
0.7648698884758365
val rawMetrics = new MulticlassMetrics(rawPredictionAndLabel)
println(rawMetrics.weightedFMeasure)
0.7653320418573546
6 Word2Vec模型
Word2Vec模型(分布向量表示):把每个单词表示成一个向量,MLlib中使用skip-gram模型
6.1 训练
import org.apache.spark.mllib.feature.Word2Vec
val word2vec = new Word2Vec()
word2vec.setSeed(42) // we do this to generate the same results each time
val word2vecModel = word2vec.fit(tokens)
6.2 使用
最相似的20个单词
word2vecModel.findSynonyms("hockey", 20).foreach(println)
(sport,1.4818968962277133)
(ecac,1.467546566194254)(hispanic,1.4166835301985194)(glens,1.4061103042432825)(woofers,1.3810090447028116)(tournament,1.3148823031671586)(champs,1.3133863003013941)(boxscores,1.307735040384543)(aargh,1.274986851270267)(ahl,1.265165428167253)(playoff,1.2645991118770572)(ncaa,1.2383382015648046)(pool,1.2261154635870224)(champion,1.2119919989539134)(filinuk,1.2062208620660915)(olympic,1.2026738930160243)(motorcycles,1.2008032355579679)(yankees,1.1989755767973371)(calder,1.194001886835493)(homeruns,1.1800625883573932)
word2vecModel.findSynonyms("legislation", 20).foreach(println)
(accommodates,0.9918184454068688)
(briefed,0.9256758135452989)(amended,0.9105987267173344)(telephony,0.8679173760123956)(pitted,0.8609974033962533)(aclu,0.8605885863332372)(licensee,0.8493930472487975)(agency,0.836706135804648)(policies,0.8337986602365566)(senate,0.8327312936220903)(businesses,0.8291191155630467)(permit,0.8266658804181389)(cpsr,0.8231228090944367)(cooperation,0.8195562469006543)(surveillance,0.8134342524628756)(congress,0.8132899468772855)(restricted,0.8115013134507126)(procure,0.8096839595766356)(inquiry,0.8086297702914405)(industry,0.8077900093754752)
legislation 立法
aclu 美国公民自由协会senate 参议院surveillance 监视inquiry 调查