Spark高级数据分析· 3推荐引擎

    xiaoxiao2025-12-04  9

    推荐算法流程

    推荐算法

    预备

    wget http://www.iro.umontreal.ca/~lisa/datasets/profiledata_06-May-2005.tar.gz cd /Users/erichan/garden/spark-1.6.0-bin-hadoop2.6/bin ./spark-shell --master local --driver-memory 6g

    1 准备数据

    val data ="/Users/erichan/AliDrive/ml_spark/data/profiledata_06-May-2005" val rawUserArtistData = sc.textFile(data+"/user_artist_data.txt",10) // ALS 需要ID必须为数值型 rawUserArtistData.first //res3: String = 1092764 1000311 //rawUserArtistData.map(_.split(' ')(0).toDouble).stats() //res10: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1947573.265353, stdev: 496000.544975, max: 2443548.000000, min: 90.000000) //rawUserArtistData.map(_.split(' ')(1).toDouble).stats() //res11: org.apache.spark.util.StatCounter = (count: 24296858, mean: 1718704.093757, stdev: 2539389.040171, max: 10794401.000000, min: 1.000000) val rawArtistData = sc.textFile(data+"/artist_data.txt") //rawArtistData.first //res12: String = 1134999 06Crazy Life val artistByID = rawArtistData.flatMap { line => val (id, name) = line.span(_ != '\t') if (name.isEmpty) { None }else{ try { Some((id.toInt, name.trim)) } catch { case e: NumberFormatException => None } } } val rawArtistAlias = sc.textFile(data+"/artist_alias.txt") val artistAlias = rawArtistAlias.flatMap { line => val tokens = line.split('\t') if (tokens(0).isEmpty) { None }else{ Some((tokens(0).toInt, tokens(1).toInt)) } }.collectAsMap() //artistByID.lookup(1000010).head //res14: String = Aerosmith

    2 建模

    import org.apache.spark.mllib.recommendation._ val bArtistAlias = sc.broadcast(artistAlias) val trainData = rawUserArtistData.map { line => val Array(userID, artistID, count) = line.split(' ').map(_.toInt) val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID) Rating(userID, finalArtistID, count) }.cache() val model = ALS.trainImplicit(trainData, 10, 5, 0.01, 1.0)

    3 检验

    val rawArtistsForUser = rawUserArtistData.map(_.split(' ')).filter { case Array(user,_,_) => user.toInt == 2093760 } val existingProducts = rawArtistsForUser.map { case Array(_,artist,_) => artist.toInt }.collect().toSet artistByID.filter { case (id, name) => existingProducts.contains(id) }.values.collect().foreach(println) val recommendations = model.recommendProducts(2093760, 5) recommendations.foreach(println) val recommendedProductIDs = recommendations.map(_.product).toSet artistByID.filter { case (id, name) => recommendedProductIDs.contains(id) }.values.collect().foreach(println)

    4 评价

    :load /Users/erichan/sourcecode/book/aas/ch03-recommender/src/main/scala/RunAUC.scala val bArtistAlias = sc.broadcast(RunAUC.buildArtistAlias(rawArtistAlias)) val allData = RunAUC.buildRatings(rawUserArtistData, bArtistAlias) val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1)) trainData.cache() cvData.cache() val allItemIDs = allData.map(_.product).distinct().collect() val bAllItemIDs = sc.broadcast(allItemIDs) val mostListenedAUC = RunAUC.areaUnderCurve(cvData, bAllItemIDs, RunAUC.predictMostListened(sc, trainData)) println(mostListenedAUC) //0.9395286660878177 trainData.unpersist() cvData.unpersist()

    5 推荐

    val someUsers = allData.map(_.user).distinct().take(100) val someRecommendations = someUsers.map(userID => model.recommendProducts(userID, 5)) someRecommendations.map( recs => recs.head.user + " -> " + recs.map(_.product).mkString(", ") ).foreach(println)

    附录

    RunAUC.scala

    import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ import org.apache.spark.broadcast.Broadcast import org.apache.spark.mllib.recommendation._ import org.apache.spark.rdd.RDD import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.util.Random /** * Created by erichan * on 16/1/26. */ object RunAUC { def areaUnderCurve( positiveData: RDD[Rating], bAllItemIDs: Broadcast[Array[Int]], predictFunction: (RDD[(Int,Int)] => RDD[Rating])) = { // What this actually computes is AUC, per user. The result is actually something // that might be called "mean AUC". // Take held-out data as the "positive", and map to tuples val positiveUserProducts = positiveData.map(r => (r.user, r.product)) // Make predictions for each of them, including a numeric score, and gather by user val positivePredictions = predictFunction(positiveUserProducts).groupBy(_.user) // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of // small AUC problems, and it would be inefficient, when a direct computation is available. // Create a set of "negative" products for each user. These are randomly chosen // from among all of the other items, excluding those that are "positive" for the user. val negativeUserProducts = positiveUserProducts.groupByKey().mapPartitions { // mapPartitions operates on many (user,positive-items) pairs at once userIDAndPosItemIDs => { // Init an RNG and the item IDs set once for partition val random = new Random() val allItemIDs = bAllItemIDs.value userIDAndPosItemIDs.map { case (userID, posItemIDs) => val posItemIDSet = posItemIDs.toSet val negative = new ArrayBuffer[Int]() var i = 0 // Keep about as many negative examples per user as positive. // Duplicates are OK while (i < allItemIDs.size && negative.size < posItemIDSet.size) { val itemID = allItemIDs(random.nextInt(allItemIDs.size)) if (!posItemIDSet.contains(itemID)) { negative += itemID } i += 1 } // Result is a collection of (user,negative-item) tuples negative.map(itemID => (userID, itemID)) } } }.flatMap(t => t) // flatMap breaks the collections above down into one big set of tuples // Make predictions on the rest: val negativePredictions = predictFunction(negativeUserProducts).groupBy(_.user) // Join positive and negative by user positivePredictions.join(negativePredictions).values.map { case (positiveRatings, negativeRatings) => // AUC may be viewed as the probability that a random positive item scores // higher than a random negative one. Here the proportion of all positive-negative // pairs that are correctly ranked is computed. The result is equal to the AUC metric. var correct = 0L var total = 0L // For each pairing, for (positive <- positiveRatings; negative <- negativeRatings) { // Count the correctly-ranked pairs if (positive.rating > negative.rating) { correct += 1 } total += 1 } // Return AUC: fraction of pairs ranked correctly correct.toDouble / total }.mean() // Return mean AUC over users } def predictMostListened(sc: SparkContext, train: RDD[Rating])(allData: RDD[(Int,Int)]) = { val bListenCount = sc.broadcast(train.map(r => (r.product, r.rating)).reduceByKey(_ + _).collectAsMap()) allData.map { case (user, product) => Rating(user, product, bListenCount.value.getOrElse(product, 0.0)) } } def buildArtistAlias(rawArtistAlias: RDD[String]): Map[Int,Int] = rawArtistAlias.flatMap { line => val tokens = line.split('\t') if (tokens(0).isEmpty) { None } else { Some((tokens(0).toInt, tokens(1).toInt)) } }.collectAsMap() def buildRatings( rawUserArtistData: RDD[String], bArtistAlias: Broadcast[Map[Int,Int]]) = { rawUserArtistData.map { line => val Array(userID, artistID, count) = line.split(' ').map(_.toInt) val finalArtistID = bArtistAlias.value.getOrElse(artistID, artistID) Rating(userID, finalArtistID, count) } } } 相关资源:python入门教程(PDF版)
    最新回复(0)