Advanced Analytics with Spark · Chapter 6: LSA


    Latent Semantic Analysis. The examples below run over a full dump of English Wikipedia, which can be downloaded with:

    wget http://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream.xml.bz2

    1 Getting the Data

    def readFile(path: String, sc: SparkContext): RDD[String] = {
      val conf = new Configuration()
      // Split the raw dump into one record per <page>...</page> element
      conf.set(XmlInputFormat.START_TAG_KEY, "<page>")
      conf.set(XmlInputFormat.END_TAG_KEY, "</page>")
      val rawXmls = sc.newAPIHadoopFile(path, classOf[XmlInputFormat],
        classOf[LongWritable], classOf[Text], conf)
      rawXmls.map(p => p._2.toString)
    }

    // Returns a (title, content) pair, or None for pages that are not plain articles
    def wikiXmlToPlainText(pageXml: String): Option[(String, String)] = {
      val page = new EnglishWikipediaPage()
      WikipediaPage.readPage(page, pageXml)
      if (page.isEmpty || !page.isArticle || page.isRedirect ||
          page.getTitle.contains("(disambiguation)")) {
        None
      } else {
        Some((page.getTitle, page.getContent))
      }
    }

    val pages = readFile("hdfs:///user/ds/Wikipedia/", sc)
      .sample(false, sampleSize, 11L)
    val plainText = pages.filter(_ != null).flatMap(wikiXmlToPlainText)
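    The snippet assumes sampleSize (a sampling fraction between 0 and 1) is defined elsewhere and that the XmlInputFormat and Wikipedia page classes are on the classpath. A quick sanity check that pages are actually being parsed could look like this (a minimal sketch, not part of the book's code):

    // Quick sanity check (assumes the definitions above are in scope):
    plainText.cache()
    println("Parsed articles: " + plainText.count())
    plainText.map(_._1).take(5).foreach(println)  // print a few page titles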

    2 Lemmatization

    def createNLPPipeline(): StanfordCoreNLP = {
      val props = new Properties()
      props.put("annotators", "tokenize, ssplit, pos, lemma")
      new StanfordCoreNLP(props)
    }

    def isOnlyLetters(str: String): Boolean = {
      // While loop for high performance
      var i = 0
      while (i < str.length) {
        if (!Character.isLetter(str.charAt(i))) {
          return false
        }
        i += 1
      }
      true
    }

    def plainTextToLemmas(text: String, stopWords: Set[String], pipeline: StanfordCoreNLP)
        : Seq[String] = {
      val doc = new Annotation(text)
      pipeline.annotate(doc)
      val lemmas = new ArrayBuffer[String]()
      val sentences = doc.get(classOf[SentencesAnnotation])
      for (sentence <- sentences.asScala;
           token <- sentence.get(classOf[TokensAnnotation]).asScala) {
        val lemma = token.get(classOf[LemmaAnnotation])
        if (lemma.length > 2 && !stopWords.contains(lemma) && isOnlyLetters(lemma)) {
          lemmas += lemma.toLowerCase
        }
      }
      lemmas
    }

    val stopWords = sc.broadcast(loadStopWords("stopwords.txt")).value
    val lemmatized = plainText.mapPartitions(iter => {
      val pipeline = createNLPPipeline()
      iter.map { case (title, contents) =>
        (title, plainTextToLemmas(contents, stopWords, pipeline))
      }
    })
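    loadStopWords is referenced above but not shown in this excerpt. A minimal sketch, assuming a plain-text file with one stop word per line on the driver's local filesystem:

    import scala.io.Source

    // Hypothetical helper: read one stop word per line into a Set.
    def loadStopWords(path: String): Set[String] =
      Source.fromFile(path).getLines().map(_.trim.toLowerCase).filter(_.nonEmpty).toSet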

    3 TF-IDF

    def documentTermMatrix(docs: RDD[(String, Seq[String])], stopWords: Set[String],
        numTerms: Int, sc: SparkContext)
        : (RDD[Vector], Map[Int, String], Map[Long, String], Map[String, Double]) = {
      val docTermFreqs = docs.mapValues(terms => {
        val termFreqsInDoc = terms.foldLeft(new HashMap[String, Int]()) {
          (map, term) => map += term -> (map.getOrElse(term, 0) + 1)
        }
        termFreqsInDoc
      })
      docTermFreqs.cache()
      val docIds = docTermFreqs.map(_._1).zipWithUniqueId().map(_.swap).collectAsMap()
      val docFreqs = documentFrequenciesDistributed(docTermFreqs.map(_._2), numTerms)
      println("Number of terms: " + docFreqs.size)
      saveDocFreqs("docfreqs.tsv", docFreqs)
      val numDocs = docIds.size
      val idfs = inverseDocumentFrequencies(docFreqs, numDocs)
      // Maps terms to their indices in the vector
      val idTerms = idfs.keys.zipWithIndex.toMap
      val termIds = idTerms.map(_.swap)
      val bIdfs = sc.broadcast(idfs).value
      val bIdTerms = sc.broadcast(idTerms).value
      val vecs = docTermFreqs.map(_._2).map(termFreqs => {
        val docTotalTerms = termFreqs.values.sum
        val termScores = termFreqs.filter {
          case (term, freq) => bIdTerms.contains(term)
        }.map {
          case (term, freq) => (bIdTerms(term), bIdfs(term) * termFreqs(term) / docTotalTerms)
        }.toSeq
        Vectors.sparse(bIdTerms.size, termScores)
      })
      (vecs, termIds, docIds, idfs)
    }

    def documentFrequencies(docTermFreqs: RDD[HashMap[String, Int]]): HashMap[String, Int] = {
      val zero = new HashMap[String, Int]()
      def merge(dfs: HashMap[String, Int], tfs: HashMap[String, Int])
          : HashMap[String, Int] = {
        tfs.keySet.foreach { term =>
          dfs += term -> (dfs.getOrElse(term, 0) + 1)
        }
        dfs
      }
      def comb(dfs1: HashMap[String, Int], dfs2: HashMap[String, Int])
          : HashMap[String, Int] = {
        for ((term, count) <- dfs2) {
          dfs1 += term -> (dfs1.getOrElse(term, 0) + count)
        }
        dfs1
      }
      docTermFreqs.aggregate(zero)(merge, comb)
    }

    def documentFrequenciesDistributed(docTermFreqs: RDD[HashMap[String, Int]], numTerms: Int)
        : Array[(String, Int)] = {
      val docFreqs = docTermFreqs.flatMap(_.keySet).map((_, 1)).reduceByKey(_ + _, 15)
      val ordering = Ordering.by[(String, Int), Int](_._2)
      docFreqs.top(numTerms)(ordering)
    }

    def trimLeastFrequent(freqs: Map[String, Int], numToKeep: Int): Map[String, Int] = {
      freqs.toArray.sortBy(_._2).take(math.min(numToKeep, freqs.size)).toMap
    }

    def inverseDocumentFrequencies(docFreqs: Array[(String, Int)], numDocs: Int)
        : Map[String, Double] = {
      docFreqs.map { case (term, count) => (term, math.log(numDocs.toDouble / count)) }.toMap
    }
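    saveDocFreqs is likewise not defined in this excerpt; a minimal sketch that writes the (term, document frequency) pairs to a local tab-separated file (a hypothetical helper, not necessarily the book's implementation):

    import java.io.PrintWriter

    // Hypothetical helper: dump (term, document frequency) pairs to a local TSV file.
    def saveDocFreqs(path: String, docFreqs: Array[(String, Int)]): Unit = {
      val writer = new PrintWriter(path)
      try {
        for ((term, freq) <- docFreqs) {
          writer.println(term + "\t" + freq)
        }
      } finally {
        writer.close()
      }
    }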

    4 Singular Value Decomposition
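    The SVD step below assumes the outputs of documentTermMatrix and a choice of k (the number of latent concepts) are already in scope. A plausible wiring from the previous section follows; the values of numTerms and k are illustrative assumptions, not prescriptions:

    // Assumed wiring from the TF-IDF step; numTerms and k are illustrative choices.
    val numTerms = 50000
    val (termDocMatrix, termIds, docIds, idfs) =
      documentTermMatrix(lemmatized, stopWords, numTerms, sc)
    val k = 100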

    termDocMatrix.cache()
    val mat = new RowMatrix(termDocMatrix)
    val svd = mat.computeSVD(k, computeU = true)

    def topTermsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix],
        numConcepts: Int, numTerms: Int, termIds: Map[Int, String])
        : Seq[Seq[(String, Double)]] = {
      val v = svd.V
      val topTerms = new ArrayBuffer[Seq[(String, Double)]]()
      val arr = v.toArray
      for (i <- 0 until numConcepts) {
        // V is stored column-major, so concept i's term weights form one contiguous column
        val offs = i * v.numRows
        val termWeights = arr.slice(offs, offs + v.numRows).zipWithIndex
        val sorted = termWeights.sortBy(-_._1)
        topTerms += sorted.take(numTerms).map { case (score, id) => (termIds(id), score) }
      }
      topTerms
    }

    def topDocsInTopConcepts(svd: SingularValueDecomposition[RowMatrix, Matrix],
        numConcepts: Int, numDocs: Int, docIds: Map[Long, String])
        : Seq[Seq[(String, Double)]] = {
      val u = svd.U
      val topDocs = new ArrayBuffer[Seq[(String, Double)]]()
      for (i <- 0 until numConcepts) {
        // Column i of U holds every document's weight for concept i
        val docWeights = u.rows.map(_.toArray(i)).zipWithUniqueId
        topDocs += docWeights.top(numDocs).map { case (score, id) => (docIds(id), score) }
      }
      topDocs
    }

    val topConceptTerms = topTermsInTopConcepts(svd, 10, 10, termIds)
    val topConceptDocs = topDocsInTopConcepts(svd, 10, 10, docIds)
    for ((terms, docs) <- topConceptTerms.zip(topConceptDocs)) {
      println("Concept terms: " + terms.map(_._1).mkString(", "))
      println("Concept docs: " + docs.map(_._1).mkString(", "))
      println()
    }
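    Inspecting the singular values can help judge whether k is large enough; a sharp drop-off suggests fewer concepts already capture most of the structure. A small example:

    // Print each singular value with its concept index.
    svd.s.toArray.zipWithIndex.foreach { case (sv, i) =>
      println(f"singular value $i%3d: $sv%.4f")
    }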

    5 Relevance: term-term, document-document, and term-document

    import breeze.linalg.{DenseMatrix => BDenseMatrix, DenseVector => BDenseVector,
      SparseVector => BSparseVector}

    def topTermsForTerm(normalizedVS: BDenseMatrix[Double], termId: Int): Seq[(Double, Int)] = {
      // Look up the row in VS corresponding to the given term ID.
      val termRowVec = new BDenseVector[Double](row(normalizedVS, termId).toArray)
      // Compute scores against every term
      val termScores = (normalizedVS * termRowVec).toArray.zipWithIndex
      // Find the terms with the highest scores
      termScores.sortBy(-_._1).take(10)
    }

    def topDocsForDoc(normalizedUS: RowMatrix, docId: Long): Seq[(Double, Long)] = {
      // Look up the row in US corresponding to the given doc ID.
      val docRowArr = row(normalizedUS, docId)
      val docRowVec = Matrices.dense(docRowArr.length, 1, docRowArr)
      // Compute scores against every doc
      val docScores = normalizedUS.multiply(docRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      // Docs can end up with NaN score if their row in U is all zeros. Filter these out.
      allDocWeights.filter(!_._1.isNaN).top(10)
    }

    def topDocsForTerm(US: RowMatrix, V: Matrix, termId: Int): Seq[(Double, Long)] = {
      val termRowArr = row(V, termId).toArray
      val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)
      // Compute scores against every doc
      val docScores = US.multiply(termRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      allDocWeights.top(10)
    }
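    These functions rely on US, normalizedUS, normalizedVS, and a row(...) helper that this excerpt does not show. One way to build them from the SVD factors, consistent with how the functions above use them (a sketch, not necessarily the book's exact code; the helper names here are just for this sketch): US is U scaled by the singular values, VS is V scaled the same way, and the normalized variants have unit-length rows so that dot products become cosine similarities. Note the three row overloads must be defined together (for example via :paste in the shell, or inside a compiled object) so overload resolution works.

    import org.apache.spark.mllib.linalg.{Matrices, Matrix, Vector => MLLibVector, Vectors}
    import org.apache.spark.mllib.linalg.distributed.RowMatrix

    // V * diag(s): each row places a term in concept space.
    def multiplyByDiagonalMatrix(mat: Matrix, diag: MLLibVector): BDenseMatrix[Double] = {
      val sArr = diag.toArray
      new BDenseMatrix[Double](mat.numRows, mat.numCols, mat.toArray)
        .mapPairs { case ((_, c), v) => v * sArr(c) }
    }

    // U * diag(s): each row places a document in concept space.
    def multiplyByDiagonalRowMatrix(mat: RowMatrix, diag: MLLibVector): RowMatrix = {
      val sArr = diag.toArray
      new RowMatrix(mat.rows.map { vec =>
        Vectors.dense(vec.toArray.zipWithIndex.map { case (v, i) => v * sArr(i) })
      })
    }

    // Normalize every row to unit length so dot products are cosine similarities.
    def rowsNormalized(mat: BDenseMatrix[Double]): BDenseMatrix[Double] = {
      val newMat = new BDenseMatrix[Double](mat.rows, mat.cols)
      for (r <- 0 until mat.rows) {
        val length = math.sqrt((0 until mat.cols).map(c => mat(r, c) * mat(r, c)).sum)
        (0 until mat.cols).foreach(c => newMat.update(r, c, mat(r, c) / length))
      }
      newMat
    }

    def distributedRowsNormalized(mat: RowMatrix): RowMatrix = {
      new RowMatrix(mat.rows.map { vec =>
        val arr = vec.toArray
        val length = math.sqrt(arr.map(x => x * x).sum)
        Vectors.dense(arr.map(_ / length))
      })
    }

    // Row-extraction helpers with the shapes the functions above expect.
    def row(mat: BDenseMatrix[Double], index: Int): Seq[Double] =
      (0 until mat.cols).map(c => mat(index, c))

    def row(mat: Matrix, index: Int): Seq[Double] = {
      val arr = mat.toArray  // column-major
      (0 until mat.numCols).map(c => arr(index + c * mat.numRows))
    }

    def row(mat: RowMatrix, id: Long): Array[Double] =
      mat.rows.zipWithUniqueId.map(_.swap).lookup(id).head.toArray

    val US = multiplyByDiagonalRowMatrix(svd.U, svd.s)
    val normalizedUS = distributedRowsNormalized(US)
    val normalizedVS = rowsNormalized(multiplyByDiagonalMatrix(svd.V, svd.s))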

    Multiple-Term Queries

    def termsToQueryVector(terms: Seq[String], idTerms: Map[String, Int],
        idfs: Map[String, Double]): BSparseVector[Double] = {
      val indices = terms.map(idTerms(_)).toArray
      val values = terms.map(idfs(_)).toArray
      new BSparseVector[Double](indices, values, idTerms.size)
    }

    def topDocsForTermQuery(US: RowMatrix, V: Matrix, query: BSparseVector[Double])
        : Seq[(Double, Long)] = {
      val breezeV = new BDenseMatrix[Double](V.numRows, V.numCols, V.toArray)
      val termRowArr = (breezeV.t * query).toArray
      val termRowVec = Matrices.dense(termRowArr.length, 1, termRowArr)
      // Compute scores against every doc
      val docScores = US.multiply(termRowVec)
      // Find the docs with the highest scores
      val allDocWeights = docScores.rows.map(_.toArray(0)).zipWithUniqueId
      allDocWeights.top(10)
    }
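    Putting it together, a query for a couple of terms could look like the following; the example terms are arbitrary and must belong to the selected vocabulary, and idTerms can be recovered by inverting the termIds map returned by documentTermMatrix:

    // idTerms maps each term to its column index in the term-document matrix.
    val idTerms = termIds.map(_.swap)

    // Arbitrary example terms; they must have survived lemmatization and the numTerms cut.
    val queryVec = termsToQueryVector(Seq("algorithm", "decomposition"), idTerms, idfs)
    topDocsForTermQuery(US, svd.V, queryVec).foreach { case (score, id) =>
      println(docIds(id) + "\t" + score)
    }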