[jira] [Updated] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Wenchen Fan updated SPARK-21395: --- Target Version/s: (was: 2.4.0)

Key: SPARK-21395
URL: https://issues.apache.org/jira/browse/SPARK-21395
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 2.1.0, 2.1.1
Reporter: Chaozhong Yang
Priority: Major

Description:

In HiveServer2, TFetchResultsReq has a member named `fetchType`. If fetchType is `1`, the Thrift server should return the operation log to the client. However, we found that Spark SQL's Thrift server always returns nothing to the client for a TFetchResultsReq with fetchType `1`. We checked the ${HIVE_SERVER2_LOGGING_OPERATION_LOG_LOCATION}/${session-id} directory carefully and found that the operation log files existed but had zero bytes (empty files). Why? Let's take a look at SQLOperation.java in Hive:

{code:java}
@Override
public void runInternal() throws HiveSQLException {
  setState(OperationState.PENDING);
  final HiveConf opConfig = getConfigForOperation();
  prepare(opConfig);
  if (!shouldRunAsync()) {
    runQuery(opConfig);
  } else {
    // We'll pass ThreadLocals in the background thread from the foreground (handler) thread
    final SessionState parentSessionState = SessionState.get();
    // ThreadLocal Hive object needs to be set in background thread.
    // The metastore client in Hive is associated with right user.
    final Hive parentHive = getSessionHive();
    // Current UGI will get used by metastore when metastore is in embedded mode
    // So this needs to get passed to the new background thread
    final UserGroupInformation currentUGI = getCurrentUGI(opConfig);
    // Runnable impl to call runInternal asynchronously,
    // from a different thread
    Runnable backgroundOperation = new Runnable() {
      @Override
      public void run() {
        PrivilegedExceptionAction doAsAction = new PrivilegedExceptionAction() {
          @Override
          public Object run() throws HiveSQLException {
            Hive.set(parentHive);
            SessionState.setCurrentSessionState(parentSessionState);
            // Set current OperationLog in this async thread for keeping on saving query log.
            registerCurrentOperationLog();
            try {
              runQuery(opConfig);
            } catch (HiveSQLException e) {
              setOperationException(e);
              LOG.error("Error running hive query: ", e);
            } finally {
              unregisterOperationLog();
            }
            return null;
          }
        };
        try {
          currentUGI.doAs(doAsAction);
        } catch (Exception e) {
          setOperationException(new HiveSQLException(e));
          LOG.error("Error running hive query as user : " + currentUGI.getShortUserName(), e);
        } finally {
          /**
           * We'll cache the ThreadLocal RawStore object for this background thread for an orderly cleanup
           * when this thread is garbage collected later.
           * @see org.apache.hive.service.server.ThreadWithGarbageCleanup#finalize()
           */
          if (ThreadWithGarbageCleanup.currentThread() instanceof ThreadWithGarbageCleanup) {
            ThreadWithGarbageCleanup currentThread =
                (ThreadWithGarbageCleanup) ThreadWithGarbageCleanup.currentThread();
            currentThread.cacheThreadLocalRawStore();
          }
        }
      }
    };
    try {
      // This submit blocks if no background threads are available to run this operation
      Future backgroundHandle =
          getParentSession().getSessionManager().submitBackgroundOperation(backgroundOperation);
      setBackgroundHandle(backgroundHandle);
    } catch (RejectedExecutionException rejected) {
      setState(OperationState.ERROR);
      throw new HiveSQLException("The background threadpool cannot accept" +
          " new task for execution, please retry the operation", rejected);
    }
  }
}
{code}

Obviously, registerCurrentOperationLog is the key call that lets Hive produce and return the operation log to the client. But in Spark SQL, SparkExecuteStatementOperation doesn't register the operation log before executing the SQL statement:

{code:scala}
override def runInternal(): Unit = {
  setState(OperationState.PENDING)
  setHasResultSet(true) // avoid no resultset for async run
  if (!runInBackground) {
    execute()
  } else {
    val sparkServiceUGI = Utils.getUGI()

    // Runnable impl to call runInternal asynchronously,
    // from a different thread
    val backgroundOperation = new Runnable() {
      override def run(): Unit = {
        val doAsAction = new PrivilegedExceptionAction[Unit]() {
          override def run(): Unit = {
            try {
              execute()
            } catch {
              case e: HiveSQLException =>
                setOperationException(e)
                log.error("Error running hive query: ", e)
            }
{code}
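The failure mode above reduces to a ThreadLocal handoff problem: the operation log handle lives in a thread-local, so a background thread that never performs the register step writes against a missing handle and the log stays empty, which matches the zero-byte files observed. A minimal self-contained sketch of that pattern (the `CURRENT_LOG` registry and `writeLog` helper are hypothetical stand-ins, not Hive's actual API):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class Main {
    // Simplified stand-in for Hive's per-thread OperationLog registry.
    static final ThreadLocal<StringBuilder> CURRENT_LOG = new ThreadLocal<>();

    // Appends to the log registered on *this* thread; silently no-ops if none is registered.
    static void writeLog(String line) {
        StringBuilder log = CURRENT_LOG.get();
        if (log != null) {
            log.append(line).append('\n');
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        StringBuilder log = new StringBuilder();

        // Without registering: the background thread sees no log handle,
        // so the write is lost -- mirroring the empty operation log files.
        pool.submit(() -> writeLog("lost line")).get();
        System.out.println("unregistered length=" + log.length());

        // With registering (the role registerCurrentOperationLog plays in Hive):
        pool.submit(() -> {
            CURRENT_LOG.set(log);          // register in the background thread
            try {
                writeLog("query started"); // now the write lands in the log
            } finally {
                CURRENT_LOG.remove();      // unregister, as in the finally block above
            }
        }).get();
        System.out.println("registered length=" + log.length());

        pool.shutdown();
    }
}
```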
[jira] [Updated] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sameer Agarwal updated SPARK-21395: --- Target Version/s: 2.4.0 (was: 2.3.0)
[jira] [Updated] (SPARK-21395) Spark SQL hive-thriftserver doesn't register operation log before execute sql statement
[ https://issues.apache.org/jira/browse/SPARK-21395?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chaozhong Yang updated SPARK-21395: --- Description: (see above)
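For context on the client-visible symptom: FetchResults is dispatched on fetchType, where `0` asks for query result rows and `1` asks for operation log lines, so an unregistered log makes the fetchType-1 branch return an empty payload forever. A rough sketch of that dispatch (the `fetchResults` helper and constant names are hypothetical, not the actual HiveServer2 code):

```java
import java.util.List;

public class Main {
    static final short FETCH_TYPE_QUERY_OUTPUT = 0; // rows of the result set
    static final short FETCH_TYPE_LOG = 1;          // operation log lines

    // Hypothetical dispatcher mirroring how a FetchResults request is routed.
    static List<String> fetchResults(short fetchType, List<String> rows, List<String> logLines) {
        if (fetchType == FETCH_TYPE_LOG) {
            // Stays empty forever if the operation log was never registered.
            return logLines;
        }
        return rows;
    }

    public static void main(String[] args) {
        List<String> rows = List.of("1:Alice", "2:Bob");
        List<String> logLines = List.of(); // the bug: nothing was ever written

        System.out.println(fetchResults(FETCH_TYPE_QUERY_OUTPUT, rows, logLines));
        System.out.println(fetchResults(FETCH_TYPE_LOG, rows, logLines));
    }
}
```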