This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 783c7bc0092a [SPARK-46184][CORE][SQL][CONNECT][MLLIB] Reduce stack depth by replacing Option.isDefined with Option.isEmpty
783c7bc0092a is described below

commit 783c7bc0092a88d7caa6ed15a97b9649ff8a1ec6
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Nov 30 17:57:22 2023 -0800

    [SPARK-46184][CORE][SQL][CONNECT][MLLIB] Reduce stack depth by replacing Option.isDefined with Option.isEmpty
    
    ### What changes were proposed in this pull request?
    There are many places where `Option.isDefined` is used with `!`; this pattern increases the stack depth,
    because `isDefined` simply calls `isEmpty`:
    `final def isDefined: Boolean = !isEmpty`
    We also get the compiler warning `Replace with .isEmpty`.
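    A minimal, self-contained sketch of the pattern this change applies throughout the diff (the `opt` value here is illustrative, not taken from the patch):
    ```scala
    object IsEmptyVsNotIsDefined extends App {
      val opt: Option[String] = None

      // Before: `!opt.isDefined` evaluates as `!(!opt.isEmpty)` -- an extra
      // call frame and a double negation on top of `isEmpty`.
      if (!opt.isDefined) println("empty, via !isDefined")

      // After: a single direct call with the same result.
      if (opt.isEmpty) println("empty, via isEmpty")
    }
    ```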
    
    ### Why are the changes needed?
    Reduce stack depth by replacing `Option.isDefined` with `Option.isEmpty`.
    
    ### Does this PR introduce _any_ user-facing change?
    'No'.
    
    ### How was this patch tested?
    Existing test cases.
    
    ### Was this patch authored or co-authored using generative AI tooling?
    'No'.
    
    Closes #44091 from beliefer/SPARK-46184.
    
    Authored-by: Jiaan Geng <belie...@163.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/connect/service/ExecuteEventsManager.scala | 4 +---
 .../org/apache/spark/sql/connect/service/SessionEventsManager.scala | 4 +---
 .../spark/sql/connect/service/SparkConnectExecutionManager.scala    | 2 +-
 .../org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala  | 2 +-
 .../org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 6 +++---
 .../main/scala/org/apache/spark/deploy/history/HistoryServer.scala  | 2 +-
 .../src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 4 ++--
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala        | 2 +-
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  | 2 +-
 core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala | 2 +-
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala   | 2 +-
 mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala  | 2 +-
 .../apache/spark/sql/catalyst/expressions/codegen/javaCode.scala    | 2 +-
 .../org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala      | 2 +-
 .../src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala  | 2 +-
 .../scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala  | 2 +-
 .../streaming/sources/RatePerMicroBatchProviderSuite.scala          | 2 +-
 .../scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala  | 2 +-
 .../org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala    | 2 +-
 19 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 2430716069cc..760e3065e1e8 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -256,9 +256,7 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
   private def assertStatus(
       validStatuses: List[ExecuteStatus],
       eventStatus: ExecuteStatus): Unit = {
-    if (!validStatuses
-        .find(s => s == status)
-        .isDefined) {
+    if (validStatuses.find(s => s == status).isEmpty) {
       throw new IllegalStateException(s"""
         operationId: $operationId with status ${status}
         is not within statuses $validStatuses for event $eventStatus
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
index f275fab56bf5..0a466594fad7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
@@ -81,9 +81,7 @@ case class SessionEventsManager(sessionHolder: SessionHolder, clock: Clock) {
   private def assertStatus(
       validStatuses: List[SessionStatus],
       eventStatus: SessionStatus): Unit = {
-    if (!validStatuses
-        .find(s => s == status)
-        .isDefined) {
+    if (validStatuses.find(s => s == status).isEmpty) {
       throw new IllegalStateException(s"""
         sessionId: $sessionId with status ${status}
         is not within statuses $validStatuses for event $eventStatus
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 36c6f73329b8..5a9d0136de34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -164,7 +164,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     scheduledExecutor = None
     executions.clear()
     abandonedTombstones.invalidateAll()
-    if (!lastExecutionTime.isDefined) {
+    if (lastExecutionTime.isEmpty) {
       lastExecutionTime = Some(System.currentTimeMillis())
     }
   }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 1b1e0f156bba..2bd883a3cd51 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -584,7 +584,7 @@ private[kafka010] class KafkaDataConsumer(
   }
 
   private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
-    if (!_consumer.isDefined) {
+    if (_consumer.isEmpty) {
       retrieveConsumer()
     }
     require(_consumer.isDefined, "Consumer must be defined")
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index edbbf1cc6a76..27904ebb3c5e 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -189,7 +189,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
 
   private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
@@ -1295,7 +1295,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
     // ensure all batches we are waiting for have been processed.
     val waitUntilBatchProcessed = Execute { q =>
       eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
+        if (q.exception.isEmpty) {
           assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
         }
       }
@@ -1418,7 +1418,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
     // ensure all batches we are waiting for have been processed.
     val waitUntilBatchProcessed = Execute { q =>
       eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
+        if (q.exception.isEmpty) {
           assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 875e34041d3b..321f76923411 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -101,7 +101,7 @@ class HistoryServer(
      // Since we may have applications with multiple attempts mixed with applications with a
      // single attempt, we need to try both. Try the single-attempt route first, and if an
      // error is raised, then try the multiple attempt route.
-      if (!loadAppUi(appId, None) && (!attemptId.isDefined || !loadAppUi(appId, attemptId))) {
+      if (!loadAppUi(appId, None) && (attemptId.isEmpty || !loadAppUi(appId, attemptId))) {
         val msg = <div class="row">Application {appId} not found.</div>
         res.setStatus(HttpServletResponse.SC_NOT_FOUND)
         UIUtils.basicSparkPage(req, msg, "Not Found").foreach { n =>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index 13460954c061..ec925d1341c9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -42,11 +42,11 @@ private[spark] class GraphiteSink(
 
  def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))
 
-  if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
+  if (propertyToOption(GRAPHITE_KEY_HOST).isEmpty) {
     throw SparkCoreErrors.graphiteSinkPropertyMissingError("host")
   }
 
-  if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
+  if (propertyToOption(GRAPHITE_KEY_PORT).isEmpty) {
     throw SparkCoreErrors.graphiteSinkPropertyMissingError("port")
   }
 
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 976516dba40f..d303510a7e0b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -357,7 +357,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
                  |Status: ${taskInfo.status}<br>
                  |Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
                  |${
-                     if (!taskInfo.duration.isDefined) {
+                     if (taskInfo.duration.isEmpty) {
                        s"""<br>Finish Time: ${UIUtils.formatDate(new 
Date(finishTime))}"""
                      } else {
                         ""
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 553d001285b2..e235b8aeb778 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -697,7 +697,7 @@ class SparkSubmitSuite
     var sc2: SparkContext = null
     try {
       sc2 = new SparkContext(conf2)
-      assert(!sc2.progressBar.isDefined)
+      assert(sc2.progressBar.isEmpty)
     } finally {
       if (sc2 != null) {
         sc2.stop()
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 591b8b4c0df7..9768da09a9f8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -70,7 +70,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     filteredRdd.localCheckpoint()
     assert(filteredRdd.checkpointData.isDefined)
     assert(!filteredRdd.checkpointData.get.isCheckpointed)
-    assert(!filteredRdd.checkpointData.get.checkpointRDD.isDefined)
+    assert(filteredRdd.checkpointData.get.checkpointRDD.isEmpty)
    assert(filteredRdd.getStorageLevel === LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)
 
     // After an action, the lineage is truncated
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 948bc8889bcd..3eae1b3278e7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1292,7 +1292,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       assert(opt2.isDefined)
       assertEquals(opt1.get, opt2.get)
     } else {
-      assert(!opt2.isDefined)
+      assert(opt2.isEmpty)
     }
   }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
index c791edb9de15..bd2d08c0d79e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
@@ -162,7 +162,7 @@ class Interaction @Since("1.6.0") (@Since("1.6.0") override val uid: String) ext
           val attr = Attribute.decodeStructField(f, preserveName = true)
          if (attr == UnresolvedAttribute) {
            encodedFeatureAttrs(Seq(NumericAttribute.defaultAttr.withName(f.name)), None)
-          } else if (!attr.name.isDefined) {
+          } else if (attr.name.isEmpty) {
             encodedFeatureAttrs(Seq(attr.withName(f.name)), None)
           } else {
             encodedFeatureAttrs(Seq(attr), None)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index d5226fef3c73..bfa828ae7a6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -176,7 +176,7 @@ trait Block extends TreeNode[Block] with JavaCode {
 
     @inline def transform(e: ExprValue): ExprValue = {
       val newE = f lift e
-      if (!newE.isDefined || newE.get.equals(e)) {
+      if (newE.isEmpty || newE.get.equals(e)) {
         e
       } else {
         changed = true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index c170d34e1700..d71fab817df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -214,7 +214,7 @@ object RuleIdCollection {
   def getRuleId(ruleName: String): RuleId = {
     val ruleIdOpt = ruleToId.get(ruleName)
     // Please add the rule name to `rulesWithIds` if rule id is not found.
-    if (!ruleIdOpt.isDefined) {
+    if (ruleIdOpt.isEmpty) {
       throw QueryExecutionErrors.ruleIdNotFoundForRuleError(ruleName)
     }
     ruleIdOpt.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
index d61ede780a74..5c4de5d4d963 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
@@ -84,6 +84,6 @@ class UDTRegistrationSuite extends SparkFunSuite {
 
   test("query unregistered user class") {
     assert(!UDTRegistration.exists(classOf[TestUserClass3].getName))
-    assert(!UDTRegistration.getUDTFor(classOf[TestUserClass3].getName).isDefined)
+    assert(UDTRegistration.getUDTFor(classOf[TestUserClass3].getName).isEmpty)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
index 263006100bea..5f68f235485e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
@@ -206,7 +206,7 @@ class ArtifactManagerSuite extends SharedSparkSession {
         // Remove resources
         artifactManager.cleanUpResources()
 
-        assert(!blockManager.getLocalBytes(blockId).isDefined)
+        assert(blockManager.getLocalBytes(blockId).isEmpty)
         assert(!expectedPath.toFile.exists())
       } finally {
         try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 01599bb92869..31fbf9323140 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -130,7 +130,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {
 
  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 116f5cdec817..a265bd9339b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -756,7 +756,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi
 
  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
index 8f099c31e6b4..44c909d8ae2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -144,7 +144,7 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
 
  private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
