1.测试数据
文件名 : people.json
内容 :
{"name":"Michael", "age":12}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
{"name":"kafak", "age":19}
这里我是在IDEA本地运行的 代码 所以Master我设置的是 Local
2 .代码
创建SaprkSession
SparkConf conf = new SparkConf()
.setMaster("local")
.setAppName("SqlDemo02");
JavaSparkContext sc = new JavaSparkContext(conf);
// 设置日志输出等级 有一些启动信息的日志没必要看
sc.setLogLevel("WARN");
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
/*注意上面的配置都是固定的 appname 可以改成其他的名字 */
DSL(spark);
System.out.println("*************************************");
SQL(spark);
// 最后不要忘了关闭!
spark.stop();
sc.stop();
DSL
private static void DSL(SparkSession spark) {
/**
* 1.读取文件,并将数据返回封装到一个Dataset的对象中
* 2.读取的数据类型 : json , csv , 数据库中的数据 , txt文本 ....
* 3.数据都存放在resources目录下
* 4.如果是相对路径 json("people.json") 你需要把
*/
Dataset<Row> df = spark.read().json("SparkSql/src/main/resources/people.json");
// 1.打印所有的数据
df.show();
/**
* +---+-------+
* |age| name|
* +---+-------+
* | 12|Michael|
* | 30| Andy|
* | 19| Justin|
* | 19| kafak|
* +---+-------+
*/
// 2.打印数据的结构
df.printSchema();
/**
* root
* |-- age: long (nullable = true)
* |-- name: string (nullable = true)
*/
// 3.查询 : 类似于 Select 列名1 , 列名2 from people
/**
* select("列名1" , "列名2")
*/
df.select("age").show();
/**
* |age|
* +---+
* | 12|
* | 30|
* | 19|
* | 19|
* +---+
*/
/**
* select(Column对象1 , Column对象2)
* 这里注意 : Col 是一个方法 我在导入的时候用了静态导入
* 类似于 : import static org.apache.spark.sql.functions.col;
* 因为 Col 是一个静态方法 , 如果不是静态导入的话使用时 是 类名.方法名
* 就像这样 functions.col
*
* Col(列名) -> 返回 Column 对象
* Column 对象中封装了很多对数据操作的一些方法
* plus(int n) 对指定列的数据值 加 n 如下图
* gt(int n) 返回值大于 n 的数据
* 等等
*/
df.select(col("name"), col("age").plus(1)).show();
/**
* +-------+---------+
* | name|(age + 1)|
* +-------+---------+
* |Michael| 13|
* | Andy| 31|
* | Justin| 20|
* | kafak| 20|
* +-------+---------+
*/
// 4. 过滤查询
/**
* -> 显示年龄大于21岁的人的信息
* -> 如果要显示年龄小于21的可以使用 col("age").lt(21)
* 当然, 还有另外一种写法 : filter("age > 21")
* 这里特别注意 : age 和 name 都是有数据类型的 age 是int, name是String
* 写 "age > 21" 要求必须写正确 , 你不能写个 "name > 21" 这样会报错
* 或者写个不存在的字段 , 也会报错
*/
df.filter(col("age").gt(21)).show();
/**
* +---+----+
* |age|name|
* +---+----+
* | 30|Andy|
* +---+----+
*/
// 5.分组查询
/**
* 注意 : groupBy("age").show() 这样是不行的, 因为没有意义 这样的结果等同于 select("age").show()
* -> 分组排序之后可以做些操作 , 比如你可以调用Count 统计每一组的值
* -> 或者调用 其他方法 比如 max() 获取分组中每一组的最大值
* -> df.groupBy(col("age")).max().show();
*/
df.groupBy(col("age")).count().show();
/**
* +---+-----+
* |age|count|
* +---+-----+
* | 19| 2|
* | 12| 1|
* | 30| 1|
* +---+-----+
*/
}
SQL
// 使用SQL语句进行查询
private static void SQL(SparkSession spark) throws AnalysisException {
// 读取数据
Dataset<Row> df = spark.read().json("SparkSql/src/main/resources/people.json");
// 1.创建一个临时表
/**
* 注意 ! : 必须创建临时表才能进行查询
* createOrReplaceTempView :
* -> 创建一个临时表 如果这个表已经存在 , 那么就替换这个表 生命周期是 SparkSession 存在 即不执行SaprkSession.stop
* 一旦这SparkSession结束 , 那么这个表就不存在了 , 如果想要在不同的SparkSesion 共享同一张表 可以用createGlobalTempView
* createGlobalTempView : 创建一个全局表 , 只要Spark应用不关闭 这个表就是一直存在的 , 可以在不同的SparkSession共享
*/
// 1.创建一个临时表
df.createOrReplaceTempView("people");
// 2.查询所有的数据
Dataset<Row> sqlDF_01 = spark.sql("SELECT * FROM people");
sqlDF_01.show();
// 3.创建一个全局的临时表
df.createGlobalTempView("people");
// 这里要注意 : 如果使用全局的临时表,那么 查询的时候必须使用 global_temp.表名 如下
// 官网文档的解释是 全局的临时表 是存储在一个系统数据库 global_temp 上的 . 所以调用需要指定数据名
Dataset<Row> sqlDF_02 = spark.sql("SELECT * FROM global_temp.people");
sqlDF_02.show();
// 1.创建一个新的会话
/**
* 创建一个子会话 , 这个会话在SparkContext和缓存(?) 上和 创建它的会话是相同的 , 但是配置 , 和临时表是不共享的
* 换句话说 , 相当于创建了一个新的SparkSession , 只有SparkContext还是原先的
*/
SparkSession sparkSession = spark.newSession();
// 执行查询语句
Dataset<Row> sqlDF_03 = spark.sql("SELECT * FROM global_temp.people");
Dataset<Row> sqlDF_04 = spark.sql("SELECT * FROM people");
sqlDF_03.show();
sqlDF_04.show();
}