This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new ac0bd2eb7b40 [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`
ac0bd2eb7b40 is described below

commit ac0bd2eb7b4089096f9fb288482b2f1b5049b7e2
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Mon Dec 4 12:49:52 2023 -0800

    [SPARK-46245][CORE][SQL][SS][YARN][K8S][UI] Replace `s.c.MapOps.view.filterKeys` with `s.c.MapOps.filter`

    ### What changes were proposed in this pull request?
    This PR uses `s.c.MapOps.filter` to simplify the code pattern `s.c.MapOps.view.filterKeys`.

    ### Why are the changes needed?
    The `s.c.MapOps.view.filterKeys` coding pattern is verbose; it can be simplified with `s.c.MapOps.filter`.

    ### Does this PR introduce _any_ user-facing change?
    No

    ### How was this patch tested?
    Pass GitHub Actions

    ### Was this patch authored or co-authored using generative AI tooling?
    No

    Closes #44160 from LuciferYang/SPARK-46245.

    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
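For readers skimming the diff below, here is a minimal, self-contained sketch of the before/after pattern (the `env` map and the `SPARK_` prefix are illustrative stand-ins, not values from this commit). The relevant nuance: `view.filterKeys` is lazy and yields a `MapView`, which is why the old call sites often needed a trailing `.toMap`, while `filter` on an immutable `Map` is strict and returns a `Map` directly.

```scala
// Sketch only: illustrates the rewrite this commit applies, on a made-up map.
object FilterKeysDemo {
  def main(args: Array[String]): Unit = {
    val env = Map("SPARK_HOME" -> "/opt/spark", "PATH" -> "/usr/bin")

    // Old pattern: filterKeys on a view is lazy and returns a MapView,
    // so a trailing .toMap was usually needed to materialize a strict Map.
    val before: Map[String, String] =
      env.view.filterKeys(_.startsWith("SPARK_")).toMap

    // New pattern: filter on an immutable Map is strict and already returns
    // a Map, which is why many .toMap calls disappear in this diff.
    val after: Map[String, String] =
      env.filter { case (k, _) => k.startsWith("SPARK_") }

    assert(before == after) // same contents either way
    println(after)          // Map(SPARK_HOME -> /opt/spark)
  }
}
```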
---
 .../spark/sql/kafka010/KafkaContinuousStream.scala | 2 +-
 .../scala/org/apache/spark/deploy/master/Master.scala | 3 +--
 .../spark/deploy/rest/RestSubmissionClient.scala | 4 ++--
 .../spark/executor/CoarseGrainedExecutorBackend.scala | 8 ++++----
 .../org/apache/spark/resource/ResourceProfile.scala | 4 ++--
 .../org/apache/spark/scheduler/DAGScheduler.scala | 4 +++-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 .../scheduler/cluster/StandaloneSchedulerBackend.scala | 4 ++--
 .../spark/storage/ShuffleBlockFetcherIterator.scala | 2 +-
 .../main/scala/org/apache/spark/ui/PagedTable.scala | 7 +++----
 .../org/apache/spark/HeartbeatReceiverSuite.scala | 2 +-
 .../scala/org/apache/spark/SparkThrowableSuite.scala | 5 ++---
 .../spark/internal/plugin/PluginContainerSuite.scala | 2 +-
 .../scheduler/cluster/k8s/ExecutorPodsAllocator.scala | 6 ++++--
 .../apache/spark/deploy/yarn/ExecutorRunnable.scala | 2 +-
 .../org/apache/spark/deploy/yarn/YarnAllocator.scala | 2 +-
 .../apache/spark/sql/catalyst/catalog/interface.scala | 8 +++-----
 .../catalyst/expressions/codegen/CodeGenerator.scala | 2 +-
 .../catalyst/plans/logical/basicLogicalOperators.scala | 5 +++--
 .../scala/org/apache/spark/sql/DataFrameWriter.scala | 2 +-
 .../apache/spark/sql/execution/command/tables.scala | 7 ++++---
 .../spark/sql/execution/datasources/DataSource.scala | 4 ++--
 .../spark/sql/execution/datasources/FileFormat.scala | 2 +-
 .../sql/execution/datasources/jdbc/JDBCOptions.scala | 3 ++-
 .../sql/execution/datasources/v2/CacheTableExec.scala | 3 ++-
 .../execution/datasources/v2/DataSourceV2Utils.scala | 2 +-
 .../execution/datasources/v2/FileDataSourceV2.scala | 2 +-
 .../execution/datasources/v2/ShowCreateTableExec.scala | 18 ++++++++++--------
 .../execution/datasources/v2/V2SessionCatalog.scala | 4 ++--
 .../execution/streaming/state/RocksDBFileManager.scala | 6 +++---
 .../apache/spark/sql/execution/ui/ExecutionPage.scala | 4 ++--
 .../apache/spark/sql/streaming/DataStreamReader.scala | 2 +-
 .../apache/spark/sql/streaming/DataStreamWriter.scala | 2 +-
 .../apache/spark/sql/hive/HiveExternalCatalog.scala | 11 ++++++-----
 .../apache/spark/sql/hive/HiveMetastoreCatalog.scala | 7 +++----
 .../apache/spark/sql/hive/execution/HiveOptions.scala | 6 +++---
 .../spark/sql/hive/HiveSchemaInferenceSuite.scala | 2 +-
 .../org/apache/spark/sql/hive/StatisticsSuite.scala | 10 +++++-----
 .../apache/spark/sql/hive/execution/HiveDDLSuite.scala | 8 ++++----
 .../hive/execution/command/ShowCreateTableSuite.scala | 2 +-
 40 files changed, 93 insertions(+), 88 deletions(-)

diff --git a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
index 026c4d560722..a86acd971a1c 100644
--- a/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
+++ b/connector/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousStream.scala
@@ -102,7 +102,7 @@ class KafkaContinuousStream(
     }
 
     val startOffsets = newPartitionOffsets ++
-      oldStartPartitionOffsets.view.filterKeys(!deletedPartitions.contains(_))
+      oldStartPartitionOffsets.filter { case (k, _) => !deletedPartitions.contains(k) }
     knownPartitions = startOffsets.keySet
 
     startOffsets.toSeq.map {
diff --git a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
index 0fe72e28ea5b..2e1d7b9bce33 100644
--- a/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/master/Master.scala
@@ -958,8 +958,7 @@ private[deploy] class Master(
   private def decommissionWorkersOnHosts(hostnames: Seq[String]): Integer = {
     val hostnamesSet = hostnames.map(_.toLowerCase(Locale.ROOT)).toSet
     val workersToRemove = addressToWorker
-      .view
-      .filterKeys(addr => hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)))
+      .filter { case (addr, _) => hostnamesSet.contains(addr.host.toLowerCase(Locale.ROOT)) }
       .values
 
     val workersToRemoveHostPorts = workersToRemove.map(_.hostPort)
diff --git a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
index 286305bb76b8..f1400a0c6a74 100644
--- a/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala
@@ -492,9 +492,9 @@ private[spark] object RestSubmissionClient {
    * Filter non-spark environment variables from any environment.
    */
   private[rest] def filterSystemEnvironment(env: Map[String, String]): Map[String, String] = {
-    env.view.filterKeys { k =>
+    env.filter { case (k, _) =>
       k.startsWith("SPARK_") && !EXCLUDED_SPARK_ENV_VARS.contains(k)
-    }.toMap
+    }
   }
 
   private[spark] def supportsRestClient(master: String): Boolean = {
diff --git a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
index 4bf4929c1339..366b481bf6a4 100644
--- a/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
+++ b/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -157,14 +157,14 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   def extractLogUrls: Map[String, String] = {
     val prefix = "SPARK_LOG_URL_"
-    sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
+    sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
   }
 
   def extractAttributes: Map[String, String] = {
     val prefix = "SPARK_EXECUTOR_ATTRIBUTE_"
-    sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2)).toMap
+    sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toUpperCase(Locale.ROOT), e._2))
   }
 
   def notifyDriverAboutPushCompletion(shuffleId: Int, shuffleMergeId: Int, mapIndex: Int): Unit = {
diff --git a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
index fdaa68b6931e..69c0672562c2 100644
--- a/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
+++ b/core/src/main/scala/org/apache/spark/resource/ResourceProfile.scala
@@ -95,12 +95,12 @@ class ResourceProfile(
   }
 
   private[spark] def getCustomTaskResources(): Map[String, TaskResourceRequest] = {
-    taskResources.view.filterKeys(k => !k.equals(ResourceProfile.CPUS)).toMap
+    taskResources.filter { case (k, _) => !k.equals(ResourceProfile.CPUS) }
   }
 
   protected[spark] def getCustomExecutorResources(): Map[String, ExecutorResourceRequest] = {
     executorResources.
-      view.filterKeys(k => !ResourceProfile.allSupportedExecutorResources.contains(k)).toMap
+      filter { case (k, _) => !ResourceProfile.allSupportedExecutorResources.contains(k) }
   }
 
   /*
diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
index 241ef35cad73..1a51220cdf74 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -845,7 +845,9 @@ private[spark] class DAGScheduler(
     if (registeredStages.isEmpty || registeredStages.get.isEmpty) {
       logError("No stages registered for job " + job.jobId)
     } else {
-      stageIdToStage.view.filterKeys(stageId => registeredStages.get.contains(stageId)).foreach {
+      stageIdToStage.filter {
+        case (stageId, _) => registeredStages.get.contains(stageId)
+      }.foreach {
         case (stageId, stage) =>
           val jobSet = stage.jobIds
           if (!jobSet.contains(job.jobId)) {
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index 4f1503f1d8c4..831fbd45edd7 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -373,7 +373,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp
     // Make sure no executor is killed while some task is launching on it
     val taskDescs = withLock {
       // Filter out executors under killing
-      val activeExecutors = executorDataMap.view.filterKeys(isExecutorActive)
+      val activeExecutors = executorDataMap.filter { case (id, _) => isExecutorActive(id) }
       val workOffers = activeExecutors.map {
         case (id, executorData) => buildWorkerOffer(id, executorData)
       }.toIndexedSeq
diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
index 8da1a568cb4c..bd1cb164b4be 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/StandaloneSchedulerBackend.scala
@@ -254,8 +254,8 @@ private[spark] class StandaloneSchedulerBackend(
 
   override def getDriverLogUrls: Option[Map[String, String]] = {
     val prefix = "SPARK_DRIVER_LOG_URL_"
-    val driverLogUrls = sys.env.view.filterKeys(_.startsWith(prefix))
-      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)).toMap
+    val driverLogUrls = sys.env.filter { case (k, _) => k.startsWith(prefix) }
+      .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2))
     if (driverLogUrls.nonEmpty) Some(driverLogUrls) else None
   }
 
diff --git a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
index 6144cc0e6a9a..4b59c1fdfa24 100644
--- a/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
+++ b/core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala
@@ -653,7 +653,7 @@ final class ShuffleBlockFetcherIterator(
       hostLocalDirManager.getHostLocalDirs(host, port, bmIds.map(_.executorId)) {
         case Success(dirsByExecId) =>
           fetchMultipleHostLocalBlocks(
-            hostLocalBlocksWithMissingDirs.view.filterKeys(bmIds.contains).toMap,
+            hostLocalBlocksWithMissingDirs.filter { case (k, _) => bmIds.contains(k) },
             dirsByExecId,
             cached = false)
 
diff --git a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
index 208b8189b4da..4fa9cd2c1e1d 100644
--- a/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
+++ b/core/src/main/scala/org/apache/spark/ui/PagedTable.scala
@@ -207,10 +207,9 @@ private[spark] trait PagedTable[T] {
         .withKeyValueSeparator("=")
         .split(search)
         .asScala
-        .view
-        .filterKeys(_ != pageSizeFormField)
-        .filterKeys(_ != pageNumberFormField)
-        .mapValues(URLDecoder.decode(_, UTF_8.name()))
+        .filter { case (k, _) => k != pageSizeFormField}
+        .filter { case (k, _) => k != pageNumberFormField}
+        .map { case (k, v) => (k, URLDecoder.decode(v, UTF_8.name())) }
         .map { case (k, v) =>
           <input type="hidden" name={k} value={v} />
         }
diff --git a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
index e3463a9dce09..c0a8f2ecdd49 100644
--- a/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
+++ b/core/src/test/scala/org/apache/spark/HeartbeatReceiverSuite.scala
@@ -306,7 +306,7 @@ class HeartbeatReceiverSuite
     // We may receive undesired SparkListenerExecutorAdded from LocalSchedulerBackend,
     // so exclude it from the map. See SPARK-10800.
     heartbeatReceiver.invokePrivate(_executorLastSeen()).
-      view.filterKeys(_ != SparkContext.DRIVER_IDENTIFIER).toMap
+      filter { case (k, _) => k != SparkContext.DRIVER_IDENTIFIER }
   }
 }
 
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index c012613c2ee0..1ab50f867ff4 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -152,9 +152,8 @@ class SparkThrowableSuite extends SparkFunSuite {
 
   test("Message format invariants") {
     val messageFormats = errorReader.errorInfoMap
-      .view
-      .filterKeys(!_.startsWith("_LEGACY_ERROR_"))
-      .filterKeys(!_.startsWith("INTERNAL_ERROR"))
+      .filter { case (k, _) => !k.startsWith("_LEGACY_ERROR_") }
+      .filter { case (k, _) => !k.startsWith("INTERNAL_ERROR") }
       .values.toSeq.flatMap { i => Seq(i.messageTemplate) }
     checkCondition(messageFormats, s => s != null)
     checkIfUnique(messageFormats)
diff --git a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
index 95b484d7176a..197c2f13d807 100644
--- a/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/internal/plugin/PluginContainerSuite.scala
@@ -264,7 +264,7 @@ object NonLocalModeSparkPlugin {
       resources: Map[String, ResourceInformation]): String = {
     // try to keep this simple and only write the gpus addresses, if we add more resources need to
     // make more complex
-    val resourcesString = resources.view.filterKeys(_.equals(GPU)).map {
+    val resourcesString = resources.filter { case (k, _) => k.equals(GPU) }.map {
       case (_, ri) =>
         s"${ri.addresses.mkString(",")}"
     }.mkString(",")
diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index a4403fb96b21..236dfff9ac11 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -187,7 +187,8 @@ class ExecutorPodsAllocator(
     // to the schedulerKnownNewlyCreatedExecs
     val schedulerKnownExecs = schedulerBackend.getExecutorIds().map(_.toLong).toSet
     schedulerKnownNewlyCreatedExecs ++=
-      newlyCreatedExecutors.view.filterKeys(schedulerKnownExecs.contains(_)).mapValues(_._1)
+      newlyCreatedExecutors.filter { case (k, _) => schedulerKnownExecs.contains(k) }
+        .map { case (k, v) => (k, v._1) }
     newlyCreatedExecutors --= schedulerKnownNewlyCreatedExecs.keySet
 
     // For all executors we've created against the API but have not seen in a snapshot
@@ -239,7 +240,8 @@
       _deletedExecutorIds = _deletedExecutorIds.intersect(existingExecs)
     }
 
-    val notDeletedPods = lastSnapshot.executorPods.view.filterKeys(!_deletedExecutorIds.contains(_))
+    val notDeletedPods = lastSnapshot.executorPods
+      .filter { case (k, _) => !_deletedExecutorIds.contains(k) }
     // Map the pods into per ResourceProfile id so we can check per ResourceProfile,
     // add a fast path if not using other ResourceProfiles.
     val rpIdToExecsAndPodState =
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
index 6a3fa50916b7..81b210a2297a 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ExecutorRunnable.scala
@@ -205,7 +205,7 @@ private[yarn] class ExecutorRunnable(
     val env = new HashMap[String, String]()
     Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH))
 
-    System.getenv().asScala.view.filterKeys(_.startsWith("SPARK"))
+    System.getenv().asScala.filter { case (k, _) => k.startsWith("SPARK") }
       .foreach { case (k, v) => env(k) = v }
 
     sparkConf.getExecutorEnv.foreach { case (key, value) =>
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 5d24870bbcda..736eaa52b81c 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -344,7 +344,7 @@ private[yarn] class YarnAllocator(
       val gpuResource = sparkConf.get(YARN_GPU_DEVICE)
       val fpgaResource = sparkConf.get(YARN_FPGA_DEVICE)
       getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
-        customSparkResources.view.filterKeys { r =>
+        customSparkResources.filter { case (r, _) =>
           (r == gpuResource || r == fpgaResource)
         }
     } else {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 20352129a06c..f716c2a0ccb6 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -461,7 +461,7 @@ case class CatalogTable(
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
     val tableProperties =
-      SQLConf.get.redactOptions(properties.view.filterKeys(!_.startsWith(VIEW_PREFIX)).toMap)
+      SQLConf.get.redactOptions(properties.filter { case (k, _) => !k.startsWith(VIEW_PREFIX) })
         .toSeq.sortBy(_._1)
         .map(p => p._1 + "=" + p._2)
     val partitionColumns = partitionColumnNames.map(quoteIdentifier).mkString("[", ", ", "]")
@@ -621,10 +621,8 @@
       createTime = 0L,
       lastAccessTime = 0L,
       properties = table.properties
-        .view
-        .filterKeys(!nondeterministicProps.contains(_))
-        .map(identity)
-        .toMap,
+        .filter { case (k, _) => !nondeterministicProps.contains(k) }
+        .map(identity),
       stats = None,
       ignoredProperties = Map.empty
     )
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 0340ac376563..e73b00600764 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -544,7 +544,7 @@ class CodegenContext extends Logging {
       s"private $className $classInstance = new $className();"
     }
 
-    val declareNestedClasses = classFunctions.view.filterKeys(_ != outerClassName).map {
+    val declareNestedClasses = classFunctions.filter { case (k, _) => k != outerClassName }.map {
       case (className, functions) =>
         s"""
            |private class $className {
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 9bd0f58e3df8..c66ead30ab3c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -778,13 +778,14 @@ object View {
     // as optimization configs but they are still needed during the view resolution.
     // TODO: remove this `retainedConfigs` after the `RelationConversions` is moved to
    // optimization phase.
-    val retainedConfigs = activeConf.getAllConfs.view.filterKeys(key =>
+    val retainedConfigs = activeConf.getAllConfs.filter { case (key, _) =>
       Seq(
         "spark.sql.hive.convertMetastoreParquet",
         "spark.sql.hive.convertMetastoreOrc",
         "spark.sql.hive.convertInsertingPartitionedTable",
         "spark.sql.hive.convertMetastoreCtas"
-      ).contains(key) || key.startsWith("spark.sql.catalog."))
+      ).contains(key) || key.startsWith("spark.sql.catalog.")
+    }
     for ((k, v) <- configs ++ retainedConfigs) {
       sqlConf.settings.put(k, v)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
index 5a7ac49bd4e7..c8727146160b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala
@@ -263,7 +263,7 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) {
 
       val optionsWithPath = getOptionsWithPath(path)
 
-      val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+      val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
        optionsWithPath.originalMap
 
      val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 7ed82b16cc5e..2f8fca7cfd73 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -675,7 +675,7 @@ case class DescribeTableCommand(
       )
       append(buffer, "", "", "")
       append(buffer, "# Detailed Table Information", "", "")
-      table.toLinkedHashMap.view.filterKeys(!excludedTableInfo.contains(_)).foreach {
+      table.toLinkedHashMap.filter { case (k, _) => !excludedTableInfo.contains(k) }.foreach {
        s => append(buffer, s._1, s._2, "")
      }
   }
@@ -956,7 +956,7 @@ case class ShowTablePropertiesCommand(
           Seq(Row(p, propValue))
         }
       case None =>
-        properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+        properties.filter { case (k, _) => !k.startsWith(CatalogTable.VIEW_PREFIX) }
           .toSeq.sortBy(_._1).map(p => Row(p._1, p._2))
     }
   }
@@ -1104,7 +1104,8 @@ trait ShowCreateTableCommandBase extends SQLConfHelper {
   }
 
   private def showViewProperties(metadata: CatalogTable, builder: StringBuilder): Unit = {
-    val viewProps = metadata.properties.view.filterKeys(!_.startsWith(CatalogTable.VIEW_PREFIX))
+    val viewProps = metadata.properties
+      .filter { case (k, _) => !k.startsWith(CatalogTable.VIEW_PREFIX) }
     if (viewProps.nonEmpty) {
       val props = viewProps.toSeq.sortBy(_._1).map { case (key, value) =>
         s"'${escapeSingleQuotedString(key)}' = '${escapeSingleQuotedString(value)}'"
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
index cebc74af724d..668d2538e03f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSource.scala
@@ -806,9 +806,9 @@ object DataSource extends Logging {
    */
  def buildStorageFormatFromOptions(options: Map[String, String]): CatalogStorageFormat = {
     val path = CaseInsensitiveMap(options).get("path")
-    val optionsWithoutPath = options.view.filterKeys(_.toLowerCase(Locale.ROOT) != "path")
+    val optionsWithoutPath = options.filter { case (k, _) => k.toLowerCase(Locale.ROOT) != "path" }
     CatalogStorageFormat.empty.copy(
-      locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath.toMap)
+      locationUri = path.map(CatalogUtils.stringToURI), properties = optionsWithoutPath)
   }
 
   /**
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
index babe3e88d585..36c59950fe20 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala
@@ -311,7 +311,7 @@ object FileFormat {
     // file split information yet, nor do we have a way to provide custom metadata column values.
     val validFieldNames = Set(FILE_PATH, FILE_NAME, FILE_SIZE, FILE_MODIFICATION_TIME)
     val extractors =
-      FileFormat.BASE_METADATA_EXTRACTORS.view.filterKeys(validFieldNames.contains).toMap
+      FileFormat.BASE_METADATA_EXTRACTORS.filter { case (k, _) => validFieldNames.contains(k) }
     assert(fieldNames.forall(validFieldNames.contains))
     val pf = PartitionedFile(
       partitionValues = partitionValues,
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
index daa282fda6ab..28fa7b8bf561 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JDBCOptions.scala
@@ -62,7 +62,8 @@ class JDBCOptions(
    */
   val asConnectionProperties: Properties = {
     val properties = new Properties()
-    parameters.originalMap.view.filterKeys(key => !jdbcOptionNames(key.toLowerCase(Locale.ROOT)))
+    parameters.originalMap
+      .filter { case (key, _) => !jdbcOptionNames(key.toLowerCase(Locale.ROOT)) }
       .foreach { case (k, v) => properties.setProperty(k, v) }
     properties
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
index b540ab6a4c98..28241fb0a67a 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/CacheTableExec.scala
@@ -41,7 +41,8 @@ trait BaseCacheTableExec extends LeafV2CommandExec {
     val storageLevel = CaseInsensitiveMap(options).get(storageLevelKey)
       .map(s => StorageLevel.fromString(s.toUpperCase(Locale.ROOT)))
       .getOrElse(conf.defaultCacheStorageLevel)
-    val withoutStorageLevel = options.view.filterKeys(_.toLowerCase(Locale.ROOT) != storageLevelKey)
+    val withoutStorageLevel = options
+      .filter { case (k, _) => k.toLowerCase(Locale.ROOT) != storageLevelKey }
     if (withoutStorageLevel.nonEmpty) {
       logWarning(s"Invalid options: ${withoutStorageLevel.mkString(", ")}")
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
index 3dde20ac44e7..ef8e4605f472 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Utils.scala
@@ -106,7 +106,7 @@ private[sql] object DataSourceV2Utils extends Logging {
     val optionsWithPath = getOptionsWithPaths(extraOptions, paths: _*)
 
-    val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+    val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
       optionsWithPath.originalMap
     val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
     val (table, catalog, ident) = provider match {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
index 03c6589f8dcc..fe75b17af025 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/FileDataSourceV2.scala
@@ -58,7 +58,7 @@ trait FileDataSourceV2 extends TableProvider with DataSourceRegister {
   }
 
   protected def getOptionsWithoutPaths(map: CaseInsensitiveStringMap): CaseInsensitiveStringMap = {
-    val withoutPath = map.asCaseSensitiveMap().asScala.view.filterKeys { k =>
+    val withoutPath = map.asCaseSensitiveMap().asScala.filter { case (k, _) =>
       !k.equalsIgnoreCase("path") && !k.equalsIgnoreCase("paths")
     }
     new CaseInsensitiveStringMap(withoutPath.toMap.asJava)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
index facfa5674726..102214e36c91 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/ShowCreateTableExec.scala
@@ -50,10 +50,10 @@ case class ShowCreateTableExec(
     showTableDataColumns(table, builder)
     showTableUsing(table, builder)
 
-    val tableOptions = table.properties.asScala.view
-      .filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
-        case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
-      }.toMap
+    val tableOptions = table.properties.asScala
+      .filter { case (k, _) => k.startsWith(TableCatalog.OPTION_PREFIX) }.map {
+        case (k, v) => k.drop(TableCatalog.OPTION_PREFIX.length) -> v
+      }.toMap
     showTableOptions(builder, tableOptions)
     showTablePartitioning(table, builder)
     showTableComment(table, builder)
@@ -132,10 +132,12 @@
       builder: StringBuilder,
       tableOptions: Map[String, String]): Unit = {
 
-    val showProps = table.properties.asScala.view
-      .filterKeys(key => !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key)
-        && !key.startsWith(TableCatalog.OPTION_PREFIX)
-        && !tableOptions.contains(key))
+    val showProps = table.properties.asScala
+      .filter { case (key, _) =>
+        !CatalogV2Util.TABLE_RESERVED_PROPERTIES.contains(key) &&
+          !key.startsWith(TableCatalog.OPTION_PREFIX) &&
+          !tableOptions.contains(key)
+      }
     if (showProps.nonEmpty) {
       val props = conf.redactOptions(showProps.toMap).toSeq.sortBy(_._1).map {
         case (key, value) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index 6cd7ec403be3..5a8f2e2d8a40 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -154,9 +154,9 @@ class V2SessionCatalog(catalog: SessionCatalog)
   }
 
   private def toOptions(properties: Map[String, String]): Map[String, String] = {
-    properties.view.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
+    properties.filter { case (k, _) => k.startsWith(TableCatalog.OPTION_PREFIX) }.map {
       case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
-    }.toMap
+    }
   }
 
   override def alterTable(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
index e6d9b6f8d96a..5e2b6afee68e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDBFileManager.scala
@@ -459,10 +459,10 @@ class RocksDBFileManager(
     // Get the immutable files used in previous versions, as some of those uploaded files can be
     // reused for this version
     logInfo(s"Saving RocksDB files to DFS for $version")
-    val prevFilesToSizes = versionToRocksDBFiles.asScala.view.filterKeys(_ < version)
+    val prevFilesToSizes = versionToRocksDBFiles.asScala.filter { case (k, _) => k < version }
       .values.flatten.map { f =>
-      f.localFileName -> f
-    }.toMap
+        f.localFileName -> f
+      }.toMap
 
     var bytesCopied = 0L
     var filesCopied = 0L
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
index 2a3028155a24..cbec837762b7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/ExecutionPage.scala
@@ -85,9 +85,9 @@ class ExecutionPage(parent: SQLTab) extends WebUIPage("execution") with Logging
       summary ++
         planVisualization(request, metrics, graph) ++
         physicalPlanDescription(executionUIData.physicalPlanDescription) ++
-        modifiedConfigs(configs.view.filterKeys(!_.startsWith(pandasOnSparkConfPrefix)).toMap) ++
+        modifiedConfigs(configs.filter { case (k, _) => !k.startsWith(pandasOnSparkConfPrefix) }) ++
         modifiedPandasOnSparkConfigs(
-          configs.view.filterKeys(_.startsWith(pandasOnSparkConfPrefix)).toMap)
+          configs.filter { case (k, _) => k.startsWith(pandasOnSparkConfPrefix) })
     }.getOrElse {
       <div>No information to display for query {executionId}</div>
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
index 905c96ff4cbb..1a69678c2f54 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamReader.scala
@@ -175,7 +175,7 @@ final class DataStreamReader private[sql](sparkSession: SparkSession) extends Lo
       case provider: TableProvider if !provider.isInstanceOf[FileDataSourceV2] =>
         val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
           source = provider, conf = sparkSession.sessionState.conf)
-        val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+        val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
           optionsWithPath.originalMap
         val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
         val table = DataSourceV2Utils.getTableFromProvider(provider, dsOptions, userSpecifiedSchema)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
index 59bc3f5d08a4..95aa2f8c7a4e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/streaming/DataStreamWriter.scala
@@ -385,7 +385,7 @@ final class DataStreamWriter[T] private[sql](ds: Dataset[T]) {
       val provider = cls.getConstructor().newInstance().asInstanceOf[TableProvider]
       val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
         source = provider, conf = df.sparkSession.sessionState.conf)
-      val finalOptions = sessionOptions.view.filterKeys(!optionsWithPath.contains(_)).toMap ++
+      val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
         optionsWithPath.originalMap
       val dsOptions = new CaseInsensitiveStringMap(finalOptions.asJava)
       // If the source accepts external table metadata, here we pass the schema of input query
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 32526f1d18f5..c5adb8e27c4c 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -870,7 +870,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
         locationUri = tableLocation.map(CatalogUtils.stringToURI(_)))
     }
     val storageWithoutHiveGeneratedProperties = storageWithLocation.copy(properties =
-      storageWithLocation.properties.view.filterKeys(!HIVE_GENERATED_STORAGE_PROPERTIES(_)).toMap)
+      storageWithLocation.properties.filter { case (k, _) => !HIVE_GENERATED_STORAGE_PROPERTIES(k) }
+    )
     val partitionProvider = table.properties.get(TABLE_PARTITION_PROVIDER)
 
     val schemaFromTableProps =
@@ -885,7 +886,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       partitionColumnNames = partColumnNames,
       bucketSpec = getBucketSpecFromTableProperties(table),
       tracksPartitionsInCatalog = partitionProvider == Some(TABLE_PARTITION_PROVIDER_CATALOG),
-      properties = table.properties.view.filterKeys(!HIVE_GENERATED_TABLE_PROPERTIES(_)).toMap)
+      properties = table.properties.filter { case (k, _) => !HIVE_GENERATED_TABLE_PROPERTIES(k) })
   }
 
   override def tableExists(db: String, table: String): Boolean = withClient {
@@ -1160,15 +1161,15 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       properties: Map[String, String],
       table: String): Option[CatalogStatistics] = {
 
-    val statsProps = properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    val statsProps = properties.filter { case (k, _) => k.startsWith(STATISTICS_PREFIX) }
 
     if (statsProps.isEmpty) {
       None
     } else {
       val colStats = new mutable.HashMap[String, CatalogColumnStat]
       val colStatsProps =
-        properties.view.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+        properties.filter { case (k, _) => k.startsWith(STATISTICS_COL_STATS_PREFIX) }.map {
           case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
-        }.toMap
+        }
 
       // Find all the column names by matching the KEY_VERSION properties for them.
       colStatsProps.keys.filter {
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
index e7d03b82274c..c9446b378297 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala
@@ -133,12 +133,12 @@ private[hive] class HiveMetastoreCatalog(sparkSession: SparkSession) extends Log
     // Consider table and storage properties. For properties existing in both sides, storage
     // properties will supersede table properties.
     if (serde.contains("parquet")) {
-      val options = relation.tableMeta.properties.view.filterKeys(isParquetProperty).toMap ++
+      val options = relation.tableMeta.properties.filter { case (k, _) => isParquetProperty(k) } ++
         relation.tableMeta.storage.properties + (ParquetOptions.MERGE_SCHEMA ->
           SQLConf.get.getConf(HiveUtils.CONVERT_METASTORE_PARQUET_WITH_SCHEMA_MERGING).toString)
       convertToLogicalRelation(relation, options, classOf[ParquetFileFormat], "parquet", isWrite)
     } else {
-      val options = relation.tableMeta.properties.view.filterKeys(isOrcProperty).toMap ++
+      val options = relation.tableMeta.properties.filter { case (k, _) => isOrcProperty(k) } ++
         relation.tableMeta.storage.properties
       if (SQLConf.get.getConf(SQLConf.ORC_IMPLEMENTATION) == "native") {
         convertToLogicalRelation(
@@ -377,8 +377,7 @@ private[hive] object HiveMetastoreCatalog {
     // Find any nullable fields in metastore schema that are missing from the inferred schema.
     val metastoreFields = metastoreSchema.map(f => f.name.toLowerCase -> f).toMap
     val missingNullables = metastoreFields
-      .view
-      .filterKeys(!inferredSchema.map(_.name.toLowerCase).contains(_))
+      .filter { case (k, _) => !inferredSchema.map(_.name.toLowerCase).contains(k) }
       .values
       .filter(_.nullable)
     // Merge missing nullable fields to inferred schema and build a case-insensitive field map.
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
index 44f0e43d11e5..0f2eee6798ea 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/execution/HiveOptions.scala
@@ -85,9 +85,9 @@ class HiveOptions(@transient private val parameters: CaseInsensitiveMap[String])
       s"line delimiter, but given: $lineDelim.")
   }
 
-  def serdeProperties: Map[String, String] = parameters.view.filterKeys {
-    k => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
-  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }.toMap
+  def serdeProperties: Map[String, String] = parameters.filter {
+    case (k, _) => !lowerCasedOptionNames.contains(k.toLowerCase(Locale.ROOT))
+  }.map { case (k, v) => delimiterOptions.getOrElse(k, k) -> v }
 }
 
 object HiveOptions {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
index 8ff209d1580b..1eb3f6f3c9cb 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/HiveSchemaInferenceSuite.scala
@@ -124,7 +124,7 @@ class HiveSchemaInferenceSuite
     // properties out).
     assert(!externalCatalog.getTable(DATABASE, TEST_TABLE_NAME).schemaPreservesCase)
     val rawTable = client.getTable(DATABASE, TEST_TABLE_NAME)
-    assert(rawTable.properties.view.filterKeys(_.startsWith(DATASOURCE_SCHEMA_PREFIX)).isEmpty)
+    assert(!rawTable.properties.exists { case (k, _) => k.startsWith(DATASOURCE_SCHEMA_PREFIX) })
 
     // Add partition records (if specified)
     if (!partitionCols.isEmpty) {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 21a115486298..5502414629c0 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -970,7 +970,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
    */
   private def getStatsProperties(tableName: String): Map[String, String] = {
     val hTable = hiveClient.getTable(spark.sessionState.catalog.getCurrentDatabase, tableName)
-    hTable.properties.view.filterKeys(_.startsWith(STATISTICS_PREFIX)).toMap
+    hTable.properties.filter { case (k, _) => k.startsWith(STATISTICS_PREFIX) }
   }
 
   test("change stats after insert command for hive table") {
@@ -1209,7 +1209,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
       val table = hiveClient.getTable("default", tableName)
       val props =
-        table.properties.view.filterKeys(_.startsWith("spark.sql.statistics.colStats")).toMap
+        table.properties.filter { case (k, _) => k.startsWith("spark.sql.statistics.colStats") }
       assert(props == expected)
     }
 
@@ -1278,11 +1278,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS cint, ctimestamp")
       val table = hiveClient.getTable("default", tableName)
       val intHistogramProps = table.properties
-        .view.filterKeys(_.startsWith("spark.sql.statistics.colStats.cint.histogram"))
+        .filter { case (k, _) => k.startsWith("spark.sql.statistics.colStats.cint.histogram") }
       assert(intHistogramProps.size == 1)
 
-      val tsHistogramProps = table.properties
-        .view.filterKeys(_.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram"))
+      val tsHistogramProps = table.properties.filter {
+        case (k, _) => k.startsWith("spark.sql.statistics.colStats.ctimestamp.histogram") }
       assert(tsHistogramProps.size == 1)
 
       // Validate histogram after deserialization.
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
index 2f5d1fcbb540..cf2098641ad1 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/HiveDDLSuite.scala
@@ -133,7 +133,7 @@ class HiveDDLSuite
       createTime = 0L,
       lastAccessTime = 0L,
       owner = "",
-      properties = table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      properties = table.properties.filter { case (k, _) => !nondeterministicProps.contains(k) },
       // View texts are checked separately
       viewText = None
     )
@@ -1089,7 +1089,7 @@ class HiveDDLSuite
       expectedSerdeProps.map { case (k, v) => s"'$k'='$v'" }.mkString(", ")
     val oldPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
     assert(oldPart.storage.serde != Some(expectedSerde), "bad test: serde was already set")
-    assert(oldPart.storage.properties.view.filterKeys(expectedSerdeProps.contains) !=
+    assert(oldPart.storage.properties.filter { case (k, _) => expectedSerdeProps.contains(k) } !=
       expectedSerdeProps, "bad test: serde properties were already set")
     sql(s"""ALTER TABLE boxes PARTITION (width=4)
       | SET SERDE '$expectedSerde'
@@ -1097,7 +1097,7 @@ class HiveDDLSuite
      |""".stripMargin)
     val newPart = catalog.getPartition(TableIdentifier("boxes"), Map("width" -> "4"))
     assert(newPart.storage.serde == Some(expectedSerde))
-    assert(newPart.storage.properties.view.filterKeys(expectedSerdeProps.contains).toMap ==
+    assert(newPart.storage.properties.filter { case (k, _) => expectedSerdeProps.contains(k) } ==
       expectedSerdeProps)
   }
 
@@ -1699,7 +1699,7 @@ class HiveDDLSuite
         "minFileSize"
       )
       assert(
-        targetTable.properties.view.filterKeys(!metastoreGeneratedProperties.contains(_)).isEmpty,
+        targetTable.properties.forall { case (k, _) => metastoreGeneratedProperties.contains(k) },
        "the table properties of source tables should not be copied in the created table")
 
      provider match {
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
index c7e19e943e6b..3098015dc7da 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/command/ShowCreateTableSuite.scala
@@ -246,7 +246,7 @@ class ShowCreateTableSuite extends v1.ShowCreateTableSuiteBase with CommandSuite
     table.copy(
       createTime = 0L,
       lastAccessTime = 0L,
-      properties = table.properties.view.filterKeys(!nondeterministicProps.contains(_)).toMap,
+      properties = table.properties.filter { case (k, _) => !nondeterministicProps.contains(k) },
       stats = None,
      ignoredProperties = Map.empty,
      storage = table.storage.copy(properties = Map.empty),

---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org