JDBC

 

1. JDBC 定义

Java Database Connectivity (JDBC) 定义了一套访问数据库的 API.

Java Database Connectivity (JDBC) is an application programming interface (API) for the Java programming language which defines how a client may access a database.1

好处是几乎可以使用完全相同的代码,访问不同的数据库:MySQL、Hive、Doris、Presto 等等。

定义在 package java.sql,主要包含了 DriverManager Driver Connection Statement ResultSet.

2. JDBC 接口类的设计

JDBC 的接口类设计,使用起来非常方便,值得学习。看个读取 MySQL 表的例子

public class MySQLJDBCSample {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai";
        String user = "izualzhy";
        String passwd = "izualzhy_test";
        try (Connection connection = DriverManager.getConnection(url, user, passwd)) {
            try (Statement statement = connection.createStatement()) {
                String sql = "SHOW TABLES";
                try (ResultSet resultSet = statement.executeQuery(sql)) {
                    for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                        System.out.printf("%-32s\t", resultSet.getMetaData().getColumnName(i));
                    }

                    System.out.println("\n" + String.join("", Collections.nCopies(32, "-")));

                    while (resultSet.next()) {
                        for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                            System.out.printf("%-32s\t", resultSet.getString(i));
                        }
                        System.out.println();
                    }
                }
            }
        }
    }
}

整体上接口分了几个步骤:

  1. getConnection获取连接
  2. createStatement在连接上创建 statement
  3. executeQuery发送 SQL 到服务端执行
  4. ResultSet遍历获取结果

2.1. Driver

DriverManager.getConnection遍历所有注册的 driver,找到适合连接串的:

DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai")
    trying org.apache.hive.jdbc.HiveDriver
    trying org.apache.derby.jdbc.AutoloadedDriver40
    trying org.apache.calcite.jdbc.Driver
    trying com.facebook.presto.jdbc.PrestoDriver
    trying com.mysql.cj.jdbc.Driver
getConnection returning com.mysql.cj.jdbc.Driver

例如这里找到的就是com.mysql.cj.jdbc.Driver.

com.mysql.cj.jdbc.Driver的注册,主要依赖三步:

  1. SPI: 实现子类 Driver 时创建 META-INF/services/java.sql.Driver 文件,内容为:com.mysql.cj.jdbc.Driver
  2. ServiceLoader: DriverManager.loadInitialDrivers方法里,加载 1 里所有的 Driver 类:ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class)
  3. static scope: 触发子类 static 语句执行 java.sql.DriverManager.registerDriver(new Driver())

类似的使用方式很多,例如 flink-formats 里的 org.apache.flink.table.factories.Factory2、logback 里的 org.slf4j.spi.SLF4JServiceProvider3

2.2. Connection&Statement

Connection是 JDBC 里定义的基类,代表跟数据库的连接。不同 Driver 返回的类型不同,例如com.mysql.cj.jdbc.ConnectionImpl TrinoConnection HiveConnection ProxyConnection等。

Statement 代表在连接上执行的具体指令。实际生产环境,更应该使用PreparedStatement,有两个好处:

  1. PreparedStatement 可以预编译 SQL,降低数据库负载
  2. PreparedStatement 可以替换变量,我在接手模块的代码里,看到了大量的手动拼接 SQL 串。手动拼接的做法,更加的 error-prone,而且存在 SQL 注入风险。

ProxyConnection是 HIKARI 实现的连接池对象,不过需要注意不是所有的连接都适合使用连接池,比如 Hive 连接。连接不关闭的话,会复用 YARN 的 appcliationId,导致无法区分不同 SQL 调起的任务。

2.3. executeUpdate/executeQuery/execute

例子里使用了executeQuery,用于执行 DQL 语句,返回ResultSet,存储了查询结果。

executeUpdate用于执行 DML/DDL 语句,返回int,表示影响的行数。

实际更推荐使用execute,可以用于执行任意 SQL 语句,返回boolean表示是否有ResultSet。然后getResultSet or getUpdateCount分别处理。

2.4. Wrapper AutoCloseable

Connection Statement ResultSet都继承了这两个类:

  1. Wrapper常出现在使用连接池的场景,例如通过poolConnection.unwrap(KyuubiConnection.class)来获取包装的KyuubiConnection,从而调用一些自定义方法。注意不是子类的关系,实际上是 HikariProxyConnection wrapping org.apache.kyuubi.jdbc.hive.KyuubiConnection
  2. AutoCloseable:用于资源回收,例如HiveConnection.close()方法,会调用client.CloseSession(closeReq);transport.close();,否则就会造成连接泄露。因此需要养成在 try-with-resources 语句里使用的习惯。

3. JDBC 访问 Hive

上一节介绍了各个接口类,是通用的封装。不同的数据库驱动,也有自己单独的方法。大数据使用 JDBC 的比较典型的场景是访问 Hive,因此专门介绍下。

3.1. beeline

比如获取日志, presto 是通过回调的方式:

public class PrestoStatement implements Statement {

    public void setProgressMonitor(Consumer<QueryStats> progressMonitor) {
        this.progressCallback.set(Optional.of(Objects.requireNonNull(progressMonitor, "progressMonitor is null")));
    }
}

Hive 则需要单独线程里主动获取日志:

public class HiveStatement implements java.sql.Statement {
  public boolean hasMoreLogs() {
    return isLogBeingGenerated;
  }

  public List<String> getQueryLog() throws SQLException, ClosedOrCancelledStatementException {
    return getQueryLog(true, fetchSize);
  }
}

注意当检测到连接关闭,也应当再获取一次日志,避免日志不全的问题,具体可以参考 beeline4

Hive同样不支持在execute里一次传入多条语句,SQL 语句的拆分也可以参考 beelin.

3.2. Synchorinzed VS ReentrantLock

HiveStatement 的实现里有这么一处:

public class HiveStatement implements java.sql.Statement {
  private TCLIService.Iface client;

  public boolean execute(String sql) throws SQLException {
      // 调用 client.ExecuteStatement 提交 sql,如果 timeout 过小,这里可能提交超时
      runAsyncOnServer(sql);
      // while 循环,不断调用 client.GetOperationStatus 获取 HQL 执行状态
      TGetOperationStatusResp status = waitForOperationToComplete();
      ...
  }

  public List<String> getQueryLog(boolean incremental, int fetchSize)
      throws SQLException, ClosedOrCancelledStatementException {
    ...
    TFetchResultsResp tFetchResultsResp = null;
    try {
      if (stmtHandle != null) {
        tFetchResultsResp = client.FetchResults(tFetchResultsReq);
}

client是随HiveConnection初始化时构造的:

client = newSynchronizedClient(client);

newSynchronizedClient实际上返回了Iface代理类的实例:

  public static TCLIService.Iface newSynchronizedClient(
      TCLIService.Iface client) {
    return (TCLIService.Iface) Proxy.newProxyInstance(
        HiveConnection.class.getClassLoader(),
      new Class [] { TCLIService.Iface.class },
      new SynchronizedHandler(client));
  }

SynchronizedHandler.invoke在 v2.3 之前的实现,client 的每个方法都会加锁:

  private static class SynchronizedHandler implements InvocationHandler {
    private final TCLIService.Iface client;

    SynchronizedHandler(TCLIService.Iface client) {
      this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object [] args)
        throws Throwable {
      try {
        synchronized (client) {
          return method.invoke(client, args);
        }
      } ...
    }
  }

由于waitForOperationToComplete里一直在 while 循环,因此就可能导致getQueryLog无法获取到锁,导致日志一直获取不到。

Hive-161725通过引入ReentrantLock公平锁解决了这个问题。

3.3. 其他

  1. ResultSet是单线程遍历,因此拉取数据的效率不高。对于较大的数据量,应当充分利用集群并行的能力,将数据写到目标存储或者分布式文件系统上。
  2. HiveConnection初始化时,使用了DriverManager.getLoginTimeout作为 socket Connect/Read/Write 的超时时间,但是这个值是全局的,需要注意issue6

参考资料

  1. Java Database Connectivity
  2. org.apache.flink.table.factories.Factory
  3. org.slf4j.spi.SLF4JServiceProvider
  4. beeline.java
  5. HIVE-16172
  6. Socket timeouts happen when other drivers set DriverManager.loginTimeout