Spark机器学习1·编程入门(scalajavapython)

    xiaoxiao2026-04-12  9

    Spark安装目录

    /Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6 基本测试 ./bin/run-example org.apache.spark.examples.SparkPi MASTER=local[20] ./bin/run-example org.apache.spark.examples.SparkPi

    scala

    import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ /** * A simple Spark app in Scala */ object ScalaApp { def main(args: Array[String]) { val sc = new SparkContext("local[2]", "First Spark App") val data = sc.textFile("data/UserPurchaseHistory.csv") .map(line => line.split(",")) .map(purchaseRecord => (purchaseRecord(0), purchaseRecord(1), purchaseRecord(2))) val numPurchases = data.count() val uniqueUsers = data.map { case (user, product, price) => user }.distinct().count() val totalRevenue = data.map { case (user, product, price) => price.toDouble }.sum() val productsByPopularity = data .map { case (user, product, price) => (product, 1) } .reduceByKey(_ + _) .collect() .sortBy(-_._2) val mostPopular = productsByPopularity(0) println("Total purchases: " + numPurchases) println("Unique users: " + uniqueUsers) println("Total revenue: " + totalRevenue) println("Most popular product: %s with %d purchases".format(mostPopular._1, mostPopular._2)) sc.stop() } }

    build.sbt

    name := "scala-spark-app" version := "1.0" scalaVersion := "2.11.6" libraryDependencies += "org.apache.spark" %% "spark-core" % "1.4.0" erichan:scala-spark-app/ $ sbt run

    java 8

    import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.PairFunction; import scala.Tuple2; import java.util.List; public class JavaApp { public static void main(String[] args) { JavaSparkContext sc = new JavaSparkContext("local[2]", "First Spark App"); JavaRDD<String[]> data = sc.textFile("data/UserPurchaseHistory.csv").map(s -> s.split(",")); long numPurchases = data.count(); long uniqueUsers = data.map(strings -> strings[0]).distinct().count(); double totalRevenue = data.mapToDouble(strings -> Double.parseDouble(strings[2])).sum(); List<Tuple2<String, Integer>> pairs = data.mapToPair( new PairFunction<String[], String, Integer>() { @Override public Tuple2<String, Integer> call(String[] strings) throws Exception { return new Tuple2(strings[1], 1); } } ).reduceByKey((i1, i2) -> i1 + i2).collect(); pairs.sort((o1, o2) -> -(o1._2() - o2._2())); String mostPopular = pairs.get(0)._1(); int purchases = pairs.get(0)._2(); System.out.println("Total purchases: " + numPurchases); System.out.println("Unique users: " + uniqueUsers); System.out.println("Total revenue: " + totalRevenue); System.out.println(String.format("Most popular product: %s with %d purchases", mostPopular, purchases)); sc.stop(); } }

    Maven pom.xml

    <?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>java-spark-app</groupId> <artifactId>java-spark-app</artifactId> <version>1.0</version> <dependencies> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.11</artifactId> <version>1.4.0</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <version>3.1</version> <configuration> <source>1.8</source> <target>1.8</target> </configuration> </plugin> </plugins> </build> </project>

    python

    from pyspark import SparkContext sc = SparkContext("local[2]", "First Spark App") data = sc.textFile("data/UserPurchaseHistory.csv").map(lambda line: line.split(",")).map(lambda record: (record[0], record[1], record[2])) numPurchases = data.count() uniqueUsers = data.map(lambda record: record[0]).distinct().count() totalRevenue = data.map(lambda record: float(record[2])).sum() products = data.map(lambda record: (record[1], 1.0)).reduceByKey(lambda a, b: a + b).collect() mostPopular = sorted(products, key=lambda x: x[1], reverse=True)[0] print "Total purchases: %d" % numPurchases print "Unique users: %d" % uniqueUsers print "Total revenue: %2.2f" % totalRevenue print "Most popular product: %s with %d purchases" % (mostPopular[0], mostPopular[1]) sc.stop() cd /Users/erichan/Garden/spark-1.4.0-bin-hadoop2.6/bin ./spark-submit pythonapp.py
    最新回复(0)