This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch branch-3.0
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-3.0 by this push:
     new cc1e568e Revert "[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener"
cc1e568e is described below

commit cc1e568ea9cff11879a7c3eceadfe455443af3af
Author: Dongjoon Hyun <dongj...@apache.org>
AuthorDate: Thu May 21 14:20:50 2020 -0700

    Revert "[SPARK-31387][SQL] Handle unknown operation/session ID in HiveThriftServer2Listener"
    
    This reverts commit 5198b6853b2e3bc69fc013c653aa163c79168366.
---
 .../ui/HiveThriftServer2Listener.scala             | 106 ++++++++-------------
 .../hive/thriftserver/HiveSessionImplSuite.scala   |  73 --------------
 .../ui/HiveThriftServer2ListenerSuite.scala        |  17 ----
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 .../hive/service/cli/session/HiveSessionImpl.java  |  20 ++--
 5 files changed, 53 insertions(+), 183 deletions(-)

diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
index 6b7e5ee..6d0a506 100644
--- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
+++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2Listener.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.hive.service.server.HiveServer2
 
 import org.apache.spark.{SparkConf, SparkContext}
-import org.apache.spark.internal.Logging
 import org.apache.spark.internal.config.Status.LIVE_ENTITY_UPDATE_PERIOD
 import org.apache.spark.scheduler._
 import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.ExecutionState
@@ -39,7 +38,7 @@ private[thriftserver] class HiveThriftServer2Listener(
     kvstore: ElementTrackingStore,
     sparkConf: SparkConf,
     server: Option[HiveServer2],
-    live: Boolean = true) extends SparkListener with Logging {
+    live: Boolean = true) extends SparkListener {
 
   private val sessionList = new ConcurrentHashMap[String, LiveSessionData]()
   private val executionList = new ConcurrentHashMap[String, 
LiveExecutionData]()
@@ -132,83 +131,60 @@ private[thriftserver] class HiveThriftServer2Listener(
     updateLiveStore(session)
   }
 
-  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit 
=
-    Option(sessionList.get(e.sessionId)) match {
-      case Some(sessionData) =>
-        sessionData.finishTimestamp = e.finishTime
-        updateStoreWithTriggerEnabled(sessionData)
-        sessionList.remove(e.sessionId)
-      case None => logWarning(s"onSessionClosed called with unknown session 
id: ${e.sessionId}")
-    }
+  private def onSessionClosed(e: SparkListenerThriftServerSessionClosed): Unit 
= {
+    val session = sessionList.get(e.sessionId)
+    session.finishTimestamp = e.finishTime
+    updateStoreWithTriggerEnabled(session)
+    sessionList.remove(e.sessionId)
+  }
 
   private def onOperationStart(e: SparkListenerThriftServerOperationStart): 
Unit = {
-    val executionData = getOrCreateExecution(
+    val info = getOrCreateExecution(
       e.id,
       e.statement,
       e.sessionId,
       e.startTime,
       e.userName)
 
-    executionData.state = ExecutionState.STARTED
-    executionList.put(e.id, executionData)
-    executionData.groupId = e.groupId
-    updateLiveStore(executionData)
-
-    Option(sessionList.get(e.sessionId)) match {
-      case Some(sessionData) =>
-        sessionData.totalExecution += 1
-        updateLiveStore(sessionData)
-      case None => logWarning(s"onOperationStart called with unknown session 
id: ${e.sessionId}." +
-        s"Regardless, the operation has been registered.")
-    }
+    info.state = ExecutionState.STARTED
+    executionList.put(e.id, info)
+    sessionList.get(e.sessionId).totalExecution += 1
+    executionList.get(e.id).groupId = e.groupId
+    updateLiveStore(executionList.get(e.id))
+    updateLiveStore(sessionList.get(e.sessionId))
   }
 
-  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): 
Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.executePlan = e.executionPlan
-        executionData.state = ExecutionState.COMPILED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationParsed called with unknown 
operation id: ${e.id}")
-    }
+  private def onOperationParsed(e: SparkListenerThriftServerOperationParsed): 
Unit = {
+    executionList.get(e.id).executePlan = e.executionPlan
+    executionList.get(e.id).state = ExecutionState.COMPILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationCanceled(e: 
SparkListenerThriftServerOperationCanceled): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.CANCELED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationCanceled called with unknown 
operation id: ${e.id}")
-    }
+  private def onOperationCanceled(e: 
SparkListenerThriftServerOperationCanceled): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.CANCELED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationError(e: SparkListenerThriftServerOperationError): 
Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.detail = e.errorMsg
-        executionData.state = ExecutionState.FAILED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationError called with unknown operation 
id: ${e.id}")
-    }
+  private def onOperationError(e: SparkListenerThriftServerOperationError): 
Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).detail = e.errorMsg
+    executionList.get(e.id).state = ExecutionState.FAILED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.finishTimestamp = e.finishTime
-        executionData.state = ExecutionState.FINISHED
-        updateLiveStore(executionData)
-      case None => logWarning(s"onOperationFinished called with unknown 
operation id: ${e.id}")
-    }
+  private def onOperationFinished(e: 
SparkListenerThriftServerOperationFinish): Unit = {
+    executionList.get(e.id).finishTimestamp = e.finishTime
+    executionList.get(e.id).state = ExecutionState.FINISHED
+    updateLiveStore(executionList.get(e.id))
+  }
 
-  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): 
Unit =
-    Option(executionList.get(e.id)) match {
-      case Some(executionData) =>
-        executionData.closeTimestamp = e.closeTime
-        executionData.state = ExecutionState.CLOSED
-        updateStoreWithTriggerEnabled(executionData)
-        executionList.remove(e.id)
-      case None => logWarning(s"onOperationClosed called with unknown 
operation id: ${e.id}")
-    }
+  private def onOperationClosed(e: SparkListenerThriftServerOperationClosed): 
Unit = {
+    executionList.get(e.id).closeTimestamp = e.closeTime
+    executionList.get(e.id).state = ExecutionState.CLOSED
+    updateStoreWithTriggerEnabled(executionList.get(e.id))
+    executionList.remove(e.id)
+  }
 
   // Update both live and history stores. Trigger is enabled by default, hence
   // it will cleanup the entity which exceeds the threshold.
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
deleted file mode 100644
index 05d540d..0000000
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/HiveSessionImplSuite.scala
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.spark.sql.hive.thriftserver
-
-import scala.collection.JavaConverters._
-
-import org.apache.hadoop.hive.conf.HiveConf
-import org.apache.hive.service.cli.OperationHandle
-import org.apache.hive.service.cli.operation.{GetCatalogsOperation, 
OperationManager}
-import org.apache.hive.service.cli.session.{HiveSessionImpl, SessionManager}
-import org.mockito.Mockito.{mock, verify, when}
-import org.mockito.invocation.InvocationOnMock
-
-import org.apache.spark.SparkFunSuite
-
-class HiveSessionImplSuite extends SparkFunSuite {
-  private var session: HiveSessionImpl = _
-  private var operationManager: OperationManager = _
-
-  override def beforeAll() {
-    super.beforeAll()
-
-    session = new HiveSessionImpl(
-      ThriftserverShimUtils.testedProtocolVersions.head,
-      "",
-      "",
-      new HiveConf(),
-      ""
-    )
-    val sessionManager = mock(classOf[SessionManager])
-    session.setSessionManager(sessionManager)
-    operationManager = mock(classOf[OperationManager])
-    session.setOperationManager(operationManager)
-    when(operationManager.newGetCatalogsOperation(session)).thenAnswer(
-      (_: InvocationOnMock) => {
-        val operation = mock(classOf[GetCatalogsOperation])
-        when(operation.getHandle).thenReturn(mock(classOf[OperationHandle]))
-        operation
-      }
-    )
-
-    session.open(Map.empty[String, String].asJava)
-  }
-
-  test("SPARK-31387 - session.close() closes all sessions regardless of thrown 
exceptions") {
-    val operationHandle1 = session.getCatalogs
-    val operationHandle2 = session.getCatalogs
-
-    when(operationManager.closeOperation(operationHandle1))
-      .thenThrow(classOf[NullPointerException])
-    when(operationManager.closeOperation(operationHandle2))
-      .thenThrow(classOf[NullPointerException])
-
-    session.close()
-
-    verify(operationManager).closeOperation(operationHandle1)
-    verify(operationManager).closeOperation(operationHandle2)
-  }
-}
diff --git a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
index 9a9f574..075032f 100644
--- a/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
+++ b/sql/hive-thriftserver/src/test/scala/org/apache/spark/sql/hive/thriftserver/ui/HiveThriftServer2ListenerSuite.scala
@@ -140,23 +140,6 @@ class HiveThriftServer2ListenerSuite extends SparkFunSuite 
with BeforeAndAfter {
     assert(listener.noLiveData())
   }
 
-  test("SPARK-31387 - listener update methods should not throw exception with 
unknown input") {
-    val (statusStore: HiveThriftServer2AppStatusStore, listener: 
HiveThriftServer2Listener) =
-      createAppStatusStore(true)
-
-    val unknownSession = "unknown_session"
-    val unknownOperation = "unknown_operation"
-    
listener.onOtherEvent(SparkListenerThriftServerSessionClosed(unknownSession, 0))
-    listener.onOtherEvent(SparkListenerThriftServerOperationStart("id", 
unknownSession,
-      "stmt", "groupId", 0))
-    
listener.onOtherEvent(SparkListenerThriftServerOperationParsed(unknownOperation,
 "query"))
-    
listener.onOtherEvent(SparkListenerThriftServerOperationCanceled(unknownOperation,
 0))
-    
listener.onOtherEvent(SparkListenerThriftServerOperationError(unknownOperation,
-      "msg", "trace", 0))
-    
listener.onOtherEvent(SparkListenerThriftServerOperationFinish(unknownOperation,
 0))
-    
listener.onOtherEvent(SparkListenerThriftServerOperationClosed(unknownOperation,
 0))
-  }
-
   private def createProperties: Properties = {
     val properties = new Properties()
     properties.setProperty(SparkContext.SPARK_JOB_GROUP_ID, "groupId")
diff --git a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index e3fb54d..745f385 100644
--- a/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v1.2/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -636,11 +636,7 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        try {
-          operationManager.closeOperation(opHandle);
-        } catch (Exception e) {
-          LOG.warn("Exception is thrown closing operation " + opHandle, e);
-        }
+        operationManager.closeOperation(opHandle);
       }
       opHandleSet.clear();
       // Cleanup session log directory.
@@ -678,15 +674,11 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
             (dir, name) -> name.startsWith(sessionID) && 
name.endsWith(".pipeout"));
 
-    if (fileAry == null) {
-      LOG.error("Unable to access pipeout files in " + lScratchDir);
-    } else {
-      for (File file : fileAry) {
-        try {
-          FileUtils.forceDelete(file);
-        } catch (Exception e) {
-          LOG.error("Failed to cleanup pipeout file: " + file, e);
-        }
+    for (File file : fileAry) {
+      try {
+        FileUtils.forceDelete(file);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup pipeout file: " + file, e);
       }
     }
   }
diff --git a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
index 1b3e8fe..14e9c47 100644
--- a/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
+++ b/sql/hive-thriftserver/v2.3/src/main/java/org/apache/hive/service/cli/session/HiveSessionImpl.java
@@ -650,11 +650,7 @@ public class HiveSessionImpl implements HiveSession {
       acquire(true);
       // Iterate through the opHandles and close their operations
       for (OperationHandle opHandle : opHandleSet) {
-        try {
-          operationManager.closeOperation(opHandle);
-        } catch (Exception e) {
-          LOG.warn("Exception is thrown closing operation " + opHandle, e);
-        }
+        operationManager.closeOperation(opHandle);
       }
       opHandleSet.clear();
       // Cleanup session log directory.
@@ -692,15 +688,11 @@ public class HiveSessionImpl implements HiveSession {
     File[] fileAry = new File(lScratchDir).listFiles(
             (dir, name) -> name.startsWith(sessionID) && 
name.endsWith(".pipeout"));
 
-    if (fileAry == null) {
-      LOG.error("Unable to access pipeout files in " + lScratchDir);
-    } else {
-      for (File file : fileAry) {
-        try {
-          FileUtils.forceDelete(file);
-        } catch (Exception e) {
-          LOG.error("Failed to cleanup pipeout file: " + file, e);
-        }
+    for (File file : fileAry) {
+      try {
+        FileUtils.forceDelete(file);
+      } catch (Exception e) {
+        LOG.error("Failed to cleanup pipeout file: " + file, e);
       }
     }
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to