This is an automated email from the ASF dual-hosted git repository. srowen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push: new b4c019627b6 [SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` with `orNull` b4c019627b6 is described below commit b4c019627b676edf850c00bb070377896b66fad2 Author: Qian.Sun <qian.sun2...@gmail.com> AuthorDate: Tue May 17 08:43:02 2022 -0500 [SPARK-39196][CORE][SQL][K8S] replace `getOrElse(null)` with `orNull` ### What changes were proposed in this pull request? This PR aims to replace `getOrElse(null)` with `orNull`. ### Why are the changes needed? Code simplification. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the GA. Closes #36567 from dcoliversun/SPARK-39196. Authored-by: Qian.Sun <qian.sun2...@gmail.com> Signed-off-by: Sean Owen <sro...@gmail.com> --- .../src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala | 2 +- core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala | 2 +- core/src/main/scala/org/apache/spark/rdd/RDD.scala | 2 +- core/src/main/scala/org/apache/spark/util/JsonProtocol.scala | 2 +- graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala | 2 +- .../org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala | 2 +- .../org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala | 4 ++-- .../deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala | 3 +-- .../apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala | 4 ++-- .../main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala | 2 +- .../apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala | 2 +- .../org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala | 4 ++-- .../spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala | 2 +- .../org/apache/spark/streaming/receiver/ReceiverSupervisor.scala | 2 +- 14 files changed, 17 insertions(+), 18 deletions(-) diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index c82fda85eb4..a7840ef1055 100644 --- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -162,7 +162,7 @@ private[kafka010] class KafkaSource( } override def reportLatestOffset(): streaming.Offset = { - latestPartitionOffsets.map(KafkaSourceOffset(_)).getOrElse(null) + latestPartitionOffsets.map(KafkaSourceOffset(_)).orNull } override def latestOffset(startOffset: streaming.Offset, limit: ReadLimit): streaming.Offset = { diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala index 15707ab9157..efe31be897b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala @@ -556,7 +556,7 @@ private[spark] abstract class BasePythonRunner[IN, OUT]( val obj = new Array[Byte](exLength) stream.readFully(obj) new PythonException(new String(obj, StandardCharsets.UTF_8), - writerThread.exception.getOrElse(null)) + writerThread.exception.orNull) } protected def handleEndOfDataSection(): Unit = { diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index c76b0d95d10..89397b8aa69 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -1816,7 +1816,7 @@ abstract class RDD[T: ClassTag]( */ @Experimental @Since("3.1.0") - def getResourceProfile(): ResourceProfile = resourceProfile.getOrElse(null) + def getResourceProfile(): ResourceProfile = resourceProfile.orNull // ======================================================================= // Other internal methods and fields diff --git a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala index ef5a812e4b6..0c15b13d5a1 100644 --- a/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala +++ b/core/src/main/scala/org/apache/spark/util/JsonProtocol.scala @@ -1272,7 +1272,7 @@ private[spark] object JsonProtocol { val properties = new Properties mapFromJson(json).foreach { case (k, v) => properties.setProperty(k, v) } properties - }.getOrElse(null) + }.orNull } def UUIDFromJson(json: JValue): UUID = { diff --git a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala index 211b4d6e4c5..b07b930eeef 100644 --- a/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala +++ b/graphx/src/main/scala/org/apache/spark/graphx/lib/TriangleCount.scala @@ -81,7 +81,7 @@ object TriangleCount { // join the sets with the graph val setGraph: Graph[VertexSet, ED] = graph.outerJoinVertices(nbrSets) { - (vid, _, optSet) => optSet.getOrElse(null) + (vid, _, optSet) => optSet.orNull } // Edge function computes intersection of smaller vertex with larger vertex diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala index 54f557c750a..51040857c64 100644 --- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala +++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/SparkKubernetesClientFactory.scala @@ -88,7 +88,7 @@ private[spark] object SparkKubernetesClientFactory extends Logging { // Start from an auto-configured config with the desired context // Fabric 8 uses null to indicate that the users current context should be used so if no // explicit setting pass null - val config = new ConfigBuilder(autoConfigure(kubeContext.getOrElse(null))) + val config = new ConfigBuilder(autoConfigure(kubeContext.orNull)) .withApiVersion("v1") .withMasterUrl(master) .withRequestTimeout(clientType.requestTimeout(sparkConf)) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala index 5c572b62c00..bb86e678235 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesSuite.scala @@ -156,8 +156,8 @@ class KubernetesSuite extends SparkFunSuite // Try the spark test home sys.props("spark.test.home") ) - val sparkDirProp = possible_spark_dirs.filter(x => - new File(Paths.get(x).toFile, "bin/spark-submit").exists).headOption.getOrElse(null) + val sparkDirProp = possible_spark_dirs.find(x => + new File(Paths.get(x).toFile, "bin/spark-submit").exists).orNull require(sparkDirProp != null, s"Spark home directory must be provided in system properties tested $possible_spark_dirs") sparkHomeDir = Paths.get(sparkDirProp) diff --git a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala index 83535488cc0..b2d11828232 100644 --- a/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala +++ b/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/backend/cloud/KubeConfigBackend.scala @@ -43,8 +43,7 @@ private[spark] class KubeConfigBackend(var context: String) // If an explicit master URL was specified then override that detected from the // K8S config if it is different - var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL)) - .getOrElse(null) + var masterUrl = Option(System.getProperty(TestConstants.CONFIG_KEY_KUBE_MASTER_URL)).orNull if (StringUtils.isNotBlank(masterUrl)) { // Clean up master URL which would have been specified in Spark format into a normal // K8S master URL diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala index f9b2ade9a60..0fa2c3debe0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/higherOrderFunctions.scala @@ -1123,8 +1123,8 @@ case class MapZipWith(left: Expression, right: Expression, function: Expression) val valueData2 = mapData2.valueArray() var i = 0 for ((key, Array(index1, index2)) <- keysWithIndexes) { - val v1 = index1.map(valueData1.get(_, leftValueType)).getOrElse(null) - val v2 = index2.map(valueData2.get(_, rightValueType)).getOrElse(null) + val v1 = index1.map(valueData1.get(_, leftValueType)).orNull + val v2 = index2.map(valueData2.get(_, rightValueType)).orNull keyVar.value.set(key) value1Var.value.set(v1) value2Var.value.set(v2) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index b7239d3ff60..8dcbc99ab5d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -806,7 +806,7 @@ object QueryExecutionErrors extends QueryErrorsBase { |Could not execute broadcast in $timeout secs. You can increase the timeout |for broadcasts via ${SQLConf.BROADCAST_TIMEOUT.key} or disable broadcast join |by setting ${SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key} to -1 - """.stripMargin.replaceAll("\n", " "), ex.getOrElse(null)) + """.stripMargin.replaceAll("\n", " "), ex.orNull) } def cannotCompareCostWithTargetCostError(cost: String): Throwable = { diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala index 1f23aeb61e1..afbc2fdb5a0 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/AnsiTypeCoercionSuite.scala @@ -66,7 +66,7 @@ class AnsiTypeCoercionSuite extends TypeCoercionSuiteBase { val input = Literal("123") val castResult = AnsiTypeCoercion.implicitCast(input, to) assert(DataType.equalsIgnoreCaseAndNullability( - castResult.map(_.dataType).getOrElse(null), expected), + castResult.map(_.dataType).orNull, expected), s"Failed to cast String literal to $to") } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala index d7489aad7fc..adce553d194 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/analysis/TypeCoercionSuite.scala @@ -47,13 +47,13 @@ abstract class TypeCoercionSuiteBase extends AnalysisTest { // Check default value val castDefault = implicitCast(default(from), to) assert(DataType.equalsIgnoreCompatibleNullability( - castDefault.map(_.dataType).getOrElse(null), expected), + castDefault.map(_.dataType).orNull, expected), s"Failed to cast $from to $to") // Check null value val castNull = implicitCast(createNull(from), to) assert(DataType.equalsIgnoreCaseAndNullability( - castNull.map(_.dataType).getOrElse(null), expected), + castNull.map(_.dataType).orNull, expected), s"Failed to cast $from to $to") } diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala index 87165cc8cac..1ccb297b75c 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/ui/ThriftServerSessionPage.scala @@ -38,7 +38,7 @@ private[ui] class ThriftServerSessionPage(parent: ThriftServerTab) require(parameterId != null && parameterId.nonEmpty, "Missing id parameter") val content = store.synchronized { // make sure all parts in this page are consistent - val sessionStat = store.getSession(parameterId).getOrElse(null) + val sessionStat = store.getSession(parameterId).orNull require(sessionStat != null, "Invalid sessionID[" + parameterId + "]") generateBasicStats() ++ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala index b464dccb760..672452a4af4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/receiver/ReceiverSupervisor.scala @@ -190,7 +190,7 @@ private[streaming] abstract class ReceiverSupervisor( // This is a blocking action so we should use "futureExecutionContext" which is a cached // thread pool. logWarning("Restarting receiver with delay " + delay + " ms: " + message, - error.getOrElse(null)) + error.orNull) stopReceiver("Restarting receiver with delay " + delay + "ms: " + message, error) logDebug("Sleeping for " + delay) Thread.sleep(delay) --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org