This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5db7beb59a67 [SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion
5db7beb59a67 is described below

commit 5db7beb59a673f05e8f39aa9653cb0497a6c97cf
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Dec 24 14:44:14 2023 -0800

    [SPARK-46455][CORE][SQL][SS][CONNECT][PYTHON] Remove redundant type conversion
    
    ### What changes were proposed in this pull request?
    This PR aims to clean up redundant type conversions in Spark production code.
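
    For illustration only (not part of the original commit message): the pattern removed throughout this diff is a conversion applied to a value that already has the target type, for example `.toLong` on a config value that is already a `Long`, or `.toSeq` on a value that is already a `Seq`. A minimal, self-contained Scala sketch of the idea (hypothetical names, not Spark code):

        object RedundantConversionSketch {
          def main(args: Array[String]): Unit = {
            // A Long converted to Long: the .toLong call is a no-op and can be dropped.
            val maxDurationMs: Long = 120000L
            val before: Long = maxDurationMs.toLong   // redundant conversion
            val after: Long = maxDurationMs           // equivalent, conversion removed

            // A Seq converted to Seq: .toSeq returns the collection unchanged.
            val types: Seq[String] = Seq("INT", "LONG")
            val beforeSeq: Seq[String] = types.toSeq  // redundant conversion
            val afterSeq: Seq[String] = types         // equivalent, conversion removed

            assert(before == after && beforeSeq == afterSeq)
          }
        }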
    
    ### Why are the changes needed?
    Code cleanup.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44412 from LuciferYang/cleanup-redundant-conversion.
    
    Lead-authored-by: yangjie01 <yangji...@baidu.com>
    Co-authored-by: YangJie <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala | 2 +-
 .../spark/sql/connect/execution/ExecuteGrpcResponseSender.scala     | 4 ++--
 .../spark/sql/connect/execution/ExecuteResponseObserver.scala       | 2 +-
 .../spark/sql/connect/service/SparkConnectExecutionManager.scala    | 4 ++--
 .../org/apache/spark/examples/streaming/KinesisWordCountASL.scala   | 4 ++--
 .../scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala | 2 +-
 core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala      | 2 +-
 core/src/main/scala/org/apache/spark/rdd/RDD.scala                  | 2 +-
 core/src/main/scala/org/apache/spark/status/AppStatusStore.scala    | 2 +-
 mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala        | 2 +-
 sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala    | 4 ++--
 .../scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala  | 2 +-
 sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala          | 2 +-
 .../src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala   | 2 +-
 .../org/apache/spark/sql/execution/adaptive/QueryStageExec.scala    | 2 +-
 .../spark/sql/execution/command/AnalyzePartitionCommand.scala       | 2 +-
 .../apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala | 2 +-
 .../org/apache/spark/sql/execution/streaming/state/RocksDB.scala    | 6 +++---
 18 files changed, 24 insertions(+), 24 deletions(-)

diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
index 29b9fdf9dfb9..9e10fac8bb55 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroDeserializer.scala
@@ -327,7 +327,7 @@ private[sql] class AvroDeserializer(
           if (nonNullTypes.length == 1) {
             newWriter(nonNullTypes.head, catalystType, avroPath, catalystPath)
           } else {
-            nonNullTypes.map(_.getType).toSeq match {
+            nonNullTypes.map(_.getType) match {
              case Seq(a, b) if Set(a, b) == Set(INT, LONG) && catalystType == LongType =>
                 (updater, ordinal, value) =>
                   value match {
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
index 115cedfe1128..c9ceef969e29 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala
@@ -158,7 +158,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       Long.MaxValue
     } else {
       val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION)
       if (confSize > 0) System.currentTimeMillis() + confSize else Long.MaxValue
     }
 
@@ -167,7 +167,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
       Long.MaxValue
     } else {
       val confSize =
-        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE).toLong
+        SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE)
       if (confSize > 0) confSize else Long.MaxValue
     }
     var sentResponsesSize: Long = 0
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
index b5844486b73a..a7877503f461 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala
@@ -102,7 +102,7 @@ private[connect] class ExecuteResponseObserver[T <: Message](val executeHolder:
    * value greater than 0 will buffer the response from getResponse.
    */
   private val retryBufferSize = if (executeHolder.reattachable) {
-    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE).toLong
+    SparkEnv.get.conf.get(CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE)
   } else {
     0
   }
diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
index d8d9cee3dad4..c90f53ac07df 100644
--- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
+++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala
@@ -188,13 +188,13 @@ private[connect] class SparkConnectExecutionManager() extends Logging {
     scheduledExecutor match {
       case Some(_) => // Already running.
       case None =>
-        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL).toLong
+        val interval = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL)
         logInfo(s"Starting thread for cleanup of abandoned executions every $interval ms")
         scheduledExecutor = Some(Executors.newSingleThreadScheduledExecutor())
         scheduledExecutor.get.scheduleAtFixedRate(
           () => {
             try {
-              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT).toLong
+              val timeout = SparkEnv.get.conf.get(CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT)
               periodicMaintenance(timeout)
             } catch {
              case NonFatal(ex) => logWarning("Unexpected exception in periodic task", ex)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
index 7d12af3256f1..4835e9de086c 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/examples/streaming/KinesisWordCountASL.scala
@@ -230,9 +230,9 @@ object KinesisWordProducerASL {
     // Iterate and put records onto the stream per the given recordPerSec and wordsPerRecord
     for (i <- 1 to 10) {
       // Generate recordsPerSec records to put onto the stream
-      val records = (1 to recordsPerSecond.toInt).foreach { recordNum =>
+      val records = (1 to recordsPerSecond).foreach { recordNum =>
         // Randomly generate wordsPerRecord number of words
-        val data = (1 to wordsPerRecord.toInt).map(x => {
+        val data = (1 to wordsPerRecord).map(x => {
           // Get a random index to a word
           val randomWordIdx = Random.nextInt(randomWords.size)
           val randomWord = randomWords(randomWordIdx)
diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index d0eee9c83c20..406c19be9bff 100644
--- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -130,7 +130,7 @@ private[kinesis] class KinesisTestUtils(streamShardCount: Int = 2) extends Loggi
     val producer = getProducer(aggregate)
     val shardIdToSeqNumbers = producer.sendData(streamName, testData)
     logInfo(s"Pushed $testData:\n\t ${shardIdToSeqNumbers.mkString("\n\t")}")
-    shardIdToSeqNumbers.toMap
+    shardIdToSeqNumbers
   }
 
   /**
diff --git a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
index 7a9c0263631f..2d72b4dd6bf2 100644
--- a/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/AsyncRDDActions.scala
@@ -104,7 +104,7 @@ class AsyncRDDActions[T: ClassTag](self: RDD[T]) extends Serializable with Loggi
         }
 
         val left = num - results.size
-        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
 
         val buf = new Array[Array[T]](p.size)
         self.context.setCallSite(callSite)
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index fe10e140f82d..9518433a7f69 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -1486,7 +1486,7 @@ abstract class RDD[T: ClassTag](
           }
         }
 
-        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+        val p = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
        val res = sc.runJob(this, (it: Iterator[T]) => it.take(left).toArray, p)
 
         res.foreach(buf ++= _.take(num - buf.size))
diff --git a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
index c1c36d7a9f04..d50b8f935d56 100644
--- a/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
+++ b/core/src/main/scala/org/apache/spark/status/AppStatusStore.scala
@@ -371,7 +371,7 @@ private[spark] class AppStatusStore(
               Double.NaN
             }
           }
-        }.toIndexedSeq
+        }
       }
     }
 
diff --git a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
index 5e2b8943ed84..9e085c7078e6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/python/MLUtil.scala
@@ -37,7 +37,7 @@ object MLUtil {
     val destFSPath = new FSPath(destPath)
     val fs = destFSPath.getFileSystem(hadoopConf)
 
-    fs.copyFromLocalFile(false, true, new FSPath(localPath.toString), destFSPath)
+    fs.copyFromLocalFile(false, true, new FSPath(localPath), destFSPath)
   }
 
 }
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index dcb80221b0e0..17be8cfa12b5 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -139,7 +139,7 @@ object Metadata {
       case (key, JInt(value)) =>
         builder.putLong(key, value.toLong)
       case (key, JLong(value)) =>
-        builder.putLong(key, value.toLong)
+        builder.putLong(key, value)
       case (key, JDouble(value)) =>
         builder.putDouble(key, value)
       case (key, JBool(value)) =>
@@ -157,7 +157,7 @@ object Metadata {
             case _: JInt =>
              builder.putLongArray(key, value.asInstanceOf[List[JInt]].map(_.num.toLong).toArray)
            case _: JLong =>
-              builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num.toLong).toArray)
+              builder.putLongArray(key, value.asInstanceOf[List[JLong]].map(_.num).toArray)
             case _: JDouble =>
              builder.putDoubleArray(key, value.asInstanceOf[List[JDouble]].map(_.num).toArray)
             case _: JBool =>
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 3fd1fe04aed6..bb2b7e7ae066 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -105,7 +105,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
      functionName: String, argumentName: String, candidates: Seq[String]): Throwable = {
    import org.apache.spark.sql.catalyst.util.StringUtils.orderSuggestedIdentifiersBySimilarity
 
-    val inputs = candidates.map(candidate => Seq(candidate)).toSeq
+    val inputs = candidates.map(candidate => Seq(candidate))
    val recommendations = orderSuggestedIdentifiersBySimilarity(argumentName, inputs)
       .take(3)
     new AnalysisException(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
index a3dc976647be..31e1495db7e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala
@@ -1435,7 +1435,7 @@ class Dataset[T] private[sql](
         case s: Symbol => Column(s.name).expr
         case e: Expression => e
         case literal => Literal(literal)
-      }.toSeq
+      }
       UnresolvedHint(name, exprs, logicalPlan)
     }
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
index c65d1931dd1b..7bc770a0c9e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala
@@ -518,7 +518,7 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
         }
       }
 
-      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts).toInt)
+      val parts = partsScanned.until(math.min(partsScanned + numPartsToTry, totalParts))
       val partsToScan = if (takeFromEnd) {
        // Reverse partitions to scan. So, if parts was [1, 2, 3] in 200 partitions (0 to 199),
         // it becomes [198, 197, 196].
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
index 89e9de8b0843..88954d6f822d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala
@@ -285,7 +285,7 @@ case class TableCacheQueryStageExec(
       sparkContext.submitJob(
         rdd,
         (_: Iterator[CachedBatch]) => (),
-        (0 until rdd.getNumPartitions).toSeq,
+        (0 until rdd.getNumPartitions),
         (_: Int, _: Unit) => (),
         ()
       )
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 98a851f19f05..4efd94e442e4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -65,7 +65,7 @@ case class AnalyzePartitionCommand(
     if (filteredSpec.isEmpty) {
       None
     } else {
-      Some(filteredSpec.toMap)
+      Some(filteredSpec)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index c4e12d5c4ae0..2cf299f87c89 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -83,7 +83,7 @@ trait OrcFiltersBase {
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
         .transform((_, v) => v.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+      CaseInsensitiveMap(dedupPrimitiveFields)
     }
   }
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index c33a7c472842..101a9e6b9199 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -547,13 +547,13 @@ class RocksDB(
       readerMemUsage + memTableMemUsage + blockCacheUsage,
       pinnedBlocksMemUsage,
       totalSSTFilesBytes,
-      nativeOpsLatencyMicros.toMap,
+      nativeOpsLatencyMicros,
       commitLatencyMs,
       bytesCopied = fileManagerMetrics.bytesCopied,
       filesCopied = fileManagerMetrics.filesCopied,
       filesReused = fileManagerMetrics.filesReused,
       zipFileBytesUncompressed = fileManagerMetrics.zipFileBytesUncompressed,
-      nativeOpsMetrics = nativeOpsMetrics.toMap)
+      nativeOpsMetrics = nativeOpsMetrics)
   }
 
   /**
@@ -861,7 +861,7 @@ object RocksDBConf {
     }
 
     def getStringConf(conf: ConfEntry): String = {
-      Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default).toString } getOrElse {
+      Try { getConfigMap(conf).getOrElse(conf.fullName, conf.default) } getOrElse {
         throw new IllegalArgumentException(
           s"Invalid value for '${conf.fullName}', must be a string"
         )

