Spark机器学习
res1: String = 196 242 3 881250949
import org.apache.spark.mllib.recommendation.{ALS, Rating}

// Split every raw line on tabs and keep only its first three fields.
val rawRatings = rawData.map(line => line.split("\t").take(3))

// Convert each three-field array into a Rating; the first two fields are
// parsed as Int and the third as Double.
val ratings = rawRatings.map {
  case Array(userId, movieId, score) =>
    Rating(userId.toInt, movieId.toInt, score.toDouble)
}

ratings.first()
// res2: org.apache.spark.mllib.recommendation.Rating = Rating(196,242,3.0)
res3: Long = 943
// Number of entries in the model's product-feature collection.
model.productFeatures.count()
// res4: Long = 1682
Rating(789,176,5.732688958436494)
Rating(789,201,5.682340265545152)
Rating(789,182,5.5902224300291214)
Rating(789,183,5.5877871075408585)
Rating(789,96,5.4425266495153455)
Rating(789,76,5.39730369058763)
Rating(789,195,5.356822356978749)
Rating(789,589,5.1464233861748925)
Rating(789,134,5.109287533257644)
Rating(789,518,5.106161562126567)
33
// Order this user's ratings from highest to lowest, keep the first ten, and
// print each one as a (title, rating) pair looked up through `titles`.
(for (entry <- moviesForUser.sortBy(r => -r.rating).take(10))
  yield (titles(entry.product), entry.rating)).foreach(println)
// (Godfather, The (1972),5.0)
// (Trainspotting (1996),5.0)
// (Dead Man Walking (1995),5.0)
// (Star Wars (1977),5.0)
// (Swingers (1996),5.0)
// (Leaving Las Vegas (1995),5.0)
// (Bound (1996),5.0)
// (Fargo (1996),5.0)
// (Last Supper, The (1995),5.0)
// (Private Parts (1997),4.0)
// Print each recommendation in topKRecs as a (title, predicted rating) pair.
(for (rec <- topKRecs) yield (titles(rec.product), rec.rating)).foreach(println)
// (Aliens (1986),5.732688958436494)
// (Evil Dead II (1987),5.682340265545152)
// (GoodFellas (1990),5.5902224300291214)
// (Alien (1979),5.5877871075408585)
// (Terminator 2: Judgment Day (1991),5.4425266495153455)
// (Carlito's Way (1993),5.39730369058763)
// (Terminator, The (1984),5.356822356978749)
// (Wild Bunch, The (1969),5.1464233861748925)
// (Citizen Kane (1941),5.109287533257644)
// (Miller's Crossing (1990),5.106161562126567)
res10: Double = 1.0
// Pair every product id with the cosine similarity between its factor vector
// and itemVector.
val sims = model.productFeatures.map { case (id, factor) =>
  (id, cosineSimilarity(new DoubleMatrix(factor), itemVector))
}

// Take the K pairs that rank highest when ordered by the similarity component.
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double](_._2))

println(sortedSims.mkString("\n"))
// (567,1.0)
// (413,0.7309050775072655)
// (895,0.6992030886048359)
// (853,0.6960095521899471)
// (219,0.6806270119940826)
// (302,0.6757242121714326)
// (257,0.6721490667554395)
// (160,0.6672080746572076)
// (563,0.6621573120106216)
// (1019,0.6591520069387037)
Wes Craven's New Nightmare (1994)
// Fetch one more than K results: the first entry is dropped below (in the
// recorded run the top entry has similarity 1.0 -- presumably the query item
// itself).
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] {
  case (id, similarity) => similarity
})

// Skip the first entry and keep the following K. The upper bound is written
// as K + 1 rather than a literal 11 so it stays in step with the top(K + 1)
// call above when K changes (identical result for K = 10).
sortedSims2.slice(1, K + 1).map { case (id, sim) => (titles(id), sim) }.mkString("\n")
// res13: String =
// (Tales from the Crypt Presents: Bordello of Blood (1996),0.7309050775072655)
// (Scream 2 (1997),0.6992030886048359)
// (Braindead (1992),0.6960095521899471)
// (Nightmare on Elm Street, A (1984),0.6806270119940826)
// (L.A. Confidential (1997),0.6757242121714326)
// (Men in Black (1997),0.6721490667554395)
// (Glengarry Glen Ross (1992),0.6672080746572076)
// (Stephen King's The Langoliers (1995),0.6621573120106216)
// (Die xue shuang xiong (Killer, The) (1989),0.6591520069387037)
// (Evil Dead II (1987),0.655134288821937)
Mean Squared Error = 0.08527363423596633
// RMSE is the square root of the MSE value computed earlier.
val RMSE = scala.math.sqrt(MSE)
println("Root Mean Squared Error = " + RMSE.toString)
// Root Mean Squared Error = 0.2920164965134099
(1682,50)
// Broadcast itemMatrix so the closure below reads it through imBroadcast.value.
val imBroadcast = sc.broadcast(itemMatrix)

// For every user: multiply the broadcast matrix by the user's factor vector,
// then list the item ids ordered by descending score.
val allRecs = model.userFeatures.map { case (userId, array) =>
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val ranked = scores.data.zipWithIndex.sortBy { case (score, _) => -score }
  // zipWithIndex positions start at 0; adding 1 presumably maps them onto
  // 1-based item ids -- confirm against how itemMatrix was built.
  (userId, ranked.map { case (_, position) => position + 1 }.toSeq)
}

// Group the (user, product) pairs of all ratings by user.
val userMovies = ratings.map(r => (r.user, r.product)).groupBy(_._1)

val K = 10

// Sum avgPrecisionK over every user present in both datasets, then divide by
// the total number of users in allRecs.
val MAPK = allRecs.join(userMovies).map { case (_, (predicted, actualWithIds)) =>
  avgPrecisionK(actualWithIds.map(_._2).toSeq, predicted, K)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K = " + MAPK)
// Mean Average Precision at K = 0.030001472840815356
Mean Squared Error = 0.08527363423596633
// Print the root mean squared error exposed by regressionMetrics.
locally {
  val rmse = regressionMetrics.rootMeanSquaredError
  println("Root Mean Squared Error = " + rmse)
}
// Root Mean Squared Error = 0.2920164965134099
Mean Average Precision = 0.07208991526855565
// Same averaging as the K-limited metric, but with a cutoff of 2000:
// sum avgPrecisionK over the joined users, then divide by the number of
// users in allRecs.
val MAPK2000 = allRecs
  .join(userMovies)
  .map { case (_, (predicted, actualWithIds)) =>
    avgPrecisionK(actualWithIds.map(_._2).toSeq, predicted, 2000)
  }
  .reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)
// Mean Average Precision = 0.07208991526855561
