JDBC

 
声明:本站点文章内容均为古法手作,没有使用 AI 生成(代码除外,人工 review)。

1. JDBC 定义

Java Database Connectivity (JDBC) 定义了一套访问数据库的 API.

Java Database Connectivity (JDBC) is an application programming interface (API) for the Java programming language which defines how a client may access a database.1

好处是几乎可以使用完全相同的代码,访问不同的数据库:MySQL、Hive、Doris、Presto 等等。

定义在 package java.sql,主要包含了 DriverManager Driver Connection Statement ResultSet.

2. JDBC 接口类的设计

JDBC 的接口类设计,使用起来非常方便,值得学习。看个读取 MySQL 表的例子

public class MySQLJDBCSample {
    public static void main(String[] args) throws SQLException {
        String url = "jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai";
        String user = "izualzhy";
        String passwd = "izualzhy_test";
        try (Connection connection = DriverManager.getConnection(url, user, passwd)) {
            try (Statement statement = connection.createStatement()) {
                String sql = "SHOW TABLES";
                try (ResultSet resultSet = statement.executeQuery(sql)) {
                    for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                        System.out.printf("%-32s\t", resultSet.getMetaData().getColumnName(i));
                    }

                    System.out.println("\n" + String.join("", Collections.nCopies(32, "-")));

                    while (resultSet.next()) {
                        for (int i = 1; i <= resultSet.getMetaData().getColumnCount(); i++) {
                            System.out.printf("%-32s\t", resultSet.getString(i));
                        }
                        System.out.println();
                    }
                }
            }
        }
    }
}

整体上接口分了几个步骤:

  1. getConnection获取连接
  2. createStatement在连接上创建 statement
  3. executeQuery发送 SQL 到服务端执行
  4. ResultSet遍历获取结果

2.1. Driver

DriverManager.getConnection遍历所有注册的 driver,找到适合连接串的:

DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/quartz_jobs?serverTimezone=Asia/Shanghai")
    trying org.apache.hive.jdbc.HiveDriver
    trying org.apache.derby.jdbc.AutoloadedDriver40
    trying org.apache.calcite.jdbc.Driver
    trying com.facebook.presto.jdbc.PrestoDriver
    trying com.mysql.cj.jdbc.Driver
getConnection returning com.mysql.cj.jdbc.Driver

例如这里找到的就是com.mysql.cj.jdbc.Driver.

com.mysql.cj.jdbc.Driver的注册,主要依赖三步:

  1. SPI: 实现子类 Driver 时创建 META-INF/services/java.sql.Driver 文件,内容为:com.mysql.cj.jdbc.Driver
  2. ServiceLoader: DriverManager.loadInitialDrivers方法里,加载 1 里所有的 Driver 类:ServiceLoader<Driver> loadedDrivers = ServiceLoader.load(Driver.class)
  3. static scope: 触发子类 static 语句执行 java.sql.DriverManager.registerDriver(new Driver())

类似的使用方式很多,例如 flink-formats 里的 org.apache.flink.table.factories.Factory2、logback 里的 org.slf4j.spi.SLF4JServiceProvider3

2.2. Connection&Statement

Connection是 JDBC 里定义的基类,代表跟数据库的连接。不同 Driver 返回的类型不同,例如com.mysql.cj.jdbc.ConnectionImpl TrinoConnection HiveConnection ProxyConnection等。

Statement 代表在连接上执行的具体指令。实际生产环境,更应该使用PreparedStatement,有两个好处:

  1. PreparedStatement 可以预编译 SQL,降低数据库负载
  2. PreparedStatement 可以替换变量,我在接手模块的代码里,看到了大量的手动拼接 SQL 串。手动拼接的做法,更加的 error-prone,而且存在 SQL 注入风险。

ProxyConnection是 HIKARI 实现的连接池对象,不过需要注意不是所有的连接都适合使用连接池,比如 Hive 连接。连接不关闭的话,会复用 YARN 的 appcliationId,导致无法区分不同 SQL 调起的任务。

2.3. executeUpdate/executeQuery/execute

例子里使用了executeQuery,用于执行 DQL 语句,返回ResultSet,存储了查询结果。

executeUpdate用于执行 DML/DDL 语句,返回int,表示影响的行数。

实际更推荐使用execute,可以用于执行任意 SQL 语句,返回boolean表示是否有ResultSet。然后getResultSet or getUpdateCount分别处理。

2.4. Wrapper AutoCloseable

Connection Statement ResultSet都继承了这两个类:

  1. Wrapper常出现在使用连接池的场景,例如通过poolConnection.unwrap(KyuubiConnection.class)来获取包装的KyuubiConnection,从而调用一些自定义方法。注意不是子类的关系,实际上是 HikariProxyConnection wrapping org.apache.kyuubi.jdbc.hive.KyuubiConnection
  2. AutoCloseable:用于资源回收,例如HiveConnection.close()方法,会调用client.CloseSession(closeReq);transport.close();,否则就会造成连接泄露。因此需要养成在 try-with-resources 语句里使用的习惯。

3. JDBC 访问 Hive

上一节介绍了各个接口类,是通用的封装。不同的数据库驱动,也有自己单独的方法。大数据使用 JDBC 的比较典型的场景是访问 Hive,因此专门介绍下。

3.1. beeline

比如获取日志, presto 是通过回调的方式:

public class PrestoStatement implements Statement {

    public void setProgressMonitor(Consumer<QueryStats> progressMonitor) {
        this.progressCallback.set(Optional.of(Objects.requireNonNull(progressMonitor, "progressMonitor is null")));
    }
}

Hive 则需要单独线程里主动获取日志:

public class HiveStatement implements java.sql.Statement {
  public boolean hasMoreLogs() {
    return isLogBeingGenerated;
  }

  public List<String> getQueryLog() throws SQLException, ClosedOrCancelledStatementException {
    return getQueryLog(true, fetchSize);
  }
}

注意当检测到连接关闭,也应当再获取一次日志,避免日志不全的问题,具体可以参考 beeline4

Hive同样不支持在execute里一次传入多条语句,SQL 语句的拆分也可以参考 beelin.

3.2. Synchorinzed VS ReentrantLock

HiveStatement 的实现里有这么一处:

public class HiveStatement implements java.sql.Statement {
  private TCLIService.Iface client;

  public boolean execute(String sql) throws SQLException {
      // 调用 client.ExecuteStatement 提交 sql,如果 timeout 过小,这里可能提交超时
      runAsyncOnServer(sql);
      // while 循环,不断调用 client.GetOperationStatus 获取 HQL 执行状态
      TGetOperationStatusResp status = waitForOperationToComplete();
      ...
  }

  public List<String> getQueryLog(boolean incremental, int fetchSize)
      throws SQLException, ClosedOrCancelledStatementException {
    ...
    TFetchResultsResp tFetchResultsResp = null;
    try {
      if (stmtHandle != null) {
        tFetchResultsResp = client.FetchResults(tFetchResultsReq);
}

client是随HiveConnection初始化时构造的:

client = newSynchronizedClient(client);

newSynchronizedClient实际上返回了Iface代理类的实例:

  public static TCLIService.Iface newSynchronizedClient(
      TCLIService.Iface client) {
    return (TCLIService.Iface) Proxy.newProxyInstance(
        HiveConnection.class.getClassLoader(),
      new Class [] { TCLIService.Iface.class },
      new SynchronizedHandler(client));
  }

SynchronizedHandler.invoke在 v2.3 之前的实现,client 的每个方法都会加锁:

  private static class SynchronizedHandler implements InvocationHandler {
    private final TCLIService.Iface client;

    SynchronizedHandler(TCLIService.Iface client) {
      this.client = client;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object [] args)
        throws Throwable {
      try {
        synchronized (client) {
          return method.invoke(client, args);
        }
      } ...
    }
  }

由于waitForOperationToComplete里一直在 while 循环,因此就可能导致getQueryLog无法获取到锁,导致日志一直获取不到。

Hive-161725通过引入ReentrantLock公平锁解决了这个问题。

3.3. 其他

  1. ResultSet是单线程遍历,因此拉取数据的效率不高。对于较大的数据量,应当充分利用集群并行的能力,将数据写到目标存储或者分布式文件系统上。
  2. HiveConnection初始化时,使用了DriverManager.getLoginTimeout作为 socket Connect/Read/Write 的超时时间,但是这个值是全局的,需要注意issue6

参考资料

  1. Java Database Connectivity
  2. org.apache.flink.table.factories.Factory
  3. org.slf4j.spi.SLF4JServiceProvider
  4. beeline.java
  5. HIVE-16172
  6. Socket timeouts happen when other drivers set DriverManager.loginTimeout

This work is licensed under a Attribution-NonCommercial 4.0 International license.[转载需注明出处。] Attribution-NonCommercial 4.0 International