Chaozhong Yang created SPARK-21395: -------------------------------------- Summary: Spark SQL hive-thriftserver doesn't register operation log before execute sql statement Key: SPARK-21395 URL: https://issues.apache.org/jira/browse/SPARK-21395 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.1.1, 2.1.0 Reporter: Chaozhong Yang
In HiveServer2, TFetchResultsReq has a member which is named as `fetchType`. If fetchType is equal to be `1`, the thrift server should return operation log to client. However, we found Spark SQL's thrift server always return nothing to client for TFetchResultsReq with fetchType(1). We have checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that there were existed operation log file with zero bytes(empty file). Why? Let's take a look at SQLOperation.java in Hive: {code:java} @Override public void runInternal() throws HiveSQLException { setState(OperationState.PENDING); final HiveConf opConfig = getConfigForOperation(); prepare(opConfig); if (!shouldRunAsync()) { runQuery(opConfig); } else { // We'll pass ThreadLocals in the background thread from the foreground (handler) thread final SessionState parentSessionState = SessionState.get(); // ThreadLocal Hive object needs to be set in background thread. // The metastore client in Hive is associated with right user. final Hive parentHive = getSessionHive(); // Current UGI will get used by metastore when metsatore is in embedded mode // So this needs to get passed to the new background thread final UserGroupInformation currentUGI = getCurrentUGI(opConfig); // Runnable impl to call runInternal asynchronously, // from a different thread Runnable backgroundOperation = new Runnable() { @Override public void run() { PrivilegedExceptionAction<Object> doAsAction = new PrivilegedExceptionAction<Object>() { @Override public Object run() throws HiveSQLException { Hive.set(parentHive); SessionState.setCurrentSessionState(parentSessionState); // Set current OperationLog in this async thread for keeping on saving query log. registerCurrentOperationLog(); try { runQuery(opConfig); } catch (HiveSQLException e) { setOperationException(e); LOG.error("Error running hive query: ", e); } finally { unregisterOperationLog(); } return null; } }; try { currentUGI.doAs(doAsAction); } catch (Exception e) { setOperationException(new HiveSQLException(e)); LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e); } finally { /** * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup * when this thread is garbage collected later. * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize() */ if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) { ThreadWithGarbageCleanup currentThread = (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread(); currentThread.cacheThreadLocalRawStore(); } } } }; try { // This submit blocks if no background threads are available to run this operation Future<?> backgroundHandle = getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation); setBackgroundHandle(backgroundHandle); } catch (RejectedExecutionException rejected) { setState(OperationState.ERROR); throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected); } } } {code} Obviously, registerOperationLog is the key point that Hive can produce and return operation log to client. But, in Spark SQL, SparkExecuteStatementOperation doesn't registerOperationLog before execute sql statement: {code:scala} override def runInternal(): Unit = { setState(OperationState.PENDING) setHasResultSet(true) // avoid no resultset for async run if (!runInBackground) { execute() } else { val sparkServiceUGI = Utils.getUGI() // Runnable impl to call runInternal asynchronously, // from a different thread val backgroundOperation = new Runnable() { override def run(): Unit = { val doAsAction = new PrivilegedExceptionAction[Unit]() { override def run(): Unit = { try { execute() } catch { case e: HiveSQLException => setOperationException(e) log.error("Error running hive query: ", e) } } } try { sparkServiceUGI.doAs(doAsAction) } catch { case e: Exception => setOperationException(new HiveSQLException(e)) logError("Error running hive query as user : " + sparkServiceUGI.getShortUserName(), e) } } } try { // This submit blocks if no background threads are available to run this operation val backgroundHandle = parentSession.getSessionManager().submitBackgroundOperation(backgroundOperation) setBackgroundHandle(backgroundHandle) } catch { case rejected: RejectedExecutionException => setState(OperationState.ERROR) throw new HiveSQLException("The background threadpool cannot accept" + " new task for execution, please retry the operation", rejected) case NonFatal(e) => logError(s"Error executing query in background", e) setState(OperationState.ERROR) throw e } } } {code} LogDrivertAppender append logOutput into operation log file depends on current thread local operationLog: {code:java} @Override protected void subAppend(LoggingEvent event) { super.subAppend(event); // That should've gone into our writer. Notify the LogContext. String logOutput = writer.toString(); writer.reset(); OperationLog log = operationManager.getOperationLogByThread(); if (log == null) { LOG.debug(" ---+++=== Dropped log event from thread " + event.getThreadName()); return; } log.writeOperationLog(logOutput); } {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org