This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new ac0bd2eb7b40 [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`
ac0bd2eb7b40 is described below

commit ac0bd2eb7b4089096f9fb288482b2f1b5049b7e2
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Dec 4 12:49:52 2023 -0800

    [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`
    
    ### What changes were proposed in this pull request?
    This PR replaces the code pattern `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`.
    
    ### Why are the changes needed?
    The `s.c.MapOps.view.filterKeys` pattern is verbose and returns a lazy `MapView`, which usually has to be converted back with `.toMap`; it can be simplified with `s.c.MapOps.filter`, as sketched below.
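    
    A minimal before/after sketch of the pattern (illustrative only, not taken from the patch; `props` is a hypothetical map, Scala 2.13):
    
    ```scala
    val props = Map("spark.sql.a" -> "1", "other.b" -> "2")
    
    // Before: .view.filterKeys yields a lazy MapView, so callers typically append .toMap.
    val before: Map[String, String] =
      props.view.filterKeys(_.startsWith("spark.")).toMap
    
    // After: filter the Map directly; the result is already a strict Map.
    val after: Map[String, String] =
      props.filter { case (k, _) => k.startsWith("spark.") }
    
    assert(before == after)
    ```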
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44160 from LuciferYang/SPARK-46245.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala     |  2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala  |  3 +--
 .../spark/deploy/rest/RestSubmissionClient.scala       |  4 ++--
 .../spark/executor/CoarseGrainedExecutorBackend.scala  |  8 ++++----
 .../org/apache/spark/resource/ResourceProfile.scala    |  4 ++--
 .../org/apache/spark/scheduler/DAGScheduler.scala      |  4 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala        |  2 +-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala |  4 ++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala    |  2 +-
 .../main/scala/org/apache/spark/ui/PagedTable.scala    |  7 +++----
 .../org/apache/spark/HeartbeatReceiverSuite.scala      |  2 +-
 .../scala/org/apache/spark/SparkThrowableSuite.scala   |  5 ++---
 .../spark/internal/plugin/PluginContainerSuite.scala   |  2 +-
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala  |  6 ++++--
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala    |  2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala   |  2 +-
 .../apache/spark/sql/catalyst/catalog/interface.scala  |  8 +++-----
 .../catalyst/expressions/codegen/CodeGenerator.scala   |  2 +-
 .../catalyst/plans/logical/basicLogicalOperators.scala |  5 +++--
 .../scala/org/apache/spark/sql/DataFrameWriter.scala   |  2 +-
 .../apache/spark/sql/execution/command/tables.scala    |  7 ++++---
 .../spark/sql/execution/datasources/DataSource.scala   |  4 ++--
 .../spark/sql/execution/datasources/FileFormat.scala   |  2 +-
 .../sql/execution/datasources/jdbc/JDBCOptions.scala   |  3 ++-
 .../sql/execution/datasources/v2/CacheTableExec.scala  |  3 ++-
 .../execution/datasources/v2/DataSourceV2Utils.scala   |  2 +-
 .../execution/datasources/v2/FileDataSourceV2.scala    |  2 +-
 .../execution/datasources/v2/ShowCreateTableExec.scala | 18 ++++++++++--------
 .../execution/datasources/v2/V2SessionCatalog.scala    |  4 ++--
 .../execution/streaming/state/RocksDBFileManager.scala |  6 +++---
 .../apache/spark/sql/execution/ui/ExecutionPage.scala  |  4 ++--
 .../apache/spark/sql/streaming/DataStreamReader.scala  |  2 +-
 .../apache/spark/sql/streaming/DataStreamWriter.scala  |  2 +-
 .../apache/spark/sql/hive/HiveExternalCatalog.scala    | 11 ++++++-----
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala   |  7 +++----
 .../apache/spark/sql/hive/execution/HiveOptions.scala  |  6 +++---
 .../spark/sql/hive/HiveSchemaInferenceSuite.scala      |  2 +-
 .../org/apache/spark/sql/hive/StatisticsSuite.scala    | 10 +++++-----
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala |  8 ++++----
 .../hive/execution/command/ShowCreateTableSuite.scala  |  2 +-
 40 files changed, 93 insertions(+), 88 deletions(-)

diff --git 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 026c4d560722..a86acd971a1c 100644
--- 
a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ 
b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -102,7 +102,7 @@ class KafkaContinuousStream(
     }
 
     val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_))
+      oldStartPartitionOffsets.filter { case (k, _) => 
!deletedPartitions.contains(k) }
     knownPartitions = startOffsets.keySet
 
     startOffsets.toSeq.map {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala 
b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fe72e28ea5b..2e1d7b9bce33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -958,8 +958,7 @@ private[deploy] class Master(
   private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
     val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
     val workersToRemove = addressToWorker
-      .view
-      .filterKeys(addr => 
hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
+      .filter { case (addr, _) => 
hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)) }
       .values
 
     val workersToRemoveHostPorts = workersToRemove.map(_.hostPort)
diff --git 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 286305bb76b8..f1400a0c6a74 100644
--- 
a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ 
b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -492,9 +492,9 @@ private[spark] object RestSubmissionClient {
    * Filter non-spark environment variables from any environment.
    */
   private[rest] def filterSystemEnvironment(env: Map[String, String]): 
Map[String, String] = {
-    env.view.filterKeys { k =>
+    env.filter { case (k, _) =>
       k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)
-    }.toMap
+    }
   }
 
   private[spark] def supportsRestClient(master: String): Boolean = {
diff --git 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4bf4929c1339..366b481bf6a4 100644
--- 
a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -157,14 +157,14 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
-    sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), 
e._2)).toMap
+    sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
   }
 
   def extractAttributes: Map[String, String] = {
     val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
-    sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), 
e._2)).toMap
+    sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
   }
 
   def notifyDriverAboutPushCompletion(shuffleId: Int, shuffleMergeId: Int, 
mapIndex: Int): Unit = {
diff --git 
a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala 
b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index fdaa68b6931e..69c0672562c2 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -95,12 +95,12 @@ class ResourceProfile(
   }
 
   private[spark] def getCustomTaskResources(): Map[String, 
TaskResourceRequest] = {
-    taskResources.view.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+    taskResources.filter { case (k, _) => !k.equals(ResourceProfile.CPUS) }
   }
 
   protected[spark] def getCustomExecutorResources(): Map[String, 
ExecutorResourceRequest] = {
     executorResources.
-      view.filterKeys(k => 
!ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+      filter { case (k, _) => 
!ResourceProfile.allSupportedExecutorResources.contains(k) }
   }
 
   /*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala 
b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 241ef35cad73..1a51220cdf74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -845,7 +845,9 @@ private[spark] class DAGScheduler(
     if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
       logError("No stages registered for job " + job.jobId)
     } else {
-      stageIdToStage.view.filterKeys(stageId => 
registeredStages.get.contains(stageId)).foreach {
+      stageIdToStage.filter {
+        case (stageId, _) => registeredStages.get.contains(stageId)
+      }.foreach {
         case (stageId, stage) =>
           val jobSet = stage.jobIds
           if (!jobSet.contains(job.jobId)) {
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4f1503f1d8c4..831fbd45edd7 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -373,7 +373,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
       // Make sure no executor is killed while some task is launching on it
       val taskDescs = withLock {
         // Filter out executors under killing
-        val activeExecutors = executorDataMap.view.filterKeys(isExecutorActive)
+        val activeExecutors = executorDataMap.filter { case (id, _) => 
isExecutorActive(id) }
         val workOffers = activeExecutors.map {
           case (id, executorData) => buildWorkerOffer(id, executorData)
         }.toIndexedSeq
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8da1a568cb4c..bd1cb164b4be 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -254,8 +254,8 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
     val prefix = "SPARK_DRIVER_LOG_URL_"
-    val driverLogUrls = sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), 
e._2)).toMap
+    val driverLogUrls = sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
     if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 6144cc0e6a9a..4b59c1fdfa24 100644
--- 
a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ 
b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -653,7 +653,7 @@ final class ShuffleBlockFetcherIterator(
         hostLocalDirManager.getHostLocalDirs(host, port, 
bmIds.map(_.executorId)) {
           case Success(dirsByExecId) =>
             fetchMultipleHostLocalBlocks(
-              
hostLocalBlocksWithMissingDirs.view.filterKeys(bmIds.contains).toMap,
+              hostLocalBlocksWithMissingDirs.filter { case (k, _) => 
bmIds.contains(k) },
               dirsByExecId,
               cached = false)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala 
b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 208b8189b4da..4fa9cd2c1e1d 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -207,10 +207,9 @@ private[spark] trait PagedTable[T] {
           .withKeyValueSeparator("=")
           .split(search)
           .asScala
-          .view
-          .filterKeys(_ != pageSizeFormField)
-          .filterKeys(_ != pageNumberFormField)
-          .mapValues(URLDecoder.decode(_, UTF_8.name()))
+          .filter { case (k, _) => k != pageSizeFormField}
+          .filter { case (k, _) => k != pageNumberFormField}
+          .map { case (k, v) => (k, URLDecoder.decode(v, UTF_8.name())) }
           .map { case (k, v) =>
             <input type="hidden" name={k} value={v} />
           }
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala 
b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index e3463a9dce09..c0a8f2ecdd49 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -306,7 +306,7 @@ class HeartbeatReceiverSuite
     // We may receive undesired SparkListenerExecutorAdded from 
LocalSchedulerBackend,
     // so exclude it from the map. See SPARK-10800.
     heartbeatReceiver.invokePrivate(_executorLastSeen()).
-      view.filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
+      filter { case (k, _) => k != SparkContext.DRIVER_IDENTIFIER }
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index c012613c2ee0..1ab50f867ff4 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -152,9 +152,8 @@ class SparkThrowableSuite extends SparkFunSuite {
 
   test("Message format invariants") {
     val messageFormats = errorReader.errorInfoMap
-      .view
-      .filterKeys(!_.startsWith("_LEGACY_ERROR_"))
-      .filterKeys(!_.startsWith("INTERNAL_ERROR"))
+      .filter { case (k, _) => !k.startsWith("_LEGACY_ERROR_") }
+      .filter { case (k, _) => !k.startsWith("INTERNAL_ERROR") }
       .values.toSeq.flatMap { i => Seq(i.messageTemplate) }
     checkCondition(messageFormats, s => s != null)
     checkIfUnique(messageFormats)
diff --git 
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
 
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 95b484d7176a..197c2f13d807 100644
--- 
a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -264,7 +264,7 @@ object NonLocalModeSparkPlugin {
       resources: Map[String, ResourceInformation]): String = {
     // try to keep this simple and only write the gpus addresses, if we add 
more resources need to
     // make more complex
-    val resourcesString = resources.view.filterKeys(_.equals(GPU)).map {
+    val resourcesString = resources.filter { case (k, _) => k.equals(GPU) 
}.map {
       case (_, ri) =>
         s"${ri.addresses.mkString(",")}"
     }.mkString(",")
diff --git 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index a4403fb96b21..236dfff9ac11 100644
--- 
a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ 
b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -187,7 +187,8 @@ class ExecutorPodsAllocator(
     // to the schedulerKnownNewlyCreatedExecs
     val schedulerKnownExecs = 
schedulerBackend.getExecutorIds().map(_.toLong).toSet
     schedulerKnownNewlyCreatedExecs ++=
-      
newlyCreatedExecutors.view.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+      newlyCreatedExecutors.filter { case (k, _) => 
schedulerKnownExecs.contains(k) }
+        .map { case (k, v) => (k, v._1) }
     newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
 
     // For all executors we've created against the API but have not seen in a 
snapshot
@@ -239,7 +240,8 @@ class ExecutorPodsAllocator(
       _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
     }
 
-    val notDeletedPods = 
lastSnapshot.executorPods.view.filterKeys(!_deletedExecutorIds.contains(_))
+    val notDeletedPods = lastSnapshot.executorPods
+      .filter { case (k, _) => !_deletedExecutorIds.contains(k) }
     // Map the pods into per ResourceProfile id so we can check per 
ResourceProfile,
     // add a fast path if not using other ResourceProfiles.
     val rpIdToExecsAndPodState =
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 6a3fa50916b7..81b210a2297a 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -205,7 +205,7 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, 
sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    System.getenv().asScala.view.filterKeys(_.startsWith("SPARK"))
+    System.getenv().asScala.filter { case (k, _) => k.startsWith("SPARK") }
       .foreach { case (k, v) => env(k) = v }
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
diff --git 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5d24870bbcda..736eaa52b81c 100644
--- 
a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ 
b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -344,7 +344,7 @@ private[yarn] class YarnAllocator(
         val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
         val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
         getYarnResourcesAndAmounts(sparkConf, 
config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
-          customSparkResources.view.filterKeys { r =>
+          customSparkResources.filter { case (r, _) =>
             (r == gpuResource || r == fpgaResource)
           }
       } else {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 20352129a06c..f716c2a0ccb6 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -461,7 +461,7 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
     val tableProperties =
-      
SQLConf.get.redactOptions(properties.view.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
+      SQLConf.get.redactOptions(properties.filter { case (k, _) => 
!k.startsWith(VIEW_PREFIX) })
         .toSeq.sortBy(_._1)
         .map(p => p._1 + "=" + p._2)
     val partitionColumns = 
partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
@@ -621,10 +621,8 @@ object CatalogTable {
       createTime = 0L,
       lastAccessTime = 0L,
       properties = table.properties
-        .view
-        .filterKeys(!nondeterministicProps.contains(_))
-        .map(identity)
-        .toMap,
+        .filter { case (k, _) => !nondeterministicProps.contains(k) }
+        .map(identity),
       stats = None,
       ignoredProperties = Map.empty
     )
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 0340ac376563..e73b00600764 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -544,7 +544,7 @@ class CodegenContext extends Logging {
         s"private $className $classInstance = new $className();"
     }
 
-    val declareNestedClasses = classFunctions.view.filterKeys(_ != 
outerClassName).map {
+    val declareNestedClasses = classFunctions.filter { case (k, _) => k != 
outerClassName }.map {
       case (className, functions) =>
         s"""
            |private class $className {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9bd0f58e3df8..c66ead30ab3c 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -778,13 +778,14 @@ object View {
     // as optimization configs but they are still needed during the view 
resolution.
     // TODO: remove this `retainedConfigs` after the `RelationConversions` is 
moved to
     // optimization phase.
-    val retainedConfigs = activeConf.getAllConfs.view.filterKeys(key =>
+    val retainedConfigs = activeConf.getAllConfs.filter { case (key, _) =>
       Seq(
         "spark.sql.hive.convertMetastoreParquet",
         "spark.sql.hive.convertMetastoreOrc",
         "spark.sql.hive.convertInsertingPartitionedTable",
         "spark.sql.hive.convertMetastoreCtas"
-      ).contains(key) || key.startsWith("spark.sql.catalog."))
+      ).contains(key) || key.startsWith("spark.sql.catalog.")
+    }
     for ((k, v) <- configs ++ retainedConfigs) {
       sqlConf.settings.put(k, v)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5a7ac49bd4e7..c8727146160b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -263,7 +263,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) 
{
 
       val optionsWithPath = getOptionsWithPath(path)
 
-      val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+      val finalOptions = sessionOptions.filter { case (k, _) => 
!optionsWithPath.contains(k) } ++
         optionsWithPath.originalMap
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7ed82b16cc5e..2f8fca7cfd73 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -675,7 +675,7 @@ case class DescribeTableCommand(
     )
     append(buffer, "", "", "")
     append(buffer, "# Detailed Table Information", "", "")
-    
table.toLinkedHashMap.view.filterKeys(!excludedTableInfo.contains(_)).foreach {
+    table.toLinkedHashMap.filter { case (k, _) => 
!excludedTableInfo.contains(k) }.foreach {
       s => append(buffer, s._1, s._2, "")
     }
   }
@@ -956,7 +956,7 @@ case class ShowTablePropertiesCommand(
             Seq(Row(p, propValue))
           }
         case None =>
-          properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+          properties.filter { case (k, _) => 
!k.startsWith(CatalogTable.VIEW_PREFIX) }
             .toSeq.sortBy(_._1).map(p => Row(p._1, p._2))
       }
     }
@@ -1104,7 +1104,8 @@ trait ShowCreateTableCommandBase extends SQLConfHelper {
   }
 
   private def showViewProperties(metadata: CatalogTable, builder: 
StringBuilder): Unit = {
-    val viewProps = 
metadata.properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+    val viewProps = metadata.properties
+      .filter { case (k, _) => !k.startsWith(CatalogTable.VIEW_PREFIX) }
     if (viewProps.nonEmpty) {
       val props = viewProps.toSeq.sortBy(_._1).map { case (key, value) =>
         s"'${escapeSingleQuotedString(key)}' = 
'${escapeSingleQuotedString(value)}'"
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index cebc74af724d..668d2538e03f 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -806,9 +806,9 @@ object DataSource extends Logging {
    */
   def buildStorageFormatFromOptions(options: Map[String, String]): 
CatalogStorageFormat = {
     val path = CaseInsensitiveMap(options).get("path")
-    val optionsWithoutPath = 
options.view.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
+    val optionsWithoutPath = options.filter { case (k, _) => 
k.toLowerCase(Locale.ROOT) != "path" }
     CatalogStorageFormat.empty.copy(
-      locationUri = path.map(CatalogUtils.stringToURI), properties = 
optionsWithoutPath.toMap)
+      locationUri = path.map(CatalogUtils.stringToURI), properties = 
optionsWithoutPath)
   }
 
   /**
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index babe3e88d585..36c59950fe20 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -311,7 +311,7 @@ object FileFormat {
     // file split information yet, nor do we have a way to provide custom 
metadata column values.
     val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE, 
FILE_MODIFICATION_TIME)
     val extractors =
-      
FileFormat.BASE_METADATA_EXTRACTORS.view.filterKeys(validFieldNames.contains).toMap
+      FileFormat.BASE_METADATA_EXTRACTORS.filter { case (k, _) => 
validFieldNames.contains(k) }
     assert(fieldNames.forall(validFieldNames.contains))
     val pf = PartitionedFile(
       partitionValues = partitionValues,
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index daa282fda6ab..28fa7b8bf561 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -62,7 +62,8 @@ class JDBCOptions(
    */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    parameters.originalMap.view.filterKeys(key => 
!jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
+    parameters.originalMap
+      .filter { case (key, _) => 
!jdbcOptionNames(key.toLowerCase(Locale.ROOT)) }
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index b540ab6a4c98..28241fb0a67a 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -41,7 +41,8 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
     val storageLevel = CaseInsensitiveMap(options).get(storageLevelKey)
       .map(s => StorageLevel.fromString(s.toUpperCase(Locale.ROOT)))
       .getOrElse(conf.defaultCacheStorageLevel)
-    val withoutStorageLevel = 
options.view.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+    val withoutStorageLevel = options
+      .filter { case (k, _) => k.toLowerCase(Locale.ROOT) != storageLevelKey }
     if (withoutStorageLevel.nonEmpty) {
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 3dde20ac44e7..ef8e4605f472 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -106,7 +106,7 @@ private[sql] object DataSourceV2Utils extends Logging {
 
     val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)
 
-    val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+    val finalOptions = sessionOptions.filter { case (k, _) => 
!optionsWithPath.contains(k) } ++
       optionsWithPath.originalMap
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
     val (table, catalog, ident) = provider match {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 03c6589f8dcc..fe75b17af025 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -58,7 +58,7 @@ trait FileDataSourceV2 extends TableProvider with 
DataSourceRegister {
   }
 
   protected def getOptionsWithoutPaths(map: CaseInsensitiveStringMap): 
CaseInsensitiveStringMap = {
-    val withoutPath = map.asCaseSensitiveMap().asScala.view.filterKeys { k =>
+    val withoutPath = map.asCaseSensitiveMap().asScala.filter { case (k, _) =>
       !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
     }
     new CaseInsensitiveStringMap(withoutPath.toMap.asJava)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index facfa5674726..102214e36c91 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -50,10 +50,10 @@ case class ShowCreateTableExec(
     showTableDataColumns(table, builder)
     showTableUsing(table, builder)
 
-    val tableOptions = table.properties.asScala.view
-      .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
-      case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
-    }.toMap
+    val tableOptions = table.properties.asScala
+      .filter { case (k, _) => k.startsWith(TableCatalog.OPTION_PREFIX) }.map {
+        case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
+      }.toMap
     showTableOptions(builder, tableOptions)
     showTablePartitioning(table, builder)
     showTableComment(table, builder)
@@ -132,10 +132,12 @@ case class ShowCreateTableExec(
       builder: StringBuilder,
       tableOptions: Map[String, String]): Unit = {
 
-    val showProps = table.properties.asScala.view
-      .filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
-        && !key.startsWith(TableCatalog.OPTION_PREFIX)
-        && !tableOptions.contains(key))
+    val showProps = table.properties.asScala
+      .filter { case (key, _) =>
+        !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key) &&
+        !key.startsWith(TableCatalog.OPTION_PREFIX) &&
+        !tableOptions.contains(key)
+      }
     if (showProps.nonEmpty) {
       val props = conf.redactOptions(showProps.toMap).toSeq.sortBy(_._1).map {
         case (key, value) =>
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 6cd7ec403be3..5a8f2e2d8a40 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -154,9 +154,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] 
= {
-    properties.view.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+    properties.filter { case (k, _) => 
k.startsWith(TableCatalog.OPTION_PREFIX) }.map {
       case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
-    }.toMap
+    }
   }
 
   override def alterTable(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index e6d9b6f8d96a..5e2b6afee68e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -459,10 +459,10 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those 
uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.view.filterKeys(_ < 
version)
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.filter { case (k, _) 
=> k < version }
       .values.flatten.map { f =>
-      f.localFileName -> f
-    }.toMap
+        f.localFileName -> f
+      }.toMap
 
     var bytesCopied = 0L
     var filesCopied = 0L
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 2a3028155a24..cbec837762b7 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -85,9 +85,9 @@ class ExecutionPage(parent: SQLTab) extends 
WebUIPage("execution") with Logging
       summary ++
         planVisualization(request, metrics, graph) ++
         physicalPlanDescription(executionUIData.physicalPlanDescription) ++
-        
modifiedConfigs(configs.view.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap)
 ++
+        modifiedConfigs(configs.filter { case (k, _) => 
!k.startsWith(pandasOnSparkConfPrefix) }) ++
         modifiedPandasOnSparkConfigs(
-          configs.view.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
+          configs.filter { case (k, _) => 
k.startsWith(pandasOnSparkConfPrefix) })
     }.getOrElse {
       <div>No information to display for query {executionId}</div>
     }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 905c96ff4cbb..1a69678c2f54 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -175,7 +175,7 @@ final class DataStreamReader private[sql](sparkSession: 
SparkSession) extends Lo
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] 
=>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+        val finalOptions = sessionOptions.filter { case (k, _) => 
!optionsWithPath.contains(k) } ++
             optionsWithPath.originalMap
         val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, 
dsOptions, userSpecifiedSchema)
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 59bc3f5d08a4..95aa2f8c7a4e 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -385,7 +385,7 @@ final class DataStreamWriter[T] private[sql](ds: 
Dataset[T]) {
         val provider = 
cls.getConstructor().newInstance().asInstanceOf[TableProvider]
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = df.sparkSession.sessionState.conf)
-        val finalOptions = 
sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+        val finalOptions = sessionOptions.filter { case (k, _) => 
!optionsWithPath.contains(k) } ++
           optionsWithPath.originalMap
         val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
         // If the source accepts external table metadata, here we pass the 
schema of input query
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 32526f1d18f5..c5adb8e27c4c 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -870,7 +870,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
         locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = 
storageWithLocation.copy(properties =
-      
storageWithLocation.properties.view.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
+      storageWithLocation.properties.filter { case (k, _) => 
!HIVE_GENERATED_STORAGE_PROPERTIES(k) }
+    )
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
     val schemaFromTableProps =
@@ -885,7 +886,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == 
Some(TABLE_PARTITION_PROVIDER_CATALOG),
-      properties = 
table.properties.view.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
+      properties = table.properties.filter { case (k, _) => 
!HIVE_GENERATED_TABLE_PROPERTIES(k) })
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -1160,15 +1161,15 @@ private[spark] class HiveExternalCatalog(conf: 
SparkConf, hadoopConf: Configurat
       properties: Map[String, String],
       table: String): Option[CatalogStatistics] = {
 
-    val statsProps = 
properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val statsProps = properties.filter { case (k, _) => 
k.startsWith(STATISTICS_PREFIX) }
     if (statsProps.isEmpty) {
       None
     } else {
       val colStats = new mutable.HashMap[String, CatalogColumnStat]
       val colStatsProps =
-        
properties.view.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+        properties.filter { case (k, _) => 
k.startsWith(STATISTICS_COL_STATS_PREFIX) }.map {
           case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
-        }.toMap
+        }
 
       // Find all the column names by matching the KEY_VERSION properties for 
them.
       colStatsProps.keys.filter {
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e7d03b82274c..c9446b378297 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -133,12 +133,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: 
SparkSession) extends Log
     // Consider table and storage properties. For properties existing in both 
sides, storage
     // properties will supersede table properties.
     if (serde.contains("parquet")) {
-      val options = 
relation.tableMeta.properties.view.filterKeys(isParquetProperty).toMap ++
+      val options = relation.tableMeta.properties.filter { case (k, _) => 
isParquetProperty(k) } ++
         relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
         
SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
         convertToLogicalRelation(relation, options, 
classOf[ParquetFileFormat], "parquet", isWrite)
     } else {
-      val options = 
relation.tableMeta.properties.view.filterKeys(isOrcProperty).toMap ++
+      val options = relation.tableMeta.properties.filter { case (k, _) => 
isOrcProperty(k) } ++
         relation.tableMeta.storage.properties
       if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         convertToLogicalRelation(
@@ -377,8 +377,7 @@ private[hive] object HiveMetastoreCatalog {
     // Find any nullable fields in metastore schema that are missing from the 
inferred schema.
     val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> 
f).toMap
     val missingNullables = metastoreFields
-      .view
-      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
+      .filter { case (k, _) => 
!inferredSchema.map(_.name.toLowerCase).contains(k) }
       .values
       .filter(_.nullable)
     // Merge missing nullable fields to inferred schema and build a 
case-insensitive field map.
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
index 44f0e43d11e5..0f2eee6798ea 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -85,9 +85,9 @@ class HiveOptions(@transient private val parameters: 
CaseInsensitiveMap[String])
       s"line delimiter, but given: $lineDelim.")
   }
 
-  def serdeProperties: Map[String, String] = parameters.view.filterKeys {
-    k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
-  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap
+  def serdeProperties: Map[String, String] = parameters.filter {
+    case (k, _) => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
+  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }
 }
 
 object HiveOptions {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index 8ff209d1580b..1eb3f6f3c9cb 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -124,7 +124,7 @@ class HiveSchemaInferenceSuite
     // properties out).
     assert(!externalCatalog.getTable(DATABASE, 
TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
-    
assert(rawTable.properties.view.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+    assert(!rawTable.properties.exists { case (k, _) => 
k.startsWith(DATASOURCE_SCHEMA_PREFIX) })
 
     // Add partition records (if specified)
     if (!partitionCols.isEmpty) {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 21a115486298..5502414629c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -970,7 +970,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase 
with TestHiveSingleto
    */
   private def getStatsProperties(tableName: String): Map[String, String] = {
     val hTable = 
hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName)
-    hTable.properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
+    hTable.properties.filter { case (k, _) => k.startsWith(STATISTICS_PREFIX) }
   }
 
   test("change stats after insert command for hive table") {
@@ -1209,7 +1209,7 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + 
stats.keys.mkString(", "))
       val table = hiveClient.getTable("default", tableName)
       val props =
-        
table.properties.view.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
+        table.properties.filter { case (k, _) => 
k.startsWith("spark.sql.statistics.colStats") }
       assert(props == expected)
     }
 
@@ -1278,11 +1278,11 @@ class StatisticsSuite extends 
StatisticsCollectionTestBase with TestHiveSingleto
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS cint, 
ctimestamp")
         val table = hiveClient.getTable("default", tableName)
         val intHistogramProps = table.properties
-          
.view.filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
+          .filter { case (k, _) => 
k.startsWith("spark.sql.statistics.colStats.cint.histogram") }
         assert(intHistogramProps.size == 1)
 
-        val tsHistogramProps = table.properties
-          
.view.filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
+        val tsHistogramProps = table.properties.filter {
+          case (k, _) => 
k.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram") }
         assert(tsHistogramProps.size == 1)
 
         // Validate histogram after deserialization.
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2f5d1fcbb540..cf2098641ad1 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -133,7 +133,7 @@ class HiveDDLSuite
       createTime = 0L,
       lastAccessTime = 0L,
       owner = "",
-      properties = 
table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      properties = table.properties.filter { case (k, _) => 
!nondeterministicProps.contains(k) },
       // View texts are checked separately
       viewText = None
     )
@@ -1089,7 +1089,7 @@ class HiveDDLSuite
       expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
     val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" 
-> "4"))
     assert(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was 
already set")
-    
assert(oldPart.storage.properties.view.filterKeys(expectedSerdeProps.contains) 
!=
+    assert(oldPart.storage.properties.filter { case (k, _) => 
expectedSerdeProps.contains(k) } !=
       expectedSerdeProps, "bad test: serde properties were already set")
     sql(s"""ALTER TABLE boxes PARTITION (width=4)
       |    SET SERDE '$expectedSerde'
@@ -1097,7 +1097,7 @@ class HiveDDLSuite
       |""".stripMargin)
     val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" 
-> "4"))
     assert(newPart.storage.serde == Some(expectedSerde))
-    
assert(newPart.storage.properties.view.filterKeys(expectedSerdeProps.contains).toMap
 ==
+    assert(newPart.storage.properties.filter { case (k, _) => 
expectedSerdeProps.contains(k) } ==
       expectedSerdeProps)
   }
 
@@ -1699,7 +1699,7 @@ class HiveDDLSuite
       "minFileSize"
     )
     assert(
-      
targetTable.properties.view.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+      targetTable.properties.forall { case (k, _) => 
metastoreGeneratedProperties.contains(k) },
       "the table properties of source tables should not be copied in the 
created table")
 
     provider match {
diff --git 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
index c7e19e943e6b..3098015dc7da 100644
--- 
a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
+++ 
b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
@@ -246,7 +246,7 @@ class ShowCreateTableSuite extends 
v1.ShowCreateTableSuiteBase with CommandSuite
       table.copy(
         createTime = 0L,
         lastAccessTime = 0L,
-        properties = 
table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
+        properties = table.properties.filter { case (k, _) => 
!nondeterministicProps.contains(k) },
         stats = None,
         ignoredProperties = Map.empty,
         storage = table.storage.copy(properties = Map.empty),


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org
