概述:Spark postgresql jdbc 数据库连接和写入操作源码解读,详细记录了SparkSQL对数据库的操作,通过java程序,在本地开发和运行。整体为,Spark建立数据库连接,读取数据,将DataFrame数据写入另一个数据库表中。附带完整项目源码(完整项目源码github)。
1.首先在postgreSQL中创建一张测试表,并插入数据。(完整项目源码Github)
1.1. 在postgreSQL中的postgres用户下,创建 products
CREATE TABLE products (
product_no integer,
name text,
price numeric
);
1.2. 在 products 插入数据
INSERT INTO products (product_no, name, price) VALUES
(1, 'Cheese', 9.99),
(2, 'Bread', 1.99),
(3, 'Milk', 2.99);
查看数据库写入结果。
2.编写SPARK程序。(完整项目源码Github)
2.1.读取Postgresql某一张表的数据为DataFrame(完整项目源码Github)
SparkPostgresqlJdbc
.java
Properties connectionProperties = new Properties()
//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
connectionProperties
.put(
"user",
"postgres")
connectionProperties
.put(
"password",
"123456")
connectionProperties
.put(
"driver",
"org.postgresql.Driver")
//SparkJdbc读取Postgresql的products表内容
Dataset<Row> jdbcDF = spark
.read()
.jdbc(
"jdbc:postgresql://localhost:5432/postgres",
"products",connectionProperties)
.select(
"name",
"price")
//显示jdbcDF数据内容
jdbcDF
.show()
2.2.写入Postgresql某张表中
//将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是
"error"模式。
jdbcDF
.write()
.mode(
"append")
.jdbc(
"jdbc:postgresql://localhost:5432/postgres",
"newproducts",connectionProperties)
3.运行程序,并查看结果(如果在IDEA中开发不熟练,可以看我另一篇博文spark (java API) 在Intellij IDEA中开发并运行)。
3.1.直接在intellij IDEA(社区版)中运行。
a.在运行按钮的“Edit Configeration”中的VM option中添加“-Dspark.master=local”
3.2.在终端(Terminal)中运行。
-
-class "SparkPostgresqlJdbc" \
-
-master local[4] \
-
-driver-class-path /home/xiaolei/.m2/repository/org/postgresql/postgresql/9.4.1212/postgresql-9.4.1212.jar \
target/SparkPostgresqlJdbc-1.0-SNAPSHOT.jar
其中 --driver-class-path 指定下载的postgresql JDBC数据 库驱动路径,命令执行要在项目的根目录中(/home/xiaolei/Data/GS/Spark/SparkPostgresqlJdbc)。
查看Spark写入数据库中的数据
4.以下为项目中主要源码(完整项目源码Github):
4.1.项目配置源码pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0
</modelVersion>
<groupId>wangxiaolei
</groupId>
<artifactId>SparkPostgresqlJdbc
</artifactId>
<version>1.0-SNAPSHOT
</version>
<dependencies>
<dependency>
<groupId>org.apache.spark
</groupId>
<artifactId>spark-core_2.11
</artifactId>
<version>2.1.0
</version>
</dependency>
<dependency>
<groupId>org.apache.spark
</groupId>
<artifactId>spark-sql_2.11
</artifactId>
<version>2.1.0
</version>
</dependency>
<dependency>
<groupId>org.postgresql
</groupId>
<artifactId>postgresql
</artifactId>
<version>9.4.1212
</version>
</dependency>
</dependencies>
</project>
4.2.java源码SparkPostgresqlJdbc.java
import org
.apache.spark.sql.Dataset
import org
.apache.spark.sql.Row
import org
.apache.spark.sql.SparkSession
import java
.util.Properties
public class SparkPostgresqlJdbc {
public static void main (String[] args) {
SparkSession spark = SparkSession
.builder()
.appName(
"SparkPostgresqlJdbc")
.config(
"spark.some.config.option",
"some-value")
.getOrCreate()
//启动runSparkPostgresqlJdbc程序
runSparkPostgresqlJdbc(spark)
spark
.stop()
}
private static void runSparkPostgresqlJdbc(SparkSession spark){
//new一个属性
System
.out.println(
"确保数据库已经开启,并创建了products表和插入了数据")
Properties connectionProperties = new Properties()
//增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)
System
.out.println(
"增加数据库的用户名(user)密码(password),指定postgresql驱动(driver)")
connectionProperties
.put(
"user",
"postgres")
connectionProperties
.put(
"password",
"123456")
connectionProperties
.put(
"driver",
"org.postgresql.Driver")
//SparkJdbc读取Postgresql的products表内容
System
.out.println(
"SparkJdbc读取Postgresql的products表内容")
Dataset<Row> jdbcDF = spark
.read()
.jdbc(
"jdbc:postgresql://localhost:5432/postgres",
"products",connectionProperties)
.select(
"name",
"price")
//显示jdbcDF数据内容
jdbcDF
.show()
//将jdbcDF数据新建并写入newproducts,append模式是连接模式,默认的是
"error"模式。
jdbcDF
.write()
.mode(
"append")
.jdbc(
"jdbc:postgresql://localhost:5432/postgres",
"newproducts",connectionProperties)
}
}
(完整项目源码Github)