
Learning Source Code from Stack Traces

Even if some of it still goes down half-digested, learning from stack traces really does give twice the result for half the effort. Reading the source cold, nothing connects; there are simply too many paths to choose from.

  • From a beeline connection error, learn how beeline instantiates and opens a connection;
  • From a beeline SQL syntax error, learn how HiveServer2 parses and executes SQL.

The beeline instantiation and execution flow

The startup code lives on the Hadoop side: hadoop-common's RunJar launches the application inside the jar, and control then passes to Hive, where BeeLine parses its many command-line arguments (and keeps re-parsing during the actual execution). BeeLine then connects through the Hive JDBC driver, and the driver talks Thrift to the server. The Thrift part is where things get murky: the authentication and encryption handshake and the binary protocol exchanges have always been the confusing bits.

Overall, beeline is a chain of delegations, each layer handing the work to the next. The core is the JDBC driver, which ships the statement over Thrift RPC to HiveServer2, and HiveServer2 performs the actual SQL execution.
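To make the client half of that chain concrete, here is a minimal sketch that opens the same kind of connection programmatically (the host, port, and principal are placeholders copied from the session below):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class BeelineByHand {
  public static void main(String[] args) throws Exception {
    // The same JDBC driver beeline loads under the hood.
    Class.forName("org.apache.hive.jdbc.HiveDriver");
    // Placeholder URL; on a Kerberos cluster the principal parameter is
    // mandatory, exactly as in the beeline invocation below.
    String url = "jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB";
    try (Connection conn = DriverManager.getConnection(url);
         Statement stmt = conn.createStatement();
         ResultSet rs = stmt.executeQuery("show tables")) {
      while (rs.next()) {
        System.out.println(rs.getString(1));
      }
    }
  }
}

The failed Kerberos connection below walks this exact path, and its stack trace lays out the whole delegation chain, from RunJar at the bottom to the SASL/GSS negotiation at the top: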


bash-4.2# /usr/local/cluster-shim/v3/bin/beeline -u "jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB;" --verbose=true
HADOOP_CONF_DIR=/etc/hadoop/conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/cluster-shim/v3/lib/hive/lib/log4j-slf4j-impl-2.18.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/cluster-shim/v3/lib/hadoop/share/hadoop/common/lib/log4j-slf4j-impl-2.20.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]
!connect jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB; '' [passwd stripped]
Connecting to jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB;
23/10/25 15:54:47 [main]: ERROR transport.TSaslTransport: SASL negotiation failure
javax.security.sasl.SaslException: GSS initiate failed
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) ~[?:1.8.0_382]
at org.apache.thrift.transport.TSaslClientTransport.handleSaslStartMessage(TSaslClientTransport.java:94) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:271) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:51) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:48) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at java.security.AccessController.doPrivileged(Native Method) ~[?:1.8.0_382]
at javax.security.auth.Subject.doAs(Subject.java:422) ~[?:1.8.0_382]
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730) ~[hadoop-common-3.2.1-TBDS-5.2.0.1.jar:?]
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport.open(TUGIAssumingTransport.java:48) ~[hive-exec-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:343) ~[hive-jdbc-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:228) ~[hive-jdbc-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107) ~[hive-jdbc-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at java.sql.DriverManager.getConnection(DriverManager.java:664) ~[?:1.8.0_382]
at java.sql.DriverManager.getConnection(DriverManager.java:208) ~[?:1.8.0_382]
at org.apache.hive.beeline.DatabaseConnection.connect(DatabaseConnection.java:145) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.DatabaseConnection.getConnection(DatabaseConnection.java:209) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.Commands.connect(Commands.java:1641) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.Commands.connect(Commands.java:1536) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
at org.apache.hive.beeline.ReflectiveCommandHandler.execute(ReflectiveCommandHandler.java:56) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.execCommandWithPrefix(BeeLine.java:1389) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1428) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.connectUsingArgs(BeeLine.java:905) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.initArgs(BeeLine.java:795) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:1053) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:538) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:520) ~[hive-beeline-3.1.2-TBDS-5.2.0.1.jar:3.1.2-TBDS-5.2.0.1]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_382]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_382]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_382]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_382]
at org.apache.hadoop.util.RunJar.run(RunJar.java:323) ~[hadoop-common-3.2.1-TBDS-5.2.0.1.jar:?]
at org.apache.hadoop.util.RunJar.main(RunJar.java:236) ~[hadoop-common-3.2.1-TBDS-5.2.0.1.jar:?]
Caused by: org.ietf.jgss.GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:162) ~[?:1.8.0_382]
at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:122) ~[?:1.8.0_382]
at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:189) ~[?:1.8.0_382]
at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:224) ~[?:1.8.0_382]
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212) ~[?:1.8.0_382]
at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179) ~[?:1.8.0_382]
at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:192) ~[?:1.8.0_382]
... 36 more
23/10/25 15:54:47 [main]: WARN jdbc.HiveConnection: Failed to connect to 10.0.0.121:7001
Unknown HS2 problem when communicating with Thrift server.
Error: Could not open client transport with JDBC Uri: jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB;: GSS initiate failed (state=08S01,code=0)
java.sql.SQLException: Could not open client transport with JDBC Uri: jdbc:hive2://10.0.0.121:7001/default;principal=hadoop/_HOST@TBDS-BJUY7BPB;: GSS initiate failed
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:256)
at org.apache.hive.jdbc.HiveDriver.connect(HiveDriver.java:107)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.apache.hive.beeline.DatabaseConnection.connect(DatabaseConnection.java:145)
at org.apache.hive.beeline.DatabaseConnection.getConnection(DatabaseConnection.java:209)
at org.apache.hive.beeline.Commands.connect(Commands.java:1641)
at org.apache.hive.beeline.Commands.connect(Commands.java:1536)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hive.beeline.ReflectiveCommandHandler.execute(ReflectiveCommandHandler.java:56)
at org.apache.hive.beeline.BeeLine.execCommandWithPrefix(BeeLine.java:1389)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1428)
at org.apache.hive.beeline.BeeLine.connectUsingArgs(BeeLine.java:905)
at org.apache.hive.beeline.BeeLine.initArgs(BeeLine.java:795)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:1053)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:538)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:520)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Caused by: org.apache.thrift.transport.TTransportException: GSS initiate failed
at org.apache.thrift.transport.TSaslTransport.sendAndThrowMessage(TSaslTransport.java:232)
at org.apache.thrift.transport.TSaslTransport.open(TSaslTransport.java:316)
at org.apache.thrift.transport.TSaslClientTransport.open(TSaslClientTransport.java:37)
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:51)
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport$1.run(TUGIAssumingTransport.java:48)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
at org.apache.hadoop.hive.metastore.security.TUGIAssumingTransport.open(TUGIAssumingTransport.java:48)
at org.apache.hive.jdbc.HiveConnection.openTransport(HiveConnection.java:343)
at org.apache.hive.jdbc.HiveConnection.<init>(HiveConnection.java:228)
... 25 more

The bug above turned out to be a wrong HADOOP_CONF_DIR environment variable; it has to point at the correct cluster configuration:

export HADOOP_CONF_DIR=/usr/local/cluster-shim/v3/conf/tbds-bjuy7bpb/
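One way to see why this fixes it (a hedged sketch; the exact failure depends on what the wrong conf dir contained): UserGroupInformation takes its security mode from the core-site.xml that HADOOP_CONF_DIR puts on the classpath, and the Kerberos (GSSAPI) leg of the SASL handshake in the stack above only has credentials to work with when that configuration enables security.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class ConfDirCheck {
  public static void main(String[] args) throws Exception {
    // new Configuration() loads core-site.xml from the classpath, which the
    // launcher scripts assemble from HADOOP_CONF_DIR.
    Configuration conf = new Configuration();
    UserGroupInformation.setConfiguration(conf);
    // Prints false when the conf dir lacks hadoop.security.authentication=kerberos,
    // one route to the "Failed to find any Kerberos tgt" error above.
    System.out.println("security enabled: " + UserGroupInformation.isSecurityEnabled());
    System.out.println("current user: " + UserGroupInformation.getCurrentUser());
  }
}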

The Hive SQL parsing flow


0: jdbc:hive2://10.0.0.121:7001/default> show abc;
going to print operations logs
printed operations logs
Getting log thread is interrupted, since query is done!
Error: Error while compiling statement: FAILED: ParseException line 1:5 cannot recognize input near 'show' 'abc' '<EOF>' in ddl statement (state=42000,code=40000)
org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED: ParseException line 1:5 cannot recognize input near 'show' 'abc' '<EOF>' in ddl statement
at org.apache.hive.jdbc.Utils.verifySuccess(Utils.java:300)
at org.apache.hive.jdbc.Utils.verifySuccessWithInfo(Utils.java:286)
at org.apache.hive.jdbc.HiveStatement.runAsyncOnServer(HiveStatement.java:324)
at org.apache.hive.jdbc.HiveStatement.execute(HiveStatement.java:265)
at org.apache.hive.beeline.Commands.executeInternal(Commands.java:1005)
at org.apache.hive.beeline.Commands.execute(Commands.java:1201)
at org.apache.hive.beeline.Commands.sql(Commands.java:1130)
at org.apache.hive.beeline.BeeLine.dispatch(BeeLine.java:1429)
at org.apache.hive.beeline.BeeLine.execute(BeeLine.java:1291)
at org.apache.hive.beeline.BeeLine.begin(BeeLine.java:1074)
at org.apache.hive.beeline.BeeLine.mainWithInputRedirection(BeeLine.java:538)
at org.apache.hive.beeline.BeeLine.main(BeeLine.java:520)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.util.RunJar.run(RunJar.java:323)
at org.apache.hadoop.util.RunJar.main(RunJar.java:236)
Caused by: org.apache.hive.service.cli.HiveSQLException: Error while compiling statement: FAILED: ParseException line 1:5 cannot recognize input near 'show' 'abc' '<EOF>' in ddl statement
at org.apache.hive.service.cli.operation.Operation.toSQLException(Operation.java:330)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:230)
at org.apache.hive.service.cli.operation.SQLOperation.runInternal(SQLOperation.java:291)
at org.apache.hive.service.cli.operation.Operation.run(Operation.java:246)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementInternal(HiveSessionImpl.java:541)
at org.apache.hive.service.cli.session.HiveSessionImpl.executeStatementAsync(HiveSessionImpl.java:527)
at sun.reflect.GeneratedMethodAccessor242.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:78)
at org.apache.hive.service.cli.session.HiveSessionProxy.access$000(HiveSessionProxy.java:36)
at org.apache.hive.service.cli.session.HiveSessionProxy$1.run(HiveSessionProxy.java:63)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:2065)
at org.apache.hive.service.cli.session.HiveSessionProxy.invoke(HiveSessionProxy.java:59)
at com.sun.proxy.$Proxy51.executeStatementAsync(Unknown Source)
at org.apache.hive.service.cli.CLIService.executeStatementAsync(CLIService.java:312)
at org.apache.hive.service.cli.thrift.ThriftCLIService.ExecuteStatement(ThriftCLIService.java:562)
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1557)
at org.apache.hive.service.rpc.thrift.TCLIService$Processor$ExecuteStatement.getResult(TCLIService.java:1542)
at org.apache.thrift.ProcessFunction.process(ProcessFunction.java:39)
at org.apache.thrift.TBaseProcessor.process(TBaseProcessor.java:39)
at org.apache.hadoop.hive.metastore.security.HadoopThriftAuthBridge$Server$TUGIAssumingProcessor.process(HadoopThriftAuthBridge.java:647)
at org.apache.thrift.server.TThreadPoolServer$WorkerProcess.run(TThreadPoolServer.java:286)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:750)
Caused by: java.lang.RuntimeException: org.apache.hadoop.hive.ql.parse.ParseException:line 1:5 cannot recognize input near 'show' 'abc' '<EOF>' in ddl statement
at org.apache.hadoop.hive.ql.parse.ParseDriver.parse(ParseDriver.java:223)
at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:74)
at org.apache.hadoop.hive.ql.parse.ParseUtils.parse(ParseUtils.java:67)
at org.apache.hadoop.hive.ql.Driver.compile(Driver.java:622)
at org.apache.hadoop.hive.ql.Driver.compileInternal(Driver.java:1856)
at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1803)
at org.apache.hadoop.hive.ql.Driver.compileAndRespond(Driver.java:1798)
at org.apache.hadoop.hive.ql.reexec.ReExecDriver.compileAndRespond(ReExecDriver.java:126)
at org.apache.hive.service.cli.operation.SQLOperation.prepare(SQLOperation.java:228)
... 26 more
0: jdbc:hive2://10.0.0.121:7001/default>
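Read bottom-up, the Caused-by stack is the entire server-side compile path: the Thrift endpoint ThriftCLIService.ExecuteStatement dispatches through CLIService and HiveSessionImpl, the statement becomes a SQLOperation, SQLOperation.prepare hands it to the Driver, and Driver.compile calls ParseUtils/ParseDriver, which is where the ParseException for `show abc` is raised. The rest of this note walks those frames in source order.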



Setting a timeout on asynchronous queries

The common pattern looks like this: when the asynchronous task is submitted, submit() returns a Future through which the background thread can later be interrupted. A separate delayed task is scheduled on an executor that supports a delay; when the delay expires, it cancels the query and reports a timeout. The two threads together enforce the SQL execution time limit.
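A minimal standalone sketch of that Future plus scheduled-cancel pattern (plain java.util.concurrent, not Hive's classes):

import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TimeoutPattern {
  public static void main(String[] args) throws Exception {
    ExecutorService workers = Executors.newSingleThreadExecutor();
    ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();

    // The "query": a long-running task that honours interruption.
    Future<?> handle = workers.submit(() -> {
      try {
        Thread.sleep(60_000); // stand-in for driver.run()
      } catch (InterruptedException e) {
        System.out.println("query interrupted");
      }
    });

    // The watchdog: after 2 seconds, interrupt the worker, the same role
    // the timeoutExecutor plays in SQLOperation below.
    timer.schedule(() -> handle.cancel(true), 2, TimeUnit.SECONDS);

    try {
      handle.get();
    } catch (CancellationException e) {
      System.out.println("reported as timed out");
    }
    workers.shutdown();
    timer.shutdown();
  }
}

Hive's real implementation of both halves follows.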

SQLOperation.prepare() is where the timeout executor gets set up:

/**
 * Compile the query and extract metadata
 *
 * @throws HiveSQLException
 */
private void prepare(QueryState queryState) throws HiveSQLException {
  setState(OperationState.RUNNING);
  try {
    driver = DriverFactory.newDriver(queryState, queryInfo);

    // Start the timer thread for canceling the query when query timeout is reached
    // queryTimeout == 0 means no timeout
    if (queryTimeout > 0L) {
      timeoutExecutor = Executors.newSingleThreadScheduledExecutor();
      timeoutExecutor.schedule(() -> {
        try {
          final String queryId = queryState.getQueryId();
          log.info("Query timed out after: {} seconds. Cancelling the execution now: {}", queryTimeout, queryId);
          SQLOperation.this.cancel(OperationState.TIMEDOUT);
        } catch (HiveSQLException e) {
          log.error("Error cancelling the query after timeout: {} seconds", queryTimeout, e);
        }
        return null;
      }, queryTimeout, TimeUnit.SECONDS);
    }

    queryInfo.setQueryDisplay(driver.getQueryDisplay());
    if (operationLog != null) {
      queryInfo.setOperationLogLocation(operationLog.toString());
    }

    // set the operation handle information in Driver, so that thrift API users
    // can use the operation handle they receive, to lookup query information in
    // Yarn ATS, also used in logging so remove padding for better display
    String guid64 = Base64.getUrlEncoder().withoutPadding()
        .encodeToString(getHandle().getHandleIdentifier().toTHandleIdentifier().getGuid());
    driver.setOperationId(guid64);

    // In Hive server mode, we are not able to retry in the FetchTask
    // case, when calling fetch queries since execute() has returned.
    // For now, we disable the test attempts.
    driver.compileAndRespond(statement);
    if (queryState.getQueryTag() != null && queryState.getQueryId() != null) {
      parentSession.updateQueryTag(queryState.getQueryId(), queryState.getQueryTag());
    }
    setHasResultSet(driver.hasResultSet());

When the task runs asynchronously, runInternal() keeps the Future returned by submit as the handle used for later control:

// SQLOperation
@Override
public void runInternal() throws HiveSQLException {
  setState(OperationState.PENDING);

  final boolean doRunAsync = shouldRunAsync();
  final boolean asyncPrepare = doRunAsync
      && HiveConf.getBoolVar(queryState.getConf(),
          HiveConf.ConfVars.HIVE_SERVER2_ASYNC_EXEC_ASYNC_COMPILE);
  if (!asyncPrepare) {
    prepare(queryState);
  }
  if (!doRunAsync) {
    runQuery();
  } else {
    // We'll pass ThreadLocals in the background thread from the foreground (handler) thread.
    // 1) ThreadLocal Hive object needs to be set in background thread
    // 2) The metastore client in Hive is associated with right user.
    // 3) Current UGI will get used by metastore when metastore is in embedded mode
    Runnable work =
        new BackgroundWork(getCurrentUGI(), parentSession.getSessionHive(), SessionState.get(),
            asyncPrepare);

    try {
      // This submit blocks if no background threads are available to run this operation
      Future<?> backgroundHandle = getParentSession().submitBackgroundOperation(work);
      setBackgroundHandle(backgroundHandle);
    } catch (RejectedExecutionException rejected) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected);
    }
  }
}

// HiveSessionImpl
@Override
public Future<?> submitBackgroundOperation(Runnable work) {
  return getSessionManager().submitBackgroundOperation(
      operationLock == null ? work : new FutureTask<Void>(work, null) {
        protected void done() {
          // We assume this always comes from a user operation that took the lock.
          operationLock.release();
        };
      });
}

// SessionManager
public Future<?> submitBackgroundOperation(Runnable r) {
  return backgroundOperationPool.submit(r);
}


// java.util.concurrent.AbstractExecutorService, the base class of the background pool
/**
 * @throws RejectedExecutionException {@inheritDoc}
 * @throws NullPointerException {@inheritDoc}
 */
public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
  RunnableFuture<Void> ftask = newTaskFor(task, null);
  execute(ftask);
  return ftask;
}
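On the other side, the timeout callback uses exactly this handle: cancelling the background Future is what interrupts the running query. A hedged paraphrase of that step (not the literal Hive code):

// Sketch of the cancellation side inside SQLOperation.cancel (paraphrased):
// the Future saved by setBackgroundHandle above is what gets interrupted.
Future<?> backgroundHandle = getBackgroundHandle();
if (backgroundHandle != null) {
  // cancel(true) interrupts the thread running Driver#run; the operation
  // then ends in a terminal state such as TIMEDOUT.
  boolean cancelled = backgroundHandle.cancel(true);
  log.info("Background operation cancelled: {}", cancelled);
}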

The core SQL flow

SQLOperation hands the statement to the Driver in runQuery(), which parses and executes it:

private void runQuery() throws HiveSQLException {
  try {
    OperationState opState = getState();
    // Operation may have been cancelled by another thread
    if (opState.isTerminal()) {
      log.info("Not running the query. Operation is already in terminal state: " + opState
          + ", perhaps cancelled due to query timeout or by another thread.");
      return;
    }
    // In Hive server mode, we are not able to retry in the FetchTask
    // case, when calling fetch queries since execute() has returned.
    // For now, we disable the test attempts.
    driver.run();
  } catch (Throwable e) {
    /**
     * If the operation was cancelled by another thread, or the execution timed out, Driver#run
     * may return a non-zero response code. We will simply return if the operation state is
     * CANCELED, TIMEDOUT, CLOSED or FINISHED, otherwise throw an exception
     */
    if (getState().isTerminal()) {
      log.warn("Ignore exception in terminal state: {}", getState(), e);
      return;
    }
    setState(OperationState.ERROR);
    if (e instanceof CommandProcessorException) {
      throw toSQLException("Error while compiling statement", (CommandProcessorException)e);
    } else if (e instanceof HiveSQLException) {
      throw (HiveSQLException) e;
    } else if (e instanceof OutOfMemoryError) {
      throw (OutOfMemoryError) e;
    } else {
      throw new HiveSQLException("Error running query", e);
    }
  }
  setState(OperationState.FINISHED);
}

Inside the executor (ReExecDriver), the two essential calls are:

cpr = coreDriver.run();
coreDriver.compileAndRespond(currentQuery);

public class ReExecDriver implements IDriver {

  @Override
  public CommandProcessorResponse run() throws CommandProcessorException {
    executionIndex = 0;
    int maxExecutions = 1 + coreDriver.getConf().getIntVar(ConfVars.HIVE_QUERY_MAX_REEXECUTION_COUNT);

    while (true) {
      executionIndex++;

      for (IReExecutionPlugin p : plugins) {
        p.beforeExecute(executionIndex, explainReOptimization);
      }
      coreDriver.getContext().setExecutionIndex(executionIndex);
      LOG.info("Execution #{} of query", executionIndex);
      CommandProcessorResponse cpr = null;
      CommandProcessorException cpe = null;
      try {
        cpr = coreDriver.run();
      } catch (CommandProcessorException e) {
        cpe = e;
      }

      PlanMapper oldPlanMapper = coreDriver.getPlanMapper();
      boolean success = cpr != null;
      plugins.forEach(p -> p.afterExecute(oldPlanMapper, success));

      boolean shouldReExecute = explainReOptimization && executionIndex==1;
      shouldReExecute |= cpr == null && plugins.stream().anyMatch(p -> p.shouldReExecute(executionIndex));

      if (executionIndex >= maxExecutions || !shouldReExecute) {
        if (cpr != null) {
          return cpr;
        } else {
          throw cpe;
        }
      }
      LOG.info("Preparing to re-execute query");
      plugins.forEach(IReExecutionPlugin::prepareToReExecute);

      try {
        coreDriver.compileAndRespond(currentQuery);
      } catch (CommandProcessorException e) {
        LOG.error("Recompilation of the query failed; this is unexpected.");
        // FIXME: somehow place pointers that re-execution compilation have failed; the query have been successfully compiled before?
        throw e;
      }

      PlanMapper newPlanMapper = coreDriver.getPlanMapper();
      if (!explainReOptimization &&
          !plugins.stream().anyMatch(p -> p.shouldReExecuteAfterCompile(executionIndex, oldPlanMapper, newPlanMapper))) {
        LOG.info("re-running the query would probably not yield better results; returning with last error");
        // FIXME: retain old error; or create a new one?
        return cpr;
      }
    }
  }
  ...
}
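A side note on this loop: it is Hive's query re-execution mechanism. Plugins decide whether a failed (or re-optimizable) execution deserves another attempt, bounded by hive.query.reexecution.max.count; the plugin list itself is configurable (hive.query.reexecution.strategies, with strategies such as overlay and reoptimize), though the exact settings vary by Hive version.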

Inside the Driver, compile runs first, delegating to the Compiler class; this is where the SQL gets parsed:

/**
 * Compiles an HQL command, creates an execution plan for it.
 *
 * @param command The HiveQL query to compile
 * @param resetTaskIds Resets taskID counter if true.
 * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted, it
 *          should be set to true if the compile is called within another method like runInternal, which defers the
 *          close to the called in that method.
 */
@VisibleForTesting
public void compile(String command, boolean resetTaskIds, boolean deferClose) throws CommandProcessorException {
  prepareForCompile(resetTaskIds);

  Compiler compiler = new Compiler(context, driverContext, driverState);
  QueryPlan plan = compiler.compile(command, deferClose);
  driverContext.setPlan(plan);

  compileFinished(deferClose);
}

/**
 * @param deferClose indicates if the close/destroy should be deferred when the process has been interrupted
 *          it should be set to true if the compile method is called within another method like runInternal,
 *          which defers the close to the called in that method.
 */
public QueryPlan compile(String rawCommand, boolean deferClose) throws CommandProcessorException {
  initialize(rawCommand);

  Throwable compileException = null;
  boolean parsed = false;
  QueryPlan plan = null;
  try {
    DriverUtils.checkInterrupted(driverState, driverContext, "before parsing and analysing the query", null, null);

    parse();
    parsed = true;
    BaseSemanticAnalyzer sem = analyze();

    DriverUtils.checkInterrupted(driverState, driverContext, "after analyzing query.", null, null);

    plan = createPlan(sem);
    initializeFetchTask(plan);
    authorize(sem);
    explainOutput(sem, plan);
  } catch (CommandProcessorException cpe) {
    compileException = cpe.getCause();
    throw cpe;
  } catch (Exception e) {
    compileException = e;
    DriverUtils.checkInterrupted(driverState, driverContext, "during query compilation: " + e.getMessage(), null,
        null);
    handleException(e);
  } finally {
    cleanUp(compileException, parsed, deferClose);
  }

  return plan;
}
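The parse() step above bottoms out in ParseDriver, the class that threw the ParseException in the earlier stack. A standalone sketch of that entry point (assuming the Hive 3.x API, where parse(String) returns the ANTLR AST root; later versions changed the signature):

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

public class ParseOnly {
  public static void main(String[] args) throws ParseException {
    ParseDriver pd = new ParseDriver();
    // A valid statement parses into an AST.
    ASTNode tree = pd.parse("SHOW TABLES");
    System.out.println(tree.dump());
    // An invalid one, like the "show abc" earlier, throws ParseException here,
    // which eventually surfaces in beeline as the HiveSQLException above.
    pd.parse("show abc");
  }
}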

The actual execution in the Driver happens in execute(), called from runInternal():

private void runInternal(String command, boolean alreadyCompiled) throws CommandProcessorException {
  DriverState.setDriverState(driverState);

  QueryPlan plan = driverContext.getPlan();
  if (plan != null && plan.isPrepareQuery() && !plan.isExplain()) {
    LOG.info("Skip running tasks for prepare plan");
    return;
  }

  setInitialStateForRun(alreadyCompiled);

  // a flag that helps to set the correct driver state in finally block by tracking if
  // the method has been returned by an error or not.
  boolean isFinishedWithError = true;
  try {
    HiveDriverRunHookContext hookContext = new HiveDriverRunHookContextImpl(driverContext.getConf(),
        alreadyCompiled ? context.getCmd() : command);
    runPreDriverHooks(hookContext);

    if (!alreadyCompiled) {
      compileInternal(command, true);
    } else {
      driverContext.getPlan().setQueryStartTime(driverContext.getQueryDisplay().getQueryStartTime());
    }

    // Reset the PerfLogger so that it doesn't retain any previous values.
    // Any value from compilation phase can be obtained through the map set in queryDisplay during compilation.
    PerfLogger perfLogger = SessionState.getPerfLogger(true);

    // the reason that we set the txn manager for the cxt here is because each query has its own ctx object.
    // The txn mgr is shared across the same instance of Driver, which can run multiple queries.
    context.setHiveTxnManager(driverContext.getTxnManager());

    DriverUtils.checkInterrupted(driverState, driverContext, "at acquiring the lock.", null, null);

    lockAndRespond();

    if (validateTxnList()) {
      // the reason that we set the txn manager for the cxt here is because each query has its own ctx object.
      // The txn mgr is shared across the same instance of Driver, which can run multiple queries.
      context.setHiveTxnManager(driverContext.getTxnManager());
      perfLogger = SessionState.getPerfLogger(true);
    }
    execute();

    FetchTask fetchTask = driverContext.getPlan().getFetchTask();
    if (fetchTask != null) {
      fetchTask.setTaskQueue(null);
      fetchTask.setQueryPlan(null);
      try {
        fetchTask.execute();
        driverContext.setFetchTask(fetchTask);
      } catch (Throwable e) {
        throw new CommandProcessorException(e);
      }
    }
    driverTxnHandler.handleTransactionAfterExecution();

    driverContext.getQueryDisplay().setPerfLogStarts(QueryDisplay.Phase.EXECUTION, perfLogger.getStartTimes());
    driverContext.getQueryDisplay().setPerfLogEnds(QueryDisplay.Phase.EXECUTION, perfLogger.getEndTimes());

    runPostDriverHooks(hookContext);
    isFinishedWithError = false;
  } finally {
    if (driverState.isAborted()) {
      closeInProcess(true);
    } else {
      releaseResources();
    }
    driverState.executionFinishedWithLocking(isFinishedWithError);
  }

  SessionState.getPerfLogger().cleanupPerfLogMetrics();
}

While execute() runs, it invokes pre- and post-execution hooks; this is where things like lineage information get sent out (a sketch of the hook interface follows the method):


public void execute() throws CommandProcessorException {
  SessionState.getPerfLogger().perfLogBegin(CLASS_NAME, PerfLogger.DRIVER_EXECUTE);

  boolean noName = Strings.isNullOrEmpty(driverContext.getConf().get(MRJobConfig.JOB_NAME));

  checkState();

  // Whether there's any error occurred during query execution. Used for query lifetime hook.
  boolean executionError = false;

  try {
    LOG.info("Executing command(queryId=" + driverContext.getQueryId() + "): " + driverContext.getQueryString());

    // TODO: should this use getUserFromAuthenticator?
    hookContext = new PrivateHookContext(driverContext, context);

    preExecutionActions();
    preExecutionCacheActions();
    // Disable HMS cache so any metadata calls during execution get fresh responses.
    driverContext.getQueryState().disableHMSCache();
    runTasks(noName);
    driverContext.getQueryState().enableHMSCache();
    postExecutionCacheActions();
    postExecutionActions();
  } catch (CommandProcessorException cpe) {
    executionError = true;
    throw cpe;
  } catch (Throwable e) {
    executionError = true;
    DriverUtils.checkInterrupted(driverState, driverContext, "during query execution: \n" + e.getMessage(),
        hookContext, SessionState.getPerfLogger());
    handleException(hookContext, e);
  } finally {
    cleanUp(noName, hookContext, executionError);
    driverContext.getQueryState().enableHMSCache();
  }
}
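As the note above says, the pre/post steps around runTasks are where the configured hooks fire. A minimal sketch of a custom hook (LoggingHook is my own illustrative name; the ExecuteWithHookContext interface and the hive.exec.pre.hooks / hive.exec.post.hooks settings are the real extension points that lineage exporters plug into):

import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;

// Register with: set hive.exec.post.hooks=com.example.LoggingHook;
public class LoggingHook implements ExecuteWithHookContext {
  @Override
  public void run(HookContext hookContext) throws Exception {
    // The read/write entity sets are what lineage systems export.
    System.out.println("query:   " + hookContext.getQueryPlan().getQueryString());
    System.out.println("inputs:  " + hookContext.getInputs());
    System.out.println("outputs: " + hookContext.getOutputs());
  }
}

Back in the Driver, runTasks() is the loop that actually launches the compiled tasks: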


private void runTasks(boolean noName) throws Exception {
  SessionState.getPerfLogger().perfLogBegin(CLASS_NAME, PerfLogger.RUN_TASKS);

  int jobCount = getJobCount();
  String jobName = getJobName();

  // Loop while you either have tasks running, or tasks queued up
  while (taskQueue.isRunning()) {
    launchTasks(noName, jobCount, jobName);
    handleFinished();
  }

  SessionState.getPerfLogger().perfLogEnd(CLASS_NAME, PerfLogger.RUN_TASKS);
}

created at 2023-10-25