1. JDBC 定义
Java Database Connectivity (JDBC) 定义了一套访问数据库的 API.
Java Database Connectivity (JDBC) is an application programming interface (API) for the Java programming language which defines how a client may access a database.1
好处是几乎可以使用完全相同的代码,访问不同的数据库:MySQL、Hive、Doris、Presto 等等。
定义在 package java.sql,主要包含了 DriverManager Driver Connection Statement ResultSet
.
2. JDBC 接口类的设计
JDBC 的接口类设计,使用起来非常方便,值得学习。看个读取 MySQL 表的例子:
public class MySQLJDBCSample {
public static void main(String[] args) throws SQLException {
String url = "jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai";
String user = "izualzhy";
String passwd = "izualzhy_test";
try (Connection connection = DriverManager.getConnection(url, user, passwd)) {
try (Statement statement = connection.createStatement()) {
String sql = "SHOW TABLES";
try (ResultSet resultSet = statement.executeQuery(sql)) {
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
System.out.printf("%-32s\t", resultSet.getMetaData().getColumnName(i));
}
System.out.println("\n" + String.join("", Collections.nCopies(32, "-")));
while (resultSet.next()) {
for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
System.out.printf("%-32s\t", resultSet.getString(i));
}
System.out.println();
}
}
}
}
}
}
整体上接口分了几个步骤:
getConnection
获取连接createStatement
在连接上创建 statementexecuteQuery
发送 SQL 到服务端执行ResultSet
遍历获取结果
2.1. Driver
DriverManager.getConnection
遍历所有注册的 driver,找到适合连接串的:
DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai")
trying org.apache.hive.jdbc.HiveDriver
trying org.apache.derby.jdbc.AutoloadedDriver40
trying org.apache.calcite.jdbc.Driver
trying com.facebook.presto.jdbc.PrestoDriver
trying com.mysql.cj.jdbc.Driver
getConnection returning com.mysql.cj.jdbc.Driver
例如这里找到的就是com.mysql.cj.jdbc.Driver
.
com.mysql.cj.jdbc.Driver
的注册,主要依赖三步:
- SPI: 实现子类 Driver 时创建 META-INF/services/java.sql.Driver 文件,内容为:com.mysql.cj.jdbc.Driver
- ServiceLoader:
DriverManager.loadInitialDrivers
方法里,加载 1 里所有的 Driver 类:ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class)
- static scope: 触发子类 static 语句执行
java.sql.DriverManager.registerDriver(new Driver())
类似的使用方式很多,例如 flink-formats 里的 org.apache.flink.table.factories.Factory2、logback 里的 org.slf4j.spi.SLF4JServiceProvider3
2.2. Connection&Statement
Connection
是 JDBC 里定义的基类,代表跟数据库的连接。不同 Driver 返回的类型不同,例如com.mysql.cj.jdbc.ConnectionImpl
TrinoConnection
HiveConnection
ProxyConnection
等。
Statement
代表在连接上执行的具体指令。实际生产环境,更应该使用PreparedStatement
,有两个好处:
- PreparedStatement 可以预编译 SQL,降低数据库负载
- PreparedStatement 可以替换变量,我在接手模块的代码里,看到了大量的手动拼接 SQL 串。手动拼接的做法,更加的 error-prone,而且存在 SQL 注入风险。
ProxyConnection
是 HIKARI 实现的连接池对象,不过需要注意不是所有的连接都适合使用连接池,比如 Hive 连接。连接不关闭的话,会复用 YARN 的 appcliationId,导致无法区分不同 SQL 调起的任务。
2.3. executeUpdate/executeQuery/execute
例子里使用了executeQuery
,用于执行 DQL 语句,返回ResultSet
,存储了查询结果。
executeUpdate
用于执行 DML/DDL 语句,返回int
,表示影响的行数。
实际更推荐使用execute
,可以用于执行任意 SQL 语句,返回boolean
表示是否有ResultSet
。然后getResultSet
or getUpdateCount
分别处理。
2.4. Wrapper AutoCloseable
Connection
Statement
ResultSet
都继承了这两个类:
Wrapper
常出现在使用连接池的场景,例如通过poolConnection.unwrap(KyuubiConnection.class)
来获取包装的KyuubiConnection
,从而调用一些自定义方法。注意不是子类的关系,实际上是 HikariProxyConnection wrapping org.apache.kyuubi.jdbc.hive.KyuubiConnectionAutoCloseable
:用于资源回收,例如HiveConnection.close()
方法,会调用client.CloseSession(closeReq);transport.close();
,否则就会造成连接泄露。因此需要养成在 try-with-resources 语句里使用的习惯。
3. JDBC 访问 Hive
上一节介绍了各个接口类,是通用的封装。不同的数据库驱动,也有自己单独的方法。大数据使用 JDBC 的比较典型的场景是访问 Hive,因此专门介绍下。
3.1. beeline
比如获取日志, presto 是通过回调的方式:
public class PrestoStatement implements Statement {
public void setProgressMonitor(Consumer<QueryStats> progressMonitor) {
this.progressCallback.set(Optional.of(Objects.requireNonNull(progressMonitor, "progressMonitor is null")));
}
}
Hive 则需要单独线程里主动获取日志:
public class HiveStatement implements java.sql.Statement {
public boolean hasMoreLogs() {
return isLogBeingGenerated;
}
public List<String> getQueryLog() throws SQLException, ClosedOrCancelledStatementException {
return getQueryLog(true, fetchSize);
}
}
注意当检测到连接关闭,也应当再获取一次日志,避免日志不全的问题,具体可以参考 beeline4
Hive同样不支持在execute
里一次传入多条语句,SQL 语句的拆分也可以参考 beelin.
3.2. Synchorinzed VS ReentrantLock
HiveStatement 的实现里有这么一处:
public class HiveStatement implements java.sql.Statement {
private TCLIService.Iface client;
public boolean execute(String sql) throws SQLException {
// 调用 client.ExecuteStatement 提交 sql,如果 timeout 过小,这里可能提交超时
runAsyncOnServer(sql);
// while 循环,不断调用 client.GetOperationStatus 获取 HQL 执行状态
TGetOperationStatusResp status = waitForOperationToComplete();
...
}
public List<String> getQueryLog(boolean incremental, int fetchSize)
throws SQLException, ClosedOrCancelledStatementException {
...
TFetchResultsResp tFetchResultsResp = null;
try {
if (stmtHandle != null) {
tFetchResultsResp = client.FetchResults(tFetchResultsReq);
}
client
是随HiveConnection
初始化时构造的:
client = newSynchronizedClient(client);
newSynchronizedClient
实际上返回了Iface
代理类的实例:
public static TCLIService.Iface newSynchronizedClient(
TCLIService.Iface client) {
return (TCLIService.Iface) Proxy.newProxyInstance(
HiveConnection.class.getClassLoader(),
new Class [] { TCLIService.Iface.class },
new SynchronizedHandler(client));
}
而SynchronizedHandler.invoke
在 v2.3 之前的实现,client 的每个方法都会加锁:
private static class SynchronizedHandler implements InvocationHandler {
private final TCLIService.Iface client;
SynchronizedHandler(TCLIService.Iface client) {
this.client = client;
}
@Override
public Object invoke(Object proxy, Method method, Object [] args)
throws Throwable {
try {
synchronized (client) {
return method.invoke(client, args);
}
} ...
}
}
由于waitForOperationToComplete
里一直在 while 循环,因此就可能导致getQueryLog
无法获取到锁,导致日志一直获取不到。
Hive-161725通过引入ReentrantLock
公平锁解决了这个问题。
3.3. 其他
ResultSet
是单线程遍历,因此拉取数据的效率不高。对于较大的数据量,应当充分利用集群并行的能力,将数据写到目标存储或者分布式文件系统上。HiveConnection
初始化时,使用了DriverManager.getLoginTimeout
作为 socket Connect/Read/Write 的超时时间,但是这个值是全局的,需要注意issue6。