This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 783c7bc0092a [SPARK-46184][CORE][SQL][CONNECT][MLLIB] Reduce stack depth by replacing Option.isDefined with Option.isEmpty
783c7bc0092a is described below

commit 783c7bc0092a88d7caa6ed15a97b9649ff8a1ec6
Author: Jiaan Geng <belie...@163.com>
AuthorDate: Thu Nov 30 17:57:22 2023 -0800

[SPARK-46184][CORE][SQL][CONNECT][MLLIB] Reduce stack depth by replacing Option.isDefined with Option.isEmpty

### What changes were proposed in this pull request?
`Option.isDefined` is negated with `!` in many places. This deepens the call stack, because `isDefined` itself delegates to `isEmpty`:
`final def isDefined: Boolean = !isEmpty`
The compiler also emits the warning `Replace with .isEmpty`.
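For illustration, a minimal, self-contained sketch of the rewrite (the object and value names here are hypothetical, not code from this patch):

```scala
// Hypothetical example of the mechanical rewrite this patch applies.
object IsEmptyExample extends App {
  val missing: Option[String] = None

  // Before: `!opt.isDefined` goes through one extra call, because Option
  // defines `final def isDefined: Boolean = !isEmpty`.
  if (!missing.isDefined) println("empty, via !isDefined")

  // After: `opt.isEmpty` expresses the same condition directly.
  if (missing.isEmpty) println("empty, via isEmpty")
}
```

Each hunk in the diff below applies this same substitution.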
### Why are the changes needed?
Reduce stack depth by replacing `Option.isDefined` with `Option.isEmpty`.

### Does this PR introduce _any_ user-facing change?
'No'.

### How was this patch tested?
Existing test cases.

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #44091 from beliefer/SPARK-46184.

Authored-by: Jiaan Geng <belie...@163.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../org/apache/spark/sql/connect/service/ExecuteEventsManager.scala | 4 +---
 .../org/apache/spark/sql/connect/service/SessionEventsManager.scala | 4 +---
 .../spark/sql/connect/service/SparkConnectExecutionManager.scala    | 2 +-
 .../org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala  | 2 +-
 .../org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 6 +++---
 .../main/scala/org/apache/spark/deploy/history/HistoryServer.scala  | 2 +-
 .../src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala | 4 ++--
 core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala        | 2 +-
 core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala  | 2 +-
 core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala | 2 +-
 core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala   | 2 +-
 mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala  | 2 +-
 .../apache/spark/sql/catalyst/expressions/codegen/javaCode.scala    | 2 +-
 .../org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala      | 2 +-
 .../src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala  | 2 +-
 .../scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala  | 2 +-
 .../streaming/sources/RatePerMicroBatchProviderSuite.scala          | 2 +-
 .../scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala  | 2 +-
 .../org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala    | 2 +-
 19 files changed, 22 insertions(+), 26 deletions(-)

diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
index 2430716069cc..760e3065e1e8 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala
@@ -256,9 +256,7 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) {
   private def assertStatus(
       validStatuses: List[ExecuteStatus],
       eventStatus: ExecuteStatus): Unit = {
-    if (!validStatuses
-      .find(s => s == status)
-      .isDefined) {
+    if (validStatuses.find(s => s == status).isEmpty) {
       throw new IllegalStateException(s"""
         operationId: $operationId with status ${status}
         is not within statuses $validStatuses for event $eventStatus
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
index f275fab56bf5..0a466594fad7 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionEventsManager.scala
@@ -81,9 +81,7 @@ case class SessionEventsManager(sessionHolder: SessionHolder, clock: Clock) {
   private def assertStatus(
       validStatuses: List[SessionStatus],
       eventStatus: SessionStatus): Unit = {
-    if (!validStatuses
-      .find(s => s == status)
-      .isDefined) {
+    if (validStatuses.find(s => s == status).isEmpty) {
       throw new IllegalStateException(s"""
         sessionId: $sessionId with status ${status}
         is not within statuses $validStatuses for event $eventStatus
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index 36c6f73329b8..5a9d0136de34 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -164,7 +164,7 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     scheduledExecutor = None
     executions.clear()
     abandonedTombstones.invalidateAll()
-    if (!lastExecutionTime.isDefined) {
+    if (lastExecutionTime.isEmpty) {
       lastExecutionTime = Some(System.currentTimeMillis())
     }
   }
diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
index 1b1e0f156bba..2bd883a3cd51 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/consumer/KafkaDataConsumer.scala
@@ -584,7 +584,7 @@ private[kafka010] class KafkaDataConsumer(
   }

   private[kafka010] def getOrRetrieveConsumer(): InternalKafkaConsumer = {
-    if (!_consumer.isDefined) {
+    if (_consumer.isEmpty) {
       retrieveConsumer()
     }
     require(_consumer.isDefined, "Consumer must be defined")
diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index edbbf1cc6a76..27904ebb3c5e 100644
--- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -189,7 +189,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with

   private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
@@ -1295,7 +1295,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
     // ensure all batches we are waiting for have been processed.
     val waitUntilBatchProcessed = Execute { q =>
       eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
+        if (q.exception.isEmpty) {
           assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
         }
       }
@@ -1418,7 +1418,7 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase with
     // ensure all batches we are waiting for have been processed.
     val waitUntilBatchProcessed = Execute { q =>
       eventually(Timeout(streamingTimeout)) {
-        if (!q.exception.isDefined) {
+        if (q.exception.isEmpty) {
           assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
         }
       }
diff --git a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
index 875e34041d3b..321f76923411 100644
--- a/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala
@@ -101,7 +101,7 @@ class HistoryServer(
       // Since we may have applications with multiple attempts mixed with applications with a
       // single attempt, we need to try both. Try the single-attempt route first, and if an
       // error is raised, then try the multiple attempt route.
-      if (!loadAppUi(appId, None) && (!attemptId.isDefined || !loadAppUi(appId, attemptId))) {
+      if (!loadAppUi(appId, None) && (attemptId.isEmpty || !loadAppUi(appId, attemptId))) {
         val msg = <div class="row">Application {appId} not found.</div>
         res.setStatus(HttpServletResponse.SC_NOT_FOUND)
         UIUtils.basicSparkPage(req, msg, "Not Found").foreach { n =>
diff --git a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
index 13460954c061..ec925d1341c9 100644
--- a/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
+++ b/core/src/main/scala/org/apache/spark/metrics/sink/GraphiteSink.scala
@@ -42,11 +42,11 @@ private[spark] class GraphiteSink(
   def propertyToOption(prop: String): Option[String] = Option(property.getProperty(prop))

-  if (!propertyToOption(GRAPHITE_KEY_HOST).isDefined) {
+  if (propertyToOption(GRAPHITE_KEY_HOST).isEmpty) {
     throw SparkCoreErrors.graphiteSinkPropertyMissingError("host")
   }

-  if (!propertyToOption(GRAPHITE_KEY_PORT).isDefined) {
+  if (propertyToOption(GRAPHITE_KEY_PORT).isEmpty) {
     throw SparkCoreErrors.graphiteSinkPropertyMissingError("port")
   }
diff --git a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
index 976516dba40f..d303510a7e0b 100644
--- a/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
+++ b/core/src/main/scala/org/apache/spark/ui/jobs/StagePage.scala
@@ -357,7 +357,7 @@ private[ui] class StagePage(parent: StagesTab, store: AppStatusStore) extends We
          |Status: ${taskInfo.status}<br>
          |Launch Time: ${UIUtils.formatDate(new Date(launchTime))}
          |${
-            if (!taskInfo.duration.isDefined) {
+            if (taskInfo.duration.isEmpty) {
               s"""<br>Finish Time: ${UIUtils.formatDate(new Date(finishTime))}"""
             } else {
               ""
diff --git a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
index 553d001285b2..e235b8aeb778 100644
--- a/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
+++ b/core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala
@@ -697,7 +697,7 @@ class SparkSubmitSuite
     var sc2: SparkContext = null
     try {
       sc2 = new SparkContext(conf2)
-      assert(!sc2.progressBar.isDefined)
+      assert(sc2.progressBar.isEmpty)
     } finally {
       if (sc2 != null) {
         sc2.stop()
diff --git a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
index 591b8b4c0df7..9768da09a9f8 100644
--- a/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/LocalCheckpointSuite.scala
@@ -70,7 +70,7 @@ class LocalCheckpointSuite extends SparkFunSuite with LocalSparkContext {
     filteredRdd.localCheckpoint()
     assert(filteredRdd.checkpointData.isDefined)
     assert(!filteredRdd.checkpointData.get.isCheckpointed)
-    assert(!filteredRdd.checkpointData.get.checkpointRDD.isDefined)
+    assert(filteredRdd.checkpointData.get.checkpointRDD.isEmpty)
     assert(filteredRdd.getStorageLevel === LocalRDDCheckpointData.DEFAULT_STORAGE_LEVEL)

     // After an action, the lineage is truncated
diff --git a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
index 948bc8889bcd..3eae1b3278e7 100644
--- a/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala
@@ -1292,7 +1292,7 @@ private[spark] object JsonProtocolSuite extends Assertions {
       assert(opt2.isDefined)
       assertEquals(opt1.get, opt2.get)
     } else {
-      assert(!opt2.isDefined)
+      assert(opt2.isEmpty)
     }
   }
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
index c791edb9de15..bd2d08c0d79e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Interaction.scala
@@ -162,7 +162,7 @@ class Interaction @Since("1.6.0") (@Since("1.6.0") override val uid: String) ext
       val attr = Attribute.decodeStructField(f, preserveName = true)
       if (attr == UnresolvedAttribute) {
         encodedFeatureAttrs(Seq(NumericAttribute.defaultAttr.withName(f.name)), None)
-      } else if (!attr.name.isDefined) {
+      } else if (attr.name.isEmpty) {
         encodedFeatureAttrs(Seq(attr.withName(f.name)), None)
       } else {
         encodedFeatureAttrs(Seq(attr), None)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
index d5226fef3c73..bfa828ae7a6c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/javaCode.scala
@@ -176,7 +176,7 @@ trait Block extends TreeNode[Block] with JavaCode {
     @inline def transform(e: ExprValue): ExprValue = {
       val newE = f lift e
-      if (!newE.isDefined || newE.get.equals(e)) {
+      if (newE.isEmpty || newE.get.equals(e)) {
         e
       } else {
         changed = true
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
index c170d34e1700..d71fab817df1 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/rules/RuleIdCollection.scala
@@ -214,7 +214,7 @@ object RuleIdCollection {
   def getRuleId(ruleName: String): RuleId = {
     val ruleIdOpt = ruleToId.get(ruleName)
     // Please add the rule name to `rulesWithIds` if rule id is not found.
-    if (!ruleIdOpt.isDefined) {
+    if (ruleIdOpt.isEmpty) {
       throw QueryExecutionErrors.ruleIdNotFoundForRuleError(ruleName)
     }
     ruleIdOpt.get
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
index d61ede780a74..5c4de5d4d963 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDTRegistrationSuite.scala
@@ -84,6 +84,6 @@ class UDTRegistrationSuite extends SparkFunSuite {

   test("query unregistered user class") {
     assert(!UDTRegistration.exists(classOf[TestUserClass3].getName))
-    assert(!UDTRegistration.getUDTFor(classOf[TestUserClass3].getName).isDefined)
+    assert(UDTRegistration.getUDTFor(classOf[TestUserClass3].getName).isEmpty)
   }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
index 263006100bea..5f68f235485e 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/artifact/ArtifactManagerSuite.scala
@@ -206,7 +206,7 @@ class ArtifactManagerSuite extends SharedSparkSession {

       // Remove resources
       artifactManager.cleanUpResources()
-      assert(!blockManager.getLocalBytes(blockId).isDefined)
+      assert(blockManager.getLocalBytes(blockId).isEmpty)
       assert(!expectedPath.toFile.exists())
     } finally {
       try {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
index 01599bb92869..31fbf9323140 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/sources/RatePerMicroBatchProviderSuite.scala
@@ -130,7 +130,7 @@ class RatePerMicroBatchProviderSuite extends StreamTest {

   private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
index 116f5cdec817..a265bd9339b8 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQuerySuite.scala
@@ -756,7 +756,7 @@ class StreamingQuerySuite extends StreamTest with BeforeAndAfter with Logging wi

   private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
index 8f099c31e6b4..44c909d8ae2d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingSelfUnionSuite.scala
@@ -144,7 +144,7 @@ class StreamingSelfUnionSuite extends StreamTest with BeforeAndAfter {
   private def waitUntilBatchProcessed(clock: StreamManualClock) = AssertOnQuery { q =>
     eventually(Timeout(streamingTimeout)) {
-      if (!q.exception.isDefined) {
+      if (q.exception.isEmpty) {
         assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
       }
     }

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org