This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cc1e568e  Revert "[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener"
cc1e568e is described below

commit cc1e568ea9cff11879a7c3eceadfe455443af3af
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu May 21 14:20:50 2020 -0700

    Revert "[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener"
    
    This reverts commit 5198b6853b2e3bc69fc013c653aa163c79168366.
---
 .../ui/HiveThriftServer2Listener.scala             | 106 ++++++++-------
 .../hive/thriftserver/HiveSessionImplSuite.scala   |  73 --------------
 .../ui/HiveThriftServer2ListenerSuite.scala        |  17 ----
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 5 files changed, 53 insertions(+), 183 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6b7e5ee..6d0a506 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -39,7 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     kvstore: ElementTrackingStore,
     sparkConf: SparkConf,
     server: Option[HiveServer2],
-    live: Boolean = true) extends SparkListener with Logging {
+    live: Boolean = true) extends SparkListener {
 
   private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
   private val executionList = new ConcurrentHashMap[String, LiveExecutionData]()
@@ -132,83 +131,60 @@ private[thriftserver] class HiveThriftServer2Listener(
     updateLiveStore(session)
   }
 
-  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit =
-    Option(sessionList.get(e.sessionId)) match {
-      case Some(sessionData) =>
-        sessionData.finishTimestamp = e.finishTime
-        updateStoreWithTriggerEnabled(sessionData)
-        sessionList.remove(e.sessionId)
-      case None => logWarning(s"onSessionClosed called with unknown session id: ${e.sessionId}")
-    }
+  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit = {
+    val session = sessionList.get(e.sessionId)
+    session.finishTimestamp = e.finishTime
+    updateStoreWithTriggerEnabled(session)
+    sessionList.remove(e.sessionId)
+  }
 
   private def onOperationStart(e: SparkListenerThriftServerOperationStart): Unit = {
-    val executionData = getOrCreateExecution(
+    val info = getOrCreateExecution(
       e.id,
       e.statement,
       e.sessionId,
       e.startTime,
       e.userName)
-    executionData.state = ExecutionState.STARTED
-    executionList.put(e.id, executionData)
-    executionData.groupId = e.groupId
-    updateLiveStore(executionData)
-
-    Option(sessionList.get(e.sessionId)) match {
-      case Some(sessionData) =>
-        sessionData.totalExecution += 1
-        updateLiveStore(sessionData)
-      case None => logWarning(s"onOperationStart called with unknown session id: ${e.sessionId}." +
-        s"Regardless, the operation has been registered.")
-    }
+    info.state = ExecutionState.STARTED
+    executionList.put(e.id, info)
+    sessionList.get(e.sessionId).totalExecution += 1
+    executionList.get(e.id).groupId = e.groupId
+    updateLiveStore(executionList.get(e.id))
+    updateLiveStore(sessionList.get(e.sessionId))
   }
 
-  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.executePlan = e.executionPlan
-        executionData.state = ExecutionState.COMPILED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationParsed called with unknown operation id: ${e.id}")
-    }
+  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): Unit = {
+    executionList.get(e.id).executePlan = e.executionPlan
+    executionList.get(e.id).state = ExecutionState.COMPILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.CANCELED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationCanceled called with unknown operation id: ${e.id}")
-    }
+  private def onOperationCanceled(e: SparkListenerThriftServerOperationCanceled): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.CANCELED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.detail = e.errorMsg
-        executionData.state = ExecutionState.FAILED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationError called with unknown operation id: ${e.id}")
-    }
+  private def onOperationError(e: SparkListenerThriftServerOperationError): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).detail = e.errorMsg
+    executionList.get(e.id).state = ExecutionState.FAILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.FINISHED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationFinished called with unknown operation id: ${e.id}")
-    }
+  private def onOperationFinished(e: SparkListenerThriftServerOperationFinish): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.FINISHED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.closeTimestamp = e.closeTime
-        executionData.state = ExecutionState.CLOSED
-        updateStoreWithTriggerEnabled(executionData)
-        executionList.remove(e.id)
-      case None => logWarning(s"onOperationClosed called with unknown operation id: ${e.id}")
-    }
+  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): Unit = {
+    executionList.get(e.id).closeTimestamp = e.closeTime
+    executionList.get(e.id).state = ExecutionState.CLOSED
+    updateStoreWithTriggerEnabled(executionList.get(e.id))
+    executionList.remove(e.id)
+  }
 
   // Update both live and history stores. Trigger is enabled by default, hence
   // it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
deleted file mode 100644
index 05d540d..0000000
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.thriftserver
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.OperationHandle
-import org.apache.hive.service.cli.operation.{GetCatalogsOperation, OperationManager}
-import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
-import org.mockito.Mockito.{mock, verify, when}
-import org.mockito.invocation.InvocationOnMock
-
-import org.apache.spark.SparkFunSuite
-
-class HiveSessionImplSuite extends SparkFunSuite {
-  private var session: HiveSessionImpl = _
-  private var operationManager: OperationManager = _
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    session = new HiveSessionImpl(
-      ThriftserverShimUtils.testedProtocolVersions.head,
-      "",
-      "",
-      new HiveConf(),
-      ""
-    )
-    val sessionManager = mock(classOf[SessionManager])
-    session.setSessionManager(sessionManager)
-    operationManager = mock(classOf[OperationManager])
-    session.setOperationManager(operationManager)
-    when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
-      (_: InvocationOnMock) => {
-        val operation = mock(classOf[GetCatalogsOperation])
-        when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
-        operation
-      }
-    )
-
-    session.open(Map.empty[String, String].asJava)
-  }
-
-  test("SPARK-31387 - session.close() closes all sessions regardless of thrown exceptions") {
-    val operationHandle1 = session.getCatalogs
-    val operationHandle2 = session.getCatalogs
-
-    when(operationManager.closeOperation(operationHandle1))
-      .thenThrow(classOf[NullPointerException])
-    when(operationManager.closeOperation(operationHandle2))
-      .thenThrow(classOf[NullPointerException])
-
-    session.close()
-
-    verify(operationManager).closeOperation(operationHandle1)
-    verify(operationManager).closeOperation(operationHandle2)
-  }
-}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 9a9f574..075032f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,23 +140,6 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite with BeforeAndAfter {
     assert(listener.noLiveData())
   }
 
-  test("SPARK-31387 - listener update methods should not throw exception with unknown input") {
-    val (statusStore: HiveThriftServer2AppStatusStore, listener: HiveThriftServer2Listener) =
-      createAppStatusStore(true)
-
-    val unknownSession = "unknown_session"
-    val unknownOperation = "unknown_operation"
-    listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", unknownSession,
-      "stmt", "groupId", 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation, "query"))
-    listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
-      "msg", "trace", 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation, 0))
-  }
-
   private def createProperties: Properties = {
     val properties = new Properties()
     properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index e3fb54d..745f385 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,11 +636,7 @@ public class HiveSessionImpl implements HiveSession {
     acquire(true);
     // Iterate through the opHandles and close their operations
     for (OperationHandle opHandle : opHandleSet) {
-      try {
-        operationManager.closeOperation(opHandle);
-      } catch (Exception e) {
-        LOG.warn("Exception is thrown closing operation " + opHandle, e);
-      }
+      operationManager.closeOperation(opHandle);
     }
     opHandleSet.clear();
     // Cleanup session log directory.
@@ -678,15 +674,11 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
         (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
-    if (fileAry == null) {
-      LOG.error("Unable to access pipeout files in " + lScratchDir);
-    } else {
-      for (File file : fileAry) {
-        try {
-          FileUtils.forceDelete(file);
-        } catch (Exception e) {
-          LOG.error("Failed to cleanup pipeout file: " + file, e);
-        }
+    for (File file : fileAry) {
+      try {
+        FileUtils.forceDelete(file);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup pipeout file: " + file, e);
       }
     }
   }
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 1b3e8fe..14e9c47 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,11 +650,7 @@ public class HiveSessionImpl implements HiveSession {
     acquire(true);
     // Iterate through the opHandles and close their operations
     for (OperationHandle opHandle : opHandleSet) {
-      try {
-        operationManager.closeOperation(opHandle);
-      } catch (Exception e) {
-        LOG.warn("Exception is thrown closing operation " + opHandle, e);
-      }
+      operationManager.closeOperation(opHandle);
     }
     opHandleSet.clear();
     // Cleanup session log directory.
@@ -692,15 +688,11 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
         (dir, name) -> name.startsWith(sessionID) && name.endsWith(".pipeout"));
-    if (fileAry == null) {
-      LOG.error("Unable to access pipeout files in " + lScratchDir);
-    } else {
-      for (File file : fileAry) {
-        try {
-          FileUtils.forceDelete(file);
-        } catch (Exception e) {
-          LOG.error("Failed to cleanup pipeout file: " + file, e);
-        }
+    for (File file : fileAry) {
+      try {
+        FileUtils.forceDelete(file);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup pipeout file: " + file, e);
       }
     }
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org