Impala version: 2.11.0-cdh5.14.0; Kudu version: the release bundled with CDH 5.14.0 (1.6.0-cdh5.14.0)
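I recently ran into a problem using Impala with Kudu. One of our programs periodically creates Kudu tables over JDBC. During development several instances of the service start at the same time, and they all try to create the same Kudu table concurrently. Sometimes this leaves the table in a broken state: impala-shell shows that the table exists, but it does not actually exist in Kudu.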
    [localhost:21000] > show tables;
    Query: show tables
    +------------------+
    | name             |
    +------------------+
    | test             |
    +------------------+
    [localhost:21000] > select * from test;
    Query: select * from test
    Query submitted at: 2019-05-05 16:28:25 (Coordinator: http://localhost:25000)
    ERROR: AnalysisException: Failed to load metadata for table: 'test'
    CAUSED BY: TableLoadingException: Error loading metadata for Kudu table impala::default.test
    CAUSED BY: ImpalaRuntimeException: Error opening Kudu table 'impala::default.test', Kudu error: The table does not exist: table_name: "impala::default.test"

The Kudu web UI at http://kudu-master:8051/tables, and the command kudu table list <master_addresses>, both confirm that the Kudu table test really does not exist.
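Once there are many tables, spotting this by hand gets tedious. The sketch below does the comparison in plain Java. It assumes you have already collected two inputs for one database: the names of the Kudu-backed tables that Impala lists, and the names printed by kudu table list. It also assumes managed tables that use Impala's default Kudu name, impala::<db>.<table> (the form in the error above). Tables created with a custom kudu.table_name property are not handled. KuduOrphanCheck and missingInKudu are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper: flags Impala tables whose backing Kudu table is missing. */
final class KuduOrphanCheck {
    /**
     * impalaTables: Kudu-backed table names as shown by Impala for database db.
     * kuduTables: names reported by Kudu itself.
     * Assumes the default managed-table naming "impala::<db>.<table>".
     */
    static List<String> missingInKudu(String db, List<String> impalaTables,
                                      Set<String> kuduTables) {
        List<String> missing = new ArrayList<>();
        for (String table : impalaTables) {
            String kuduName = "impala::" + db + "." + table;
            if (!kuduTables.contains(kuduName)) {
                missing.add(table); // known to Impala/HMS, absent in Kudu
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        List<String> impala = Arrays.asList("test", "orders");
        Set<String> kudu = new HashSet<>(Arrays.asList("impala::default.orders"));
        System.out.println(missingInKudu("default", impala, kudu)); // prints [test]
    }
}
```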
To reproduce the problem, I wrote a test in which three threads create the same Kudu table over JDBC at the same time:
    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    public class Test {
        static Logger log = Logger.getGlobal();

        static class Task implements Runnable {
            private final Connection con;
            private final int index;

            public Task(Connection con, int index) {
                this.con = con;
                this.index = index;
            }

            @Override
            public void run() {
                try (Statement stmt = con.createStatement()) {
                    stmt.execute("create table if not exists test(id int ,age int ,name string,primary key(id)) stored as kudu");
                    log.log(Level.INFO, "Thread-" + index + ": succeeded");
                } catch (SQLException e) {
                    log.log(Level.WARNING, "Thread-" + index + " failed: " + e.getMessage());
                }
            }
        }

        public static void main(String[] args) throws ClassNotFoundException, SQLException {
            int threadCount = 3;
            ExecutorService executor = Executors.newFixedThreadPool(threadCount);
            List<Connection> cons = new ArrayList<>();
            Class.forName("com.cloudera.impala.jdbc41.Driver");
            // Start from a clean state
            try (Connection con = DriverManager.getConnection("jdbc:impala://localhost:21050");
                 Statement stmt = con.createStatement()) {
                stmt.execute("drop table if exists test");
            }
            // One connection per thread, so the three DDL statements are sent independently
            for (int i = 0; i < threadCount; i++)
                cons.add(DriverManager.getConnection("jdbc:impala://localhost:21050"));
            for (int j = 0; j < threadCount; j++)
                executor.submit(new Task(cons.get(j), j));
            executor.shutdown();
        }
    }

The outcome varies from run to run. When only one of the three threads reports success, the table test is created correctly. Log of such a run:
    WARNING: Thread-2 failed: [Simba][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::default.test'
    CAUSED BY: NonRecoverableException: Table impala::default.test already exists with id a0fd1b465d57471296edcbb6971acdf1
    ), Query: create table if not exists test(id int ,age int ,name string,primary key(id)) stored as kudu.
    May 06, 2019 9:46:43 AM utopa_report.Test$Task run
    WARNING: Thread-1 failed: [Simba][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::default.test'
    CAUSED BY: NonRecoverableException: Table impala::default.test already exists with id a0fd1b465d57471296edcbb6971acdf1
    ), Query: create table if not exists test(id int ,age int ,name string,primary key(id)) stored as kudu.
    May 06, 2019 9:46:43 AM utopa_report.Test$Task run
    INFO: Thread-0: succeeded

When two of the three threads report success, the table test ends up in the broken state described above. Over repeated runs this happened quite often. Log of such a run:
    WARNING: Thread-1 failed: [Simba][ImpalaJDBCDriver](500051) ERROR processing query/statement. Error Code: 0, SQL state: TStatus(statusCode:ERROR_STATUS, sqlState:HY000, errorMessage:ImpalaRuntimeException: Error creating Kudu table 'impala::default.test'
    CAUSED BY: NonRecoverableException: Table impala::default.test already exists with id 15a5c07d04b24c9f97ee8581e2404122
    ), Query: create table if not exists test(id int ,age int ,name string,primary key(id)) stored as kudu.
    May 06, 2019 9:50:58 AM utopa_report.Test$Task run
    INFO: Thread-2: succeeded
    May 06, 2019 9:50:58 AM utopa_report.Test$Task run
    INFO: Thread-0: succeeded

Now let's walk through how a Kudu table gets created. The Impala backend receives the DDL request and forwards it via Thrift to the frontend. The frontend module, impala-frontend, is written in Java. It handles parsing, query planning and metadata management.
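1. The Impala backend receives the DDL request, a TCatalogOpRequest, and executes it in CatalogOpExecutor::Exec.
2. CatalogOpExecutor::Exec sends an RPC to the CatalogServer through CatalogServiceClientWrapper.ExecDdl.
3. The CatalogServer uses its proxy object Catalog (catalog.cc) to call the Java method JniCatalog.execDdl through JNI.
4. JniCatalog.execDdl is the frontend entry point. It receives the serialized request thriftDdlExecReq and deserializes it into a Thrift TDdlExecRequest object. It passes that object to CatalogOpExecutor.execDdlRequest, and finally serializes the response back into bytes for the backend.
5. CatalogOpExecutor.execDdlRequest reads ddl_type, sees that this is a CREATE TABLE request, and calls createTable. createTable returns a boolean saying whether the table was created, but execDdlRequest ignores it. The status is set to OK whenever no exception propagates. That explains why some threads in the test above report success even though createTable returned false, i.e. even though table creation actually failed with an exception.

    public TDdlExecResponse execDdlRequest(TDdlExecRequest ddlRequest)
        throws ImpalaException {
      // ... (response initialization omitted)
      switch (ddlRequest.ddl_type) {
        case ALTER_TABLE:
          alterTable(ddlRequest.getAlter_table_params(), response);
          break;
        case CREATE_TABLE:
          // The boolean returned by createTable is discarded here
          createTable(ddlRequest.getCreate_table_params(), response);
          break;
        // ... (other DDL types omitted)
        default: throw new IllegalStateException("Unexpected DDL exec request type: " +
            ddlRequest.ddl_type);
      }
      // Reached whenever no exception was thrown, regardless of createTable's result
      response.getResult().setStatus(new TStatus(TErrorCode.OK, new ArrayList<String>()));
      return response;
    }

6. In CatalogOpExecutor.createTable, creation is skipped if IF NOT EXISTS was specified and the catalog already contains the table. Without concurrency, every repeated request would therefore return at this point. Otherwise, because this is a Kudu table, createKuduTable is called:

    private boolean createTable(TCreateTableParams params, TDdlExecResponse response)
        throws ImpalaException {
      LOG.info("-A-------------------beging create table--------------------");
      // ... (tableName is derived from params; precondition checks omitted)
      if (params.if_not_exists &&
          catalog_.containsTable(tableName.getDb(), tableName.getTbl())) {
        // IF NOT EXISTS was given and the catalog already has the table: return immediately
        LOG.trace(String.format("Skipping table creation because %s already exists and " +
            "IF NOT EXISTS was specified.", tableName));
        response.getResult().setVersion(catalog_.getCatalogVersion());
        LOG.info("-B-------------------table exist--------------------");
        return false;
      }
      org.apache.hadoop.hive.metastore.api.Table tbl = createMetaStoreTable(params);
      if (KuduTable.isKuduTable(tbl)) return createKuduTable(tbl, params, response);
      return createTable(tbl, params.if_not_exists, params.getCache_op(), response);
    }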
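The catalog_.containsTable check in step 6 is not done under metastoreDdlLock_, the lock that later guards steps 7b and 7c. That makes it a check-then-act race. The in-memory model below is only an illustration, not Impala code. It uses a CyclicBarrier to force the interleaving where every thread finishes the check before any of them registers the table, so all n threads get past the IF NOT EXISTS check. When the check and the insert run under one lock, exactly one thread gets through. CheckThenAct and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Toy model of step 6: an IF NOT EXISTS check that is not atomic with the create. */
final class CheckThenAct {
    static final String TABLE = "default.test";

    /** Unlocked check; returns how many threads decided to create the table. */
    static int passedUnlockedCheck(int n) throws Exception {
        Set<String> catalog = ConcurrentHashMap.newKeySet(); // stand-in for the catalog cache
        CyclicBarrier allChecked = new CyclicBarrier(n);
        AtomicInteger passed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(n);
        List<Future<Object>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(pool.submit(() -> {
                if (catalog.contains(TABLE)) {
                    return null;            // "already exists": skip creation
                }
                passed.incrementAndGet();
                allChecked.await();         // nobody registers until every thread has checked
                catalog.add(TABLE);
                return null;
            }));
        }
        for (Future<Object> f : futures) {
            f.get();
        }
        pool.shutdown();
        return passed.get();
    }

    /** Check and create under one lock; only the first thread creates. */
    static int passedLockedCheck(int n) throws Exception {
        Set<String> catalog = new HashSet<>();
        Object lock = new Object();
        AtomicInteger passed = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(n);
        List<Future<Object>> futures = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            futures.add(pool.submit(() -> {
                synchronized (lock) {
                    if (!catalog.contains(TABLE)) {
                        passed.incrementAndGet();
                        catalog.add(TABLE);
                    }
                }
                return null;
            }));
        }
        for (Future<Object> f : futures) {
            f.get();
        }
        pool.shutdown();
        return passed.get();
    }
}
```

With n = 3, passedUnlockedCheck returns 3 and passedLockedCheck returns 1. In Impala the lock only covers 7b and 7c, so several requests can reach 7a together, as in the three-thread test.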
7. The remaining logic is the sub-flow on the right-hand side of the diagram above. I added some logging and exception handling to the code below to get more debug output.
a. Create the Kudu table through KuduClient. If the table exists and IF NOT EXISTS was specified, return immediately. If it exists and IF NOT EXISTS was not specified, throw ImpalaRuntimeException. Otherwise create the table. Under concurrent creation this step can throw org.apache.kudu.client.NonRecoverableException.
b. Add the table to the Hive Metastore. If it already exists there, AlreadyExistsException is thrown and step d runs.
c. Add the table to the catalog cache. Steps b and c run while holding a lock, so together they are atomic.
d. If step b or c throws, drop the Kudu table by name. Note that this happens whether or not this call actually created the table in step a. The drop first checks whether the table exists and throws TableNotFoundException if it does not.

    private boolean createKuduTable(org.apache.hadoop.hive.metastore.api.Table newTable,
        TCreateTableParams params, TDdlExecResponse response) throws ImpalaException {
      Preconditions.checkState(KuduTable.isKuduTable(newTable));
      try { // added exception handling, for debug logging
        if (Table.isExternalTable(newTable)) {
          KuduCatalogOpExecutor.populateColumnsFromKudu(newTable);
        } else {
          // Creates the table through KuduClient. It first checks whether the table exists,
          // but the check and the create are not atomic, so concurrent calls can fail here
          // with org.apache.kudu.client.NonRecoverableException
          KuduCatalogOpExecutor.createManagedTable(newTable, params);
        }
      } catch (Exception e) {
        LOG.info("-C-------------------create kudu table error--------------------", e);
        throw e;
      }
      try {
        synchronized (metastoreDdlLock_) {
          try (MetaStoreClient msClient = catalog_.getMetaStoreClient()) {
            msClient.getHiveClient().createTable(newTable);
          } catch (Exception e) { // added exception handling, for debug logging
            LOG.info("-D-------------------add table to hivemetastore error--------------------", e);
            throw e;
          }
          // Add the table to the catalog cache
          try { // added exception handling, for debug logging
            Table newTbl = catalog_.addTable(newTable.getDbName(), newTable.getTableName());
            addTableToCatalogUpdate(newTbl, response.result);
          } catch (Exception e) {
            LOG.info("-E-------------------add table to catalog cache error--------------------", e);
            throw e;
          }
        }
      } catch (Exception e) {
        LOG.info("-F-------------------add to HMS and catalog cache error--------------------", e);
        try {
          if (!Table.isExternalTable(newTable)) {
            KuduCatalogOpExecutor.dropTable(newTable, false);
            LOG.info("-G-------------------drop kudu table--------------------");
          }
        } catch (Exception logged) {
          // Possibly TableNotFoundException: the Kudu table no longer exists
          String kuduTableName = newTable.getParameters().get(KuduTable.KEY_TABLE_NAME);
          LOG.error(String.format("Failed to drop Kudu table '%s'", kuduTableName), logged);
          LOG.info("-H-------------------drop kudu table error--------------------");
          throw new RuntimeException(String.format("Failed to create the table '%s' in " +
              " the Metastore and the newly created Kudu table '%s' could not be " +
              " dropped. The log contains more information.", newTable.getTableName(),
              kuduTableName), e);
        }
        if (e instanceof AlreadyExistsException && params.if_not_exists) {
          // The table already existed in HMS when this call tried to create it
          LOG.info("-I-------------------add to HMS and catalog cache error because already exists--------------------", e);
          return false;
        }
        throw new ImpalaRuntimeException(String.format(HMS_RPC_ERROR_FORMAT_STR, "createTable"), e);
      }
      LOG.info("-J-------------------add table success--------------------");
      return true;
    }

To get this output, I added the exception handling and logging to CatalogOpExecutor.java and rebuilt the affected classes into impala-frontend-0.1-SNAPSHOT.jar. I then replaced the jar under /usr/lib/impala/lib on the node running the impala-catalog service and restarted the cluster services. If any exception propagates out of table creation, the CatalogServer reports an error status. Analyzing the resulting logs explains how the two behaviours above arise.
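Step 7a and steps 7b to 7d are separate phases, so the outcome depends on how the threads interleave between them. The toy replay below is not Impala code. Two in-memory sets stand in for Kudu and the Hive Metastore, 7c is folded into 7b, and the NonRecoverableException path of 7a is not modelled. Step7Replay and its methods are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical single-threaded replay of step 7 for CREATE TABLE IF NOT EXISTS. */
final class Step7Replay {
    static final String TABLE = "test";
    final Set<String> kudu = new HashSet<>(); // stand-in for Kudu
    final Set<String> hms = new HashSet<>();  // stand-in for the Hive Metastore

    /** 7a: create the Kudu table; if it already exists, IF NOT EXISTS makes this a no-op. */
    void step7a() {
        kudu.add(TABLE);
    }

    /**
     * 7b-7d: register in HMS. On AlreadyExists, drop the Kudu table by name, whoever
     * created it. Returns false when the error is swallowed; execDdlRequest ignores
     * this value, so the client still sees success.
     */
    boolean step7bcd() {
        if (hms.add(TABLE)) {
            return true;
        }
        if (!kudu.remove(TABLE)) {
            // the real code gets TableNotFoundException here and rethrows it
            throw new IllegalStateException("TableNotFoundException: " + TABLE);
        }
        return false;
    }

    /** T0 creates the table, T1 skips 7a, then both run 7b-7d. */
    static Step7Replay replayTwoThreads() {
        Step7Replay r = new Step7Replay();
        r.step7a();   // T0: creates the Kudu table
        r.step7a();   // T1: table exists, returns early
        r.step7bcd(); // T0: HMS and catalog succeed
        r.step7bcd(); // T1: AlreadyExists, drops T0's Kudu table, returns false
        return r;
    }

    /** Like above, plus T2, which also skipped 7a before T1's drop. */
    static String replayThreeThreads() {
        Step7Replay r = new Step7Replay();
        r.step7a(); r.step7a(); r.step7a(); // T0 creates; T1 and T2 find it and return
        r.step7bcd();                       // T0 succeeds
        r.step7bcd();                       // T1 drops the Kudu table, returns false
        try {
            r.step7bcd();                   // T2: AlreadyExists, and the Kudu table is gone
            return "T2 succeeded";
        } catch (IllegalStateException e) {
            return "T2 failed: " + e.getMessage();
        }
    }
}
```

replayTwoThreads() ends with the table in HMS but not in Kudu. replayThreeThreads() returns "T2 failed: TableNotFoundException: test".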
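One thread reports success, two report failure: all three threads pass step 6 into step 7 and reach 7a at the same time. Only one of them manages to create the Kudu table through KuduClient. The other two get NonRecoverableException and return with an error. The successful thread goes through the remaining steps normally, so the table is consistent.

Two threads report success, one reports failure (first variant): the failing thread got NonRecoverableException while creating the Kudu table in 7a. The first successful thread created the table. When the second successful thread reached 7a, the Kudu table already existed and IF NOT EXISTS was specified, so it returned without doing anything. At 7b, however, it found the table that the first thread had already registered in HMS. It therefore got AlreadyExistsException and went to 7d, where it dropped the Kudu table created by the first thread. The table now exists in HMS and the catalog but no longer in Kudu. Because the exception was AlreadyExistsException, it was not rethrown; createKuduTable returned false instead. Nothing checks that return value, so the status in TDdlExecResponse stays OK and this thread also reports success.

Two threads report success, one reports failure (second variant): all three threads pass 7a without error. One creates the Kudu table, and the other two see that it exists and return. At 7b the first successful thread completes all remaining steps. The second successful thread finds the table already in HMS and behaves exactly like the second successful thread in the previous case. The failing thread also gets AlreadyExistsException when creating the HMS table and goes to 7d. By then the second successful thread has already dropped the Kudu table, so the existence check fails and the failing thread throws TableNotFoundException.

I derived these three cases from the catalog logs over many test runs. The logs are attached.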
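The analysis points at step 7d: the rollback drops the Kudu table by name, even when this call did not create it in 7a. The sketch below tests one hypothetical change in a toy model. Only the call that actually created the Kudu table in 7a goes on to register it in HMS. Calls that find the table already present return as if IF NOT EXISTS had matched at step 6. This is not an Impala patch. It also has a trade-off: a Kudu table orphaned by some earlier failure would never be registered in HMS by CREATE TABLE IF NOT EXISTS. The model explores every interleaving of n threads and checks whether Kudu and HMS always end up agreeing. It starts with an empty HMS and does not model NonRecoverableException. Step7Interleavings and alwaysConsistent are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical exhaustive check of step 7 interleavings, with and without the guard. */
final class Step7Interleavings {
    static final String TABLE = "test";

    /**
     * Explores every interleaving of n threads running CREATE TABLE IF NOT EXISTS
     * (each thread does 7a, then 7b-7d). Returns true if every interleaving ends with
     * the table in both Kudu and HMS, or in neither.
     * guarded = false models the code above; guarded = true models the hypothetical change.
     */
    static boolean alwaysConsistent(int n, boolean guarded) {
        return explore(new HashSet<>(), new HashSet<>(), new int[n], guarded);
    }

    // phase[i]: 0 = before 7a, 1 = between 7a and 7b, 2 = done
    private static boolean explore(Set<String> kudu, Set<String> hms, int[] phase,
                                   boolean guarded) {
        boolean finished = true;
        for (int i = 0; i < phase.length; i++) {
            if (phase[i] == 2) continue;
            finished = false;
            Set<String> k = new HashSet<>(kudu); // copy the state for this branch
            Set<String> h = new HashSet<>(hms);
            int[] p = phase.clone();
            if (p[i] == 0) {
                boolean created = k.add(TABLE);  // 7a: only one thread creates it
                // guarded: a thread that did not create the Kudu table stops here
                p[i] = (guarded && !created) ? 2 : 1;
            } else {
                if (!h.add(TABLE)) {             // 7b: AlreadyExistsException
                    k.remove(TABLE);             // 7d: drop by name; the real code throws
                                                 // TableNotFoundException if already gone
                }
                p[i] = 2;
            }
            if (!explore(k, h, p, guarded)) return false;
        }
        // every thread is done: Kudu and HMS must agree
        return !finished || kudu.contains(TABLE) == hms.contains(TABLE);
    }
}
```

alwaysConsistent(2, false) returns false, because it finds the interleaving from the analysis above. alwaysConsistent(3, true) returns true within this model.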