【Spark SQL】- 读取数据并进行一些简单的查询

    xiaoxiao2025-01-29  9

    1.测试数据

    文件名 : people.json 内容 : {"name":"Michael", "age":12} {"name":"Andy", "age":30} {"name":"Justin", "age":19} {"name":"kafak", "age":19}

    这里我是在IDEA本地运行的 代码 所以Master我设置的是 Local

    2 .代码

    创建SaprkSession SparkConf conf = new SparkConf() .setMaster("local") .setAppName("SqlDemo02"); JavaSparkContext sc = new JavaSparkContext(conf); // 设置日志输出等级 有一些启动信息的日志没必要看 sc.setLogLevel("WARN"); SparkSession spark = SparkSession .builder() .appName("Java Spark SQL basic example") .config("spark.some.config.option", "some-value") .getOrCreate(); /*注意上面的配置都是固定的 appname 可以改成其他的名字 */ DSL(spark); System.out.println("*************************************"); SQL(spark); // 最后不要忘了关闭! spark.stop(); sc.stop(); DSL private static void DSL(SparkSession spark) { /** * 1.读取文件,并将数据返回封装到一个Dataset的对象中 * 2.读取的数据类型 : json , csv , 数据库中的数据 , txt文本 .... * 3.数据都存放在resources目录下 * 4.如果是相对路径 json("people.json") 你需要把 */ Dataset<Row> df = spark.read().json("SparkSql/src/main/resources/people.json"); // 1.打印所有的数据 df.show(); /** * +---+-------+ * |age| name| * +---+-------+ * | 12|Michael| * | 30| Andy| * | 19| Justin| * | 19| kafak| * +---+-------+ */ // 2.打印数据的结构 df.printSchema(); /** * root * |-- age: long (nullable = true) * |-- name: string (nullable = true) */ // 3.查询 : 类似于 Select 列名1 , 列名2 from people /** * select("列名1" , "列名2") */ df.select("age").show(); /** * |age| * +---+ * | 12| * | 30| * | 19| * | 19| * +---+ */ /** * select(Column对象1 , Column对象2) * 这里注意 : Col 是一个方法 我在导入的时候用了静态导入 * 类似于 : import static org.apache.spark.sql.functions.col; * 因为 Col 是一个静态方法 , 如果不是静态导入的话使用时 是 类名.方法名 * 就像这样 functions.col * * Col(列名) -> 返回 Column 对象 * Column 对象中封装了很多对数据操作的一些方法 * plus(int n) 对指定列的数据值 加 n 如下图 * gt(int n) 返回值大于 n 的数据 * 等等 */ df.select(col("name"), col("age").plus(1)).show(); /** * +-------+---------+ * | name|(age + 1)| * +-------+---------+ * |Michael| 13| * | Andy| 31| * | Justin| 20| * | kafak| 20| * +-------+---------+ */ // 4. 过滤查询 /** * -> 显示年龄大于21岁的人的信息 * -> 如果要显示年龄小于21的可以使用 col("age").lt(21) * 当然, 还有另外一种写法 : filter("age > 21") * 这里特别注意 : age 和 name 都是有数据类型的 age 是int, name是String * 写 "age > 21" 要求必须写正确 , 你不能写个 "name > 21" 这样会报错 * 或者写个不存在的字段 , 也会报错 */ df.filter(col("age").gt(21)).show(); /** * +---+----+ * |age|name| * +---+----+ * | 30|Andy| * +---+----+ */ // 5.分组查询 /** * 注意 : groupBy("age").show() 这样是不行的, 因为没有意义 这样的结果等同于 select("age").show() * -> 分组排序之后可以做些操作 , 比如你可以调用Count 统计每一组的值 * -> 或者调用 其他方法 比如 max() 获取分组中每一组的最大值 * -> df.groupBy(col("age")).max().show(); */ df.groupBy(col("age")).count().show(); /** * +---+-----+ * |age|count| * +---+-----+ * | 19| 2| * | 12| 1| * | 30| 1| * +---+-----+ */ } SQL // 使用SQL语句进行查询 private static void SQL(SparkSession spark) throws AnalysisException { // 读取数据 Dataset<Row> df = spark.read().json("SparkSql/src/main/resources/people.json"); // 1.创建一个临时表 /** * 注意 ! : 必须创建临时表才能进行查询 * createOrReplaceTempView : * -> 创建一个临时表 如果这个表已经存在 , 那么就替换这个表 生命周期是 SparkSession 存在 即不执行SaprkSession.stop * 一旦这SparkSession结束 , 那么这个表就不存在了 , 如果想要在不同的SparkSesion 共享同一张表 可以用createGlobalTempView * createGlobalTempView : 创建一个全局表 , 只要Spark应用不关闭 这个表就是一直存在的 , 可以在不同的SparkSession共享 */ // 1.创建一个临时表 df.createOrReplaceTempView("people"); // 2.查询所有的数据 Dataset<Row> sqlDF_01 = spark.sql("SELECT * FROM people"); sqlDF_01.show(); // 3.创建一个全局的临时表 df.createGlobalTempView("people"); // 这里要注意 : 如果使用全局的临时表,那么 查询的时候必须使用 global_temp.表名 如下 // 官网文档的解释是 全局的临时表 是存储在一个系统数据库 global_temp 上的 . 所以调用需要指定数据名 Dataset<Row> sqlDF_02 = spark.sql("SELECT * FROM global_temp.people"); sqlDF_02.show(); // 1.创建一个新的会话 /** * 创建一个子会话 , 这个会话在SparkContext和缓存(?) 上和 创建它的会话是相同的 , 但是配置 , 和临时表是不共享的 * 换句话说 , 相当于创建了一个新的SparkSession , 只有SparkContext还是原先的 */ SparkSession sparkSession = spark.newSession(); // 执行查询语句 Dataset<Row> sqlDF_03 = spark.sql("SELECT * FROM global_temp.people"); Dataset<Row> sqlDF_04 = spark.sql("SELECT * FROM people"); sqlDF_03.show(); sqlDF_04.show(); }
    最新回复(0)