This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 938b7f58051 [SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`
938b7f58051 is described below

commit 938b7f580519e3da64004185f7083ae63cf99bc0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Dec 2 21:39:14 2023 -0800

    [SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`
    
    ### What changes were proposed in this pull request?
    This PR simplifies the `s.c.MapOps.view.mapValues` pattern using the following approach:
    
    - For the `s.c.immutable.MapOps` type, replace it with the `s.c.immutable.MapOps#transform` function.
    
    ```scala
    def transform[W](f: (K, V) => W): CC[K, W] = map { case (k, v) => (k, f(k, v)) }
    ```
    
    Like the case in `CountMinSketchSuite`:
    
    https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala#L59
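
    A minimal sketch of the same rewrite on a plain immutable `Map` (illustrative only, not the suite's code; the names are made up):

    ```scala
    val grouped: Map[String, Seq[Int]] =
      Seq("a" -> 1, "a" -> 2, "b" -> 3).groupBy(_._1).transform((_, v) => v.map(_._2))
    // previously: Seq(...).groupBy(_._1).view.mapValues(_.map(_._2)).toMap
    ```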
    
    - For the `s.c.MapOps` type, since the `transform` function does not exist for this type, replace it directly with the `map` function.
    
    ```scala
    def map[K2, V2](f: ((K, V)) => (K2, V2)): CC[K2, V2] = mapFactory.from(new View.Map(this, f))
    ```
    
    Like the case in `KafkaTestUtils`:
    
    https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L381
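
    An illustrative sketch (hypothetical names, not the test's code) for a generic `scala.collection.Map`, where the key is threaded through `map` by hand:

    ```scala
    val partitions: collection.Map[String, Seq[Int]] = collection.Map("t1" -> Seq(0, 1), "t2" -> Seq(0))
    val sizes: collection.Map[String, Int] = partitions.map { case (k, v) => (k, v.size) }
    // previously: partitions.view.mapValues(_.size).toMap
    ```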
    
    - For the `s.c.mutable.MapOps` type, the `transform` function has also been deprecated. The signatures of `transform` and its replacement, `mapValuesInPlace`, are as follows:
    
    ```scala
      @deprecated("Use mapValuesInPlace instead", "2.13.0")
      @inline final def transform(f: (K, V) => V): this.type = mapValuesInPlace(f)

      def mapValuesInPlace(f: (K, V) => V): this.type = {...}
    ```
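
    For example (a minimal sketch, not from the patch), the in-place variant keeps the value type fixed:

    ```scala
    import scala.collection.mutable

    val counters = mutable.Map("a" -> 1, "b" -> 2)
    counters.mapValuesInPlace((_, v) => v + 1)         // fine: Int => Int
    // counters.mapValuesInPlace((_, v) => v.toString) // does not compile: the value type cannot change
    ```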
    
    The target value type of this function is `V`, whereas the target value type of `s.c.immutable.MapOps#transform` is `W`, so `mapValuesInPlace` cannot express a value-type-changing rewrite. This scenario is therefore handled as two sub-cases:
    
    1. If the `mutable.Map` being used eventually needs to be converted to an `immutable.Map`, convert it to an `immutable.Map` first and then apply the `transform` function, as in `SparkConnectPlanner` (sketched after the link):
    
    https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L292
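
    A minimal sketch of this sub-case (hypothetical names, not the planner's code):

    ```scala
    import scala.collection.mutable

    val args = mutable.Map("limit" -> 10, "offset" -> 20)
    val rendered: Map[String, String] = args.toMap.transform((_, v) => v.toString)
    // previously: args.view.mapValues(_.toString).toMap
    ```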
    
    2. If the `mutable.Map` being used does not need to be converted to an `immutable.Map` in the end, use the `map` function from `scala.collection.MapOps` directly, as in `SparkSession` (sketched after the link):
    
    https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala#L313
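
    A minimal sketch of this sub-case (hypothetical names, not the `SparkSession` code); `map` lets the value type change while staying on the mutable collection:

    ```scala
    import scala.collection.mutable

    val params = mutable.Map("a" -> 1, "b" -> 2)
    val asStrings: mutable.Map[String, String] = params.map { case (k, v) => (k, v.toString) }
    // previously: params.view.mapValues(_.toString) followed by a conversion
    ```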
    
    In addition, there is a special case in `PythonWorkerFactory`:
    
    https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L381
    
    In this case, each `Process` in `values` only needs to be destroyed and no value is returned, so it has been rewritten with `.values.foreach`.
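
    A minimal sketch of that shape (illustrative only, not the factory's code): only the side effect is needed, so no new collection is built:

    ```scala
    import scala.collection.mutable

    val buffers = mutable.Map("a" -> new StringBuilder("x"), "b" -> new StringBuilder("y"))
    buffers.values.foreach(_.clear())
    // previously: buffers.view.mapValues(_.clear()) built a lazy view whose result was discarded
    ```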
    
    ### Why are the changes needed?
    The `s.c.MapOps.view.mapValues` coding pattern is verbose; it can be simplified with other functions.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    Pass GitHub Actions
    
    ### Was this patch authored or co-authored using generative AI tooling?
    No
    
    Closes #44122 from LuciferYang/SPARK-46212.
    
    Authored-by: yangjie01 <yangji...@baidu.com>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/util/sketch/CountMinSketchSuite.scala    |  2 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala      |  3 +--
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../spark/sql/ClientDataFrameStatSuite.scala       |  2 +-
 .../org/apache/spark/sql/connect/dsl/package.scala |  2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala  | 15 ++++++-----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  |  2 +-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala |  3 ++-
 .../streaming/kafka010/ConsumerStrategy.scala      |  9 ++++---
 .../kafka010/DirectKafkaInputDStream.scala         |  2 +-
 .../kafka010/DirectKafkaStreamSuite.scala          |  4 +--
 .../spark/streaming/kafka010/KafkaTestUtils.scala  |  2 +-
 .../spark/streaming/kinesis/KinesisTestUtils.scala |  2 +-
 .../kinesis/KPLBasedKinesisTestUtils.scala         |  2 +-
 .../kinesis/KinesisBackedBlockRDDSuite.scala       |  4 +--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala   |  3 +--
 .../org/apache/spark/api/java/JavaPairRDD.scala    |  6 +++--
 .../apache/spark/api/java/JavaSparkContext.scala   |  2 +-
 .../spark/api/python/PythonWorkerFactory.scala     |  2 +-
 .../apache/spark/scheduler/InputFormatInfo.scala   |  2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala |  2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala    |  2 +-
 ...plicationEnvironmentInfoWrapperSerializer.scala | 10 ++++----
 .../ExecutorSummaryWrapperSerializer.scala         |  2 +-
 .../status/protobuf/JobDataWrapperSerializer.scala |  2 +-
 .../protobuf/StageDataWrapperSerializer.scala      |  9 ++++---
 .../org/apache/spark/SparkThrowableSuite.scala     |  2 +-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala   |  5 ++--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala |  3 +--
 .../scheduler/ExecutorResourceInfoSuite.scala      |  3 +--
 .../BlockManagerDecommissionIntegrationSuite.scala |  2 +-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala |  2 +-
 .../util/collection/ExternalSorterSuite.scala      |  2 +-
 .../apache/spark/examples/DFSReadWriteTest.scala   |  3 +--
 .../apache/spark/examples/MiniReadWriteTest.scala  |  3 +--
 .../mllib/PowerIterationClusteringExample.scala    |  2 +-
 .../spark/graphx/lib/ShortestPathsSuite.scala      |  2 +-
 .../spark/ml/evaluation/ClusteringMetrics.scala    |  7 +++--
 .../apache/spark/ml/feature/VectorIndexer.scala    |  3 ++-
 .../org/apache/spark/ml/feature/Word2Vec.scala     |  2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala   |  4 +--
 .../spark/mllib/clustering/BisectingKMeans.scala   |  2 +-
 .../mllib/linalg/distributed/BlockMatrix.scala     |  4 +--
 .../apache/spark/mllib/stat/test/ChiSqTest.scala   |  3 +--
 .../apache/spark/ml/recommendation/ALSSuite.scala  |  8 +++---
 .../apache/spark/mllib/feature/Word2VecSuite.scala | 12 ++++-----
 .../org/apache/spark/sql/types/Metadata.scala      |  2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala     |  5 ++--
 .../spark/sql/catalyst/analysis/unresolved.scala   |  2 +-
 .../catalyst/catalog/ExternalCatalogUtils.scala    |  2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala      |  2 +-
 .../spark/sql/catalyst/expressions/package.scala   |  2 +-
 .../catalyst/optimizer/NestedColumnAliasing.scala  |  2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala   |  3 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala     |  8 +++---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 11 ++++----
 .../spark/sql/catalyst/util/ToNumberParser.scala   |  2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala  |  5 ++--
 .../scala/org/apache/spark/sql/SparkSession.scala  |  2 +-
 .../command/AnalyzePartitionCommand.scala          |  2 +-
 .../execution/datasources/PartitioningUtils.scala  |  2 +-
 .../execution/datasources/orc/OrcFiltersBase.scala |  3 +--
 .../datasources/parquet/ParquetFilters.scala       |  5 ++--
 .../execution/exchange/EnsureRequirements.scala    |  3 +--
 .../sql/execution/streaming/ProgressReporter.scala | 14 +++++-----
 .../sql/execution/streaming/state/RocksDB.scala    |  4 +--
 .../execution/streaming/statefulOperators.scala    |  2 +-
 .../spark/sql/execution/ui/AllExecutionsPage.scala |  6 ++---
 .../org/apache/spark/sql/DataFrameStatSuite.scala  |  2 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala   |  2 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala |  2 +-
 .../spark/sql/execution/SQLViewTestSuite.scala     |  2 +-
 .../command/AlterTableDropPartitionSuiteBase.scala |  3 ++-
 .../datasources/parquet/ParquetIOSuite.scala       |  2 +-
 .../sql/execution/metric/SQLMetricsTestUtils.scala |  8 +++---
 .../execution/ui/SQLAppStatusListenerSuite.scala   | 30 +++++++++++-----------
 .../StreamingQueryStatusAndProgressSuite.scala     |  6 ++---
 .../sql/streaming/continuous/ContinuousSuite.scala |  2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala       |  3 ++-
 .../api/java/JavaStreamingListenerWrapper.scala    |  4 +--
 .../scheduler/ReceiverSchedulingPolicy.scala       |  2 +-
 .../streaming/scheduler/ReceiverTracker.scala      |  6 ++---
 .../apache/spark/streaming/ui/BatchUIData.scala    |  3 ++-
 .../ui/StreamingJobProgressListener.scala          |  4 +--
 84 files changed, 170 insertions(+), 172 deletions(-)

diff --git 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
index 689452caa32..3d89eaf4daf 100644
--- 
a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
+++ 
b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala
@@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { // 
scalastyle:ignore funsuite
 
       val exactFreq = {
         val sampledItems = sampledItemIndices.map(allItems)
-        sampledItems.groupBy(identity).view.mapValues(_.length.toLong)
+        sampledItems.groupBy(identity).transform((_, v) => v.length.toLong)
       }
 
       val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed)
diff --git 
a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala 
b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
index e235c13d413..27a5b918fc9 100644
--- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
+++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala
@@ -244,8 +244,7 @@ private[sql] object AvroUtils extends Logging {
     private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray
     private[this] val fieldMap = avroSchema.getFields.asScala
       .groupBy(_.name.toLowerCase(Locale.ROOT))
-      .view
-      .mapValues(_.toSeq) // toSeq needed for scala 2.13
+      .transform((_, v) => v.toSeq) // toSeq needed for scala 2.13
 
     /** The fields which have matching equivalents in both Avro and Catalyst 
schemas. */
     val matchedFields: Seq[AvroMatchedField] = 
catalystSchema.zipWithIndex.flatMap {
diff --git 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
index ca692d2d4f8..daa172e215a 100644
--- 
a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ 
b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -310,7 +310,7 @@ class SparkSession private[sql] (
           proto.SqlCommand
             .newBuilder()
             .setSql(sqlText)
-            
.putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava)))
+            .putAllNamedArguments(args.asScala.map { case (k, v) => (k, 
lit(v).expr) }.asJava)))
       val plan = proto.Plan.newBuilder().setCommand(cmd)
       val responseIter = client.execute(plan.build())
 
diff --git 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
index 747ca45d10d..d0a89f672f7 100644
--- 
a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
+++ 
b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala
@@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession {
     val columnNames = crosstab.schema.fieldNames
     assert(columnNames(0) === "a_b")
     // reduce by key
-    val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
+    val expected = data.map(t => (t, 1)).groupBy(_._1).transform((_, v) => 
v.length)
     val rows = crosstab.collect()
     rows.foreach { row =>
       val i = row.getString(0).toInt
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
index ccdae26bd35..6aadb6c34b7 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala
@@ -362,7 +362,7 @@ package object dsl {
       }
 
       def fillValueMap(valueMap: Map[String, Any]): Relation = {
-        val (cols, values) = 
valueMap.view.mapValues(toLiteralProto).toSeq.unzip
+        val (cols, values) = valueMap.transform((_, v) => 
toLiteralProto(v)).toSeq.unzip
         Relation
           .newBuilder()
           .setFillNa(
diff --git 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
index 4f8b7c00888..e2b4a3c782e 100644
--- 
a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
+++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala
@@ -289,11 +289,13 @@ class SparkConnectPlanner(
     if (!namedArguments.isEmpty) {
       NameParameterizedQuery(
         parsedPlan,
-        namedArguments.asScala.view.mapValues(transformExpression).toMap)
+        namedArguments.asScala.toMap.transform((_, v) => 
transformExpression(v)))
     } else if (!posArguments.isEmpty) {
       PosParameterizedQuery(parsedPlan, 
posArguments.asScala.map(transformExpression).toSeq)
     } else if (!args.isEmpty) {
-      NameParameterizedQuery(parsedPlan, 
args.asScala.view.mapValues(transformLiteral).toMap)
+      NameParameterizedQuery(
+        parsedPlan,
+        args.asScala.toMap.transform((_, v) => transformLiteral(v)))
     } else if (!posArgs.isEmpty) {
       PosParameterizedQuery(parsedPlan, 
posArgs.asScala.map(transformLiteral).toSeq)
     } else {
@@ -2535,7 +2537,7 @@ class SparkConnectPlanner(
     val df = if (!namedArguments.isEmpty) {
       session.sql(
         getSqlCommand.getSql,
-        namedArguments.asScala.view.mapValues(e => 
Column(transformExpression(e))).toMap,
+        namedArguments.asScala.toMap.transform((_, e) => 
Column(transformExpression(e))),
         tracker)
     } else if (!posArguments.isEmpty) {
       session.sql(
@@ -2545,7 +2547,7 @@ class SparkConnectPlanner(
     } else if (!args.isEmpty) {
       session.sql(
         getSqlCommand.getSql,
-        args.asScala.view.mapValues(transformLiteral).toMap,
+        args.asScala.toMap.transform((_, v) => transformLiteral(v)),
         tracker)
     } else if (!posArgs.isEmpty) {
       session.sql(getSqlCommand.getSql, 
posArgs.asScala.map(transformLiteral).toArray, tracker)
@@ -3298,14 +3300,13 @@ class SparkConnectPlanner(
           proto.GetResourcesCommandResult
             .newBuilder()
             .putAllResources(
-              session.sparkContext.resources.view
-                .mapValues(resource =>
+              session.sparkContext.resources.toMap
+                .transform((_, resource) =>
                   proto.ResourceInformation
                     .newBuilder()
                     .setName(resource.name)
                     
.addAllAddresses(resource.addresses.toImmutableArraySeq.asJava)
                     .build())
-                .toMap
                 .asJava)
             .build())
         .build())
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 27904ebb3c5..02e4e909734 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -2294,7 +2294,7 @@ abstract class KafkaSourceSuiteBase extends 
KafkaSourceTest {
       Execute { q =>
         // wait to reach the last offset in every partition
         q.awaitOffset(0,
-          KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap),
+          KafkaSourceOffset(partitionOffsets.transform((_, _) => 3L)),
           streamingTimeout.toMillis)
       },
       CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22),
diff --git 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
index d7e04962925..1fa1dda9faf 100644
--- 
a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
@@ -378,7 +378,8 @@ class KafkaTestUtils(
   }
 
   def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = {
-    
zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq
+    zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster())
+      .map { case (k, v) => (k, v.size) }.toSeq
   }
 
   /** Create a Kafka topic and wait until it is propagated to the whole 
cluster */
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
index 1dd66675b91..a0b0e92666e 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala
@@ -241,7 +241,8 @@ object ConsumerStrategies {
     new Subscribe[K, V](
       new ju.ArrayList(topics.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, jl.Long](
+        offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava))
   }
 
   /**
@@ -320,7 +321,8 @@ object ConsumerStrategies {
     new SubscribePattern[K, V](
       pattern,
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, jl.Long](
+        offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava))
   }
 
   /**
@@ -404,7 +406,8 @@ object ConsumerStrategies {
     new Assign[K, V](
       new ju.ArrayList(topicPartitions.asJavaCollection),
       new ju.HashMap[String, Object](kafkaParams.asJava),
-      new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava))
+      new ju.HashMap[TopicPartition, jl.Long](
+        offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava))
   }
 
   /**
diff --git 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
index c412486ce19..d3795a194dd 100644
--- 
a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
+++ 
b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala
@@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V](
   def consumer(): Consumer[K, V] = this.synchronized {
     if (null == kc) {
       kc = consumerStrategy.onStart(
-        currentOffsets.view.mapValues(l => 
java.lang.Long.valueOf(l)).toMap.asJava)
+        currentOffsets.transform((_, l) => java.lang.Long.valueOf(l)).asJava)
     }
     kc
   }
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
index 33e6ccc7443..74c5e070b7a 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala
@@ -726,8 +726,8 @@ class DirectKafkaStreamSuite
   /** Get the generated offset ranges from the DirectKafkaStream */
   private def getOffsetRanges[K, V](
       kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, 
Array[OffsetRange])] = {
-    kafkaStream.generatedRDDs.view.mapValues { rdd =>
-      rdd.asInstanceOf[HasOffsetRanges].offsetRanges
+    kafkaStream.generatedRDDs.map { case (t, rdd) =>
+      (t, rdd.asInstanceOf[HasOffsetRanges].offsetRanges)
     }.toSeq.sortBy { _._1 }
   }
 
diff --git 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
index 66c253172e7..46a6fbcf2c3 100644
--- 
a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
+++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala
@@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging {
 
   /** Java-friendly function for sending messages to the Kafka broker */
   def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = {
-    sendMessages(topic, 
Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*))
+    sendMessages(topic, messageToFreq.asScala.toMap.transform((_, v) => 
v.intValue()))
   }
 
   /** Send the messages to the Kafka broker */
diff --git 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
index 10d24df7b61..d0eee9c83c2 100644
--- 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
+++ 
b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala
@@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator(
       sentSeqNumbers += ((num, seqNumber))
     }
 
-    shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
+    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
   }
 }
diff --git 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
index 2799965a8fc..58b64ba11d3 100644
--- 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
+++ 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala
@@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) 
extends KinesisDataG
       Futures.addCallback(future, kinesisCallBack, 
ThreadUtils.sameThreadExecutorService())
     }
     producer.flushSync()
-    shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap
+    shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq)
   }
 }
diff --git 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
index 7c6cf7c3a22..2088a9deafe 100644
--- 
a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
+++ 
b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala
@@ -48,8 +48,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: 
Boolean)
       require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to 
multiple shards")
 
       shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq
-      shardIdToData = 
shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap
-      shardIdToSeqNumbers = 
shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap
+      shardIdToData = shardIdToDataAndSeqNumbers.transform((_, v) => 
v.map(_._1))
+      shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.transform((_, v) => 
v.map(_._2))
       shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) =>
         val seqNumRange = SequenceNumberRange(
           testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, 
seqNumbers.size)
diff --git 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
index 6c18a8863af..fee1bcdc967 100644
--- 
a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
+++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala
@@ -75,8 +75,7 @@ private[sql] object ProtobufUtils extends Logging {
     private[this] val protoFieldArray = descriptor.getFields.asScala.toArray
     private[this] val fieldMap = descriptor.getFields.asScala
       .groupBy(_.getName.toLowerCase(Locale.ROOT))
-      .view
-      .mapValues(_.toSeq) // toSeq needed for scala 2.13
+      .transform((_, v) => v.toSeq) // toSeq needed for scala 2.13
 
     /** The fields which have matching equivalents in both Protobuf and 
Catalyst schemas. */
     val matchedFields: Seq[ProtoMatchedField] = 
catalystSchema.zipWithIndex.flatMap {
diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
index aa8e1b1520e..f16c0be75c6 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala
@@ -147,7 +147,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKey(
       withReplacement,
-      fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala 
Double; toMap to serialize
+      fractions.asScala.toMap
+        .transform((_, v) => v.toDouble), // map to Scala Double; toMap to 
serialize
       seed))
 
   /**
@@ -179,7 +180,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
       seed: Long): JavaPairRDD[K, V] =
     new JavaPairRDD[K, V](rdd.sampleByKeyExact(
       withReplacement,
-      fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala 
Double; toMap to serialize
+      fractions.asScala.toMap
+        .transform((_, v) => v.toDouble), // map to Scala Double; toMap to 
serialize
       seed))
 
   /**
diff --git 
a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala 
b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
index 1c6c44e773d..08c1134e30d 100644
--- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
+++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala
@@ -784,7 +784,7 @@ class JavaSparkContext(val sc: SparkContext) extends 
Closeable {
    * @note This does not necessarily mean the caching or computation was 
successful.
    */
   def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = {
-    sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap
+    sc.getPersistentRDDs.toMap.transform((_, s) => JavaRDD.fromRDD(s))
       .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]]
   }
 
diff --git 
a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala 
b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
index f6dbeadd96f..e385bc685bf 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala
@@ -378,7 +378,7 @@ private[spark] class PythonWorkerFactory(
         daemon = null
         daemonPort = 0
       } else {
-        simpleWorkers.view.mapValues(_.destroy())
+        simpleWorkers.values.foreach(_.destroy())
       }
     }
   }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala 
b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
index e04c5b2de7d..7f56dc251db 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala
@@ -178,6 +178,6 @@ object InputFormatInfo {
       }
     }
 
-    nodeToSplit.view.mapValues(_.toSet).toMap
+    nodeToSplit.toMap.transform((_, v) => v.toSet)
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala 
b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
index 15ae2fef221..454e7ed3ce6 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl(
       .build[String, ExecutorDecommissionState]()
 
   def runningTasksByExecutors: Map[String, Int] = synchronized {
-    executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap
+    executorIdToRunningTaskIds.toMap.transform((_, v) => v.size)
   }
 
   // The set of executors we have on each host; this is used to compute 
hostsAlive, which
diff --git 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
index e8519b4bb4d..4f1503f1d8c 100644
--- 
a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
+++ 
b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
@@ -739,7 +739,7 @@ class CoarseGrainedSchedulerBackend(scheduler: 
TaskSchedulerImpl, val rpcEnv: Rp
   }
 
   def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized {
-    executorDataMap.view.mapValues(v => v.registrationTs).toMap
+    executorDataMap.toMap.transform((_, v) => v.registrationTs)
   }
 
   override def isExecutorActive(id: String): Boolean = synchronized {
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
index 4ee864f00e8..0f192a9d67b 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala
@@ -129,11 +129,11 @@ private[protobuf] class 
ApplicationEnvironmentInfoWrapperSerializer
 
     new ResourceProfileInfo(
       id = info.getId,
-      executorResources =
-        
info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest)
-          .toMap,
-      taskResources =
-        
info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap)
+      executorResources = info.getExecutorResourcesMap.asScala.toMap
+        .transform((_, v) => deserializeExecutorResourceRequest(v)),
+      taskResources = info.getTaskResourcesMap.asScala.toMap
+        .transform((_, v) => deserializeTaskResourceRequest(v))
+    )
   }
 
   private def deserializeExecutorResourceRequest(info: 
StoreTypes.ExecutorResourceRequest):
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
index 188187c9b75..3927764eaa3 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala
@@ -138,7 +138,7 @@ private[protobuf] class ExecutorSummaryWrapperSerializer
       peakMemoryMetrics = peakMemoryMetrics,
       attributes = binary.getAttributesMap.asScala.toMap,
       resources =
-        
binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap,
+        binary.getResourcesMap.asScala.toMap.transform((_, v) => 
deserializeResourceInformation(v)),
       resourceProfileId = binary.getResourceProfileId,
       isExcluded = binary.getIsExcluded,
       excludedInStages = 
binary.getExcludedInStagesList.asScala.map(_.toInt).toSet)
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
index 5252b8b8c01..69a97ba6c46 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala
@@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer extends 
ProtobufSerDe[JobDataWr
       numCompletedStages = info.getNumCompletedStages,
       numSkippedStages = info.getNumSkippedStages,
       numFailedStages = info.getNumFailedStages,
-      killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap)
+      killedTasksSummary = 
info.getKillTasksSummaryMap.asScala.toMap.transform((_, v) => v.toInt))
   }
 }
diff --git 
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
 
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
index 4fbaff0327d..d83cff5f23a 100644
--- 
a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
+++ 
b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala
@@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
     new StageDataWrapper(
       info = info,
       jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet,
-      locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap
+      locality = binary.getLocalityMap.asScala.toMap.transform((_, v) => 
v.toLong)
     )
   }
 
@@ -402,8 +402,8 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
         entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap)
     } else None
     val executorSummary = if 
(MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) {
-      Some(binary.getExecutorSummaryMap.asScala.view.mapValues(
-        ExecutorStageSummarySerializer.deserialize).toMap)
+      Some(binary.getExecutorSummaryMap.asScala.toMap
+        .transform((_, v) => ExecutorStageSummarySerializer.deserialize(v)))
     } else None
     val speculationSummary =
       getOptional(binary.hasSpeculationSummary,
@@ -475,7 +475,8 @@ private[protobuf] class StageDataWrapperSerializer extends 
ProtobufSerDe[StageDa
       tasks = tasks,
       executorSummary = executorSummary,
       speculationSummary = speculationSummary,
-      killedTasksSummary = 
binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap,
+      killedTasksSummary =
+        binary.getKilledTasksSummaryMap.asScala.toMap.transform((_, v) => 
v.toInt),
       resourceProfileId = binary.getResourceProfileId,
       peakExecutorMetrics = peakExecutorMetrics,
       taskMetricsDistributions = taskMetricsDistributions,
diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala 
b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
index 0206205c353..c012613c2ee 100644
--- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
+++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala
@@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite {
   }
 
   def checkIfUnique(ss: Seq[Any]): Unit = {
-    val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 
1).keys.toSeq
+    val dups = ss.groupBy(identity).transform((_, v) => v.size).filter(_._2 > 
1).keys.toSeq
     assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}")
   }
 
diff --git 
a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
index e436d988434..c2ada90e7f6 100644
--- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala
@@ -806,8 +806,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with 
SharedSparkContext {
         seed: Long,
         n: Long): Unit = {
       val trials = stratifiedData.countByKey()
-      val expectedSampleSize = 
stratifiedData.countByKey().view.mapValues(count =>
-        math.ceil(count * samplingRate).toInt)
+      val expectedSampleSize = stratifiedData.countByKey().map { case (k, 
count) =>
+        (k, math.ceil(count * samplingRate).toInt)
+      }
       val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
       val sample = if (exact) {
         stratifiedData.sampleByKeyExact(true, fractions, seed)
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 706ebfa9364..55f6b5043fb 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1255,8 +1255,7 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext with Eventually {
       .getPartitions
       .map(coalescedRDD.getPreferredLocations(_).head)
       .groupBy(identity)
-      .view
-      .mapValues(_.length)
+      .transform((_, v) => v.length)
 
     // Make sure the coalesced partitions are distributed fairly evenly 
between the two locations.
     // This should not become flaky since the DefaultPartitionsCoalescer uses 
a fixed seed.
diff --git 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
index 8676efe3140..203e30cac1a 100644
--- 
a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala
@@ -102,8 +102,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite {
       // assert that each address was assigned `slots` times
       info.assignedAddrs
         .groupBy(identity)
-        .view
-        .mapValues(_.size)
+        .transform((_, v) => v.size)
         .foreach(x => assert(x._2 == slots))
 
       addresses.foreach { addr =>
diff --git 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
index 46099254545..ba665600a1c 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala
@@ -300,7 +300,7 @@ class BlockManagerDecommissionIntegrationSuite extends 
SparkFunSuite with LocalS
         val blockLocs = rddUpdates.map { update =>
           (update.blockUpdatedInfo.blockId.name,
             update.blockUpdatedInfo.blockManagerId)}
-        val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size)
+        val blocksToManagers = blockLocs.groupBy(_._1).transform((_, v) => 
v.size)
         assert(blocksToManagers.exists(_._2 > 1),
           s"We should have a block that has been on multiple BMs in rdds:\n 
${rddUpdates} from:\n" +
           s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}")
diff --git 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
index 75a450f4358..02900e14b1f 100644
--- 
a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala
@@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends 
SparkFunSuite with PrivateMethodT
       ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer()
     )
 
-    configureMockTransfer(blocks.view.mapValues(_ => 
createMockManagedBuffer(0)).toMap)
+    configureMockTransfer(blocks.transform((_, _) => 
createMockManagedBuffer(0)))
 
     val iterator = createShuffleBlockIteratorWithDefaults(
       Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0))
diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
index 02cc2bb35af..5dc08396220 100644
--- 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
+++ 
b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala
@@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with 
LocalSparkContext {
     val expected = (0 until 3).map { p =>
       var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k 
% 3 == p }.toSet
       if (withPartialAgg) {
-        v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet
+        v = v.groupBy(_._1).transform((_, s) => s.map(_._2).sum).toSet
       }
       (p, v.toSet)
     }.toSet
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
index 4cad0f16426..03178342b89 100644
--- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala
@@ -89,8 +89,7 @@ object DFSReadWriteTest {
       .flatMap(_.split("\t"))
       .filter(_.nonEmpty)
       .groupBy(w => w)
-      .view
-      .mapValues(_.size)
+      .transform((_, v) => v.size)
       .values
       .sum
   }
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala 
b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
index 9095c9b75af..6e9088a1628 100644
--- a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
+++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala
@@ -81,8 +81,7 @@ object MiniReadWriteTest {
       .flatMap(_.split("\t"))
       .filter(_.nonEmpty)
       .groupBy(w => w)
-      .view
-      .mapValues(_.size)
+      .transform((_, v) => v.size)
       .values
       .sum
   }
diff --git 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
index d26c9a3a056..5b87cb1559a 100644
--- 
a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
+++ 
b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala
@@ -101,7 +101,7 @@ object PowerIterationClusteringExample {
       .setInitializationMode("degree")
       .run(circlesRdd)
 
-    val clusters = 
model.assignments.collect().groupBy(_.cluster).view.mapValues(_.map(_.id))
+    val clusters = 
model.assignments.collect().groupBy(_.cluster).transform((_, v) => v.map(_.id))
     val assignments = clusters.toList.sortBy { case (k, v) => v.length }
     val assignmentsStr = assignments
       .map { case (k, v) =>
diff --git 
a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala 
b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
index 1f158808215..03e2ac7eeb6 100644
--- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
+++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala
@@ -34,7 +34,7 @@ class ShortestPathsSuite extends SparkFunSuite with 
LocalSparkContext {
       val graph = Graph.fromEdgeTuples(edges, 1)
       val landmarks = Seq(1, 4).map(_.toLong)
       val results = ShortestPaths.run(graph, landmarks).vertices.collect().map 
{
-        case (v, spMap) => (v, spMap.view.mapValues(i => i).toMap)
+        case (v, spMap) => (v, spMap.toMap.transform((_, i) => i))
       }
       assert(results.toSet === shortestPaths)
     }
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
index a433f5a6e56..98fbe471f29 100644
--- 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala
@@ -332,12 +332,11 @@ private[evaluation] object SquaredEuclideanSilhouette 
extends Silhouette {
 
     clustersStatsRDD
       .collectAsMap()
-      .view
-      .mapValues {
-        case (featureSum: DenseVector, squaredNormSum: Double, weightSum: 
Double) =>
+      .toMap
+      .transform {
+        case (_, (featureSum: DenseVector, squaredNormSum: Double, weightSum: 
Double)) =>
           SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, 
weightSum)
       }
-      .toMap
   }
 
   /**
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
index ff997194b42..8eb8f81227c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala
@@ -300,7 +300,8 @@ class VectorIndexerModel private[ml] (
   /** Java-friendly version of [[categoryMaps]] */
   @Since("1.4.0")
   def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = {
-    categoryMaps.view.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, 
JMap[JDouble, JInt]]]
+    categoryMaps.map { case (k, v) => (k, v.asJava) }
+      .asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]]
   }
 
   /**
diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala 
b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 638d8463b9d..66b56f8b88e 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -218,7 +218,7 @@ class Word2VecModel private[ml] (
   @Since("1.5.0")
   @transient lazy val getVectors: DataFrame = {
     val spark = SparkSession.builder().getOrCreate()
-    val wordVec = wordVectors.getVectors.view.mapValues(vec => 
Vectors.dense(vec.map(_.toDouble)))
+    val wordVec = wordVectors.getVectors.transform((_, vec) => 
Vectors.dense(vec.map(_.toDouble)))
     spark.createDataFrame(wordVec.toSeq).toDF("word", "vector")
   }
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala 
b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
index 4e5b8cd5efe..440b6635a52 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala
@@ -1292,8 +1292,8 @@ private[spark] object RandomForest extends Logging with 
Serializable {
     }
     // Convert mutable maps to immutable ones.
     val nodesForGroup: Map[Int, Array[LearningNode]] =
-      mutableNodesForGroup.view.mapValues(_.toArray).toMap
-    val treeToNodeToIndexInfo = 
mutableTreeToNodeToIndexInfo.view.mapValues(_.toMap).toMap
+      mutableNodesForGroup.toMap.transform((_, v) => v.toArray)
+    val treeToNodeToIndexInfo = 
mutableTreeToNodeToIndexInfo.toMap.transform((_, v) => v.toMap)
     (nodesForGroup, treeToNodeToIndexInfo)
   }
 
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
index fce2537b761..6e5c026cd01 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala
@@ -220,7 +220,7 @@ class BisectingKMeans private (
             divisibleIndices.contains(parentIndex(index))
           }
           newClusters = summarize(d, newAssignments, dMeasure)
-          newClusterCenters = 
newClusters.view.mapValues(_.center).map(identity).toMap
+          newClusterCenters = newClusters.transform((_, v) => 
v.center).map(identity)
         }
         if (preIndices != null) {
           preIndices.unpersist()
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
index af0d9e48a3b..4e9952e6d76 100644
--- 
a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
+++ 
b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala
@@ -443,7 +443,7 @@ class BlockMatrix @Since("1.3.0") (
     val leftMatrix = blockInfo.keys.collect()
     val rightMatrix = other.blockInfo.keys.collect()
 
-    val rightCounterpartsHelper = 
rightMatrix.groupBy(_._1).view.mapValues(_.map(_._2))
+    val rightCounterpartsHelper = rightMatrix.groupBy(_._1).transform((_, v) 
=> v.map(_._2))
     val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) =>
       val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex, 
Array.emptyIntArray)
       val partitions = rightCounterparts.map(b => 
partitioner.getPartition((rowIndex, b)))
@@ -452,7 +452,7 @@ class BlockMatrix @Since("1.3.0") (
         partitions.toSet.map((pid: Int) => pid * midDimSplitNum + 
midDimSplitIndex))
     }.toMap
 
-    val leftCounterpartsHelper = 
leftMatrix.groupBy(_._2).view.mapValues(_.map(_._1))
+    val leftCounterpartsHelper = leftMatrix.groupBy(_._2).transform((_, v) => 
v.map(_._1))
     val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) =>
       val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex, 
Array.emptyIntArray)
       val partitions = leftCounterparts.map(b => partitioner.getPartition((b, 
colIndex)))
diff --git 
a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala 
b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
index d42df3e2f0d..9aeab65e25d 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala
@@ -162,8 +162,7 @@ private[spark] object ChiSqTest extends Logging {
           .map { case ((label, _), c) => (label, c) }
           .toArray
           .groupBy(_._1)
-          .view
-          .mapValues(_.map(_._2).sum)
+          .transform((_, v) => v.map(_._2).sum)
         labelCounts.foreach { case (label, countByLabel) =>
           val nnzByLabel = labelNNZ.getOrElse(label, 0L)
           val nzByLabel = countByLabel - nnzByLabel
diff --git 
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala 
b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index dafdd06a3a2..ff85831a7a6 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -856,7 +856,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numItems).toInt
-      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.transform((_, v) => v.slice(0, n))
       val topItems = model.recommendForAllUsers(k)
       assert(topItems.count() == numUsers)
       assert(topItems.columns.contains("user"))
@@ -877,7 +877,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
-      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.transform((_, v) => v.slice(0, n))
       val topUsers = getALSModel.recommendForAllItems(k)
       assert(topUsers.count() == numItems)
       assert(topUsers.columns.contains("item"))
@@ -899,7 +899,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 4, 6).foreach { k =>
       val n = math.min(k, numItems).toInt
-      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.transform((_, v) => v.slice(0, n))
       val topItems = model.recommendForUserSubset(userSubset, k)
       assert(topItems.count() == numUsersSubset)
       assert(topItems.columns.contains("user"))
@@ -921,7 +921,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest 
with Logging {
 
     Seq(2, 3, 4).foreach { k =>
       val n = math.min(k, numUsers).toInt
-      val expectedUpToN = expected.view.mapValues(_.slice(0, n))
+      val expectedUpToN = expected.transform((_, v) => v.slice(0, n))
       val topUsers = model.recommendForItemSubset(itemSubset, k)
       assert(topUsers.count() == numItemsSubset)
       assert(topUsers.columns.contains("item"))
diff --git 
a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala 
b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
index 1973f306441..0b81a7c8aaf 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala
@@ -43,8 +43,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     // and a Word2VecMap give the same values.
     val word2VecMap = model.getVectors
     val newModel = new Word2VecModel(word2VecMap)
-    assert(newModel.getVectors.view.mapValues(_.toSeq).toMap ===
-      word2VecMap.view.mapValues(_.toSeq).toMap)
+    assert(newModel.getVectors.transform((_, v) => v.toSeq) ===
+      word2VecMap.transform((_, v) => v.toSeq))
   }
 
   test("Word2Vec throws exception when vocabulary is empty") {
@@ -103,8 +103,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     try {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
-      assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
-        model.getVectors.view.mapValues(_.toSeq).toMap)
+      assert(sameModel.getVectors.transform((_, v) => v.toSeq) ===
+        model.getVectors.transform((_, v) => v.toSeq))
     } finally {
       Utils.deleteRecursively(tempDir)
     }
@@ -138,8 +138,8 @@ class Word2VecSuite extends SparkFunSuite with 
MLlibTestSparkContext {
     try {
       model.save(sc, path)
       val sameModel = Word2VecModel.load(sc, path)
-      assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap ===
-        model.getVectors.view.mapValues(_.toSeq).toMap)
+      assert(sameModel.getVectors.transform((_, v) => v.toSeq) ===
+        model.getVectors.transform((_, v) => v.toSeq))
     }
     catch {
       case t: Throwable => fail("exception thrown persisting a model " +
diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala 
b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
index a04b1ec6650..dcb80221b0e 100644
--- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
+++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala
@@ -211,7 +211,7 @@ object Metadata {
       // `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in 
Scala 2.13, call
       // `toMap` for Scala version compatibility.
       case map: Map[_, _] =>
-        map.view.mapValues(hash).toMap.##
+        map.transform((_, v) => hash(v)).##
       case arr: Array[_] =>
         // Seq.empty[T] has the same hashCode regardless of T.
         arr.toImmutableArraySeq.map(hash).##
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 7e687d93091..6ddc3cbbd80 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -1321,7 +1321,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         val partCols = partitionColumnNames(r.table)
         validatePartitionSpec(partCols, i.partitionSpec)
 
-        val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).view.mapValues(_.get).toMap
+        val staticPartitions = 
i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
         val query = addStaticPartitionColumns(r, 
projectByName.getOrElse(i.query), staticPartitions,
           isByName)
 
@@ -3636,8 +3636,7 @@ class Analyzer(override val catalogManager: 
CatalogManager) extends RuleExecutor
         // `GetStructField` without the name property.
         .collect { case g: GetStructField if g.name.isEmpty => g }
         .groupBy(_.child)
-        .view
-        .mapValues(_.map(_.ordinal).distinct.sorted)
+        .transform((_, v) => v.map(_.ordinal).distinct.sorted)
 
       structChildToOrdinals.foreach { case (expr, ordinals) =>
         val schema = expr.dataType.asInstanceOf[StructType]
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
index eff29a78dad..9342d29245a 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala
@@ -539,7 +539,7 @@ case class UnresolvedStarExcept(target: 
Option[Seq[String]], excepts: Seq[Seq[St
       : Seq[NamedExpression] = {
       // group the except pairs by the column they refer to. NOTE: no groupMap 
until scala 2.13
       val groupedExcepts: AttributeMap[Seq[Seq[String]]] =
-        AttributeMap(excepts.groupBy(_._1.toAttribute).view.mapValues(v => 
v.map(_._2)))
+        AttributeMap(excepts.groupBy(_._1.toAttribute).transform((_, v) => 
v.map(_._2)))
 
       // map input columns while searching for the except entry corresponding 
to the current column
       columns.map(col => col -> groupedExcepts.get(col.toAttribute)).collect {
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
index 0f235e0977b..b20c6243898 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala
@@ -207,7 +207,7 @@ object ExternalCatalogUtils {
   }
 
   def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec 
= {
-    spec.view.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else 
v).map(identity).toMap
+    spec.transform((_, v) => if (v == null) DEFAULT_PARTITION_NAME else 
v).map(identity)
   }
 }
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
index b8d52311d9b..f1373b2e593 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -969,7 +969,7 @@ class SessionCatalog(
       } else {
         _.toLowerCase(Locale.ROOT)
       }
-      val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).view.mapValues(_.length)
+      val nameToCounts = 
viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length)
       val nameToCurrentOrdinal = 
scala.collection.mutable.HashMap.empty[String, Int]
       val viewDDL = buildViewDDL(metadata, isTempView)
 
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
index ea985ae5f30..36fde4da262 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala
@@ -126,7 +126,7 @@ package object expressions  {
     }
 
     private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = 
{
-      m.view.mapValues(_.distinct).toMap
+      m.transform((_, v) => v.distinct)
     }
 
     /** Map to use for direct case insensitive attribute lookups. */
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
index 710c07fab7e..7ed68218f14 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala
@@ -150,7 +150,7 @@ object NestedColumnAliasing {
 
     // A reference attribute can have multiple aliases for nested fields.
     val attrToAliases =
-      
AttributeMap(attributeToExtractValuesAndAliases.view.mapValues(_.map(_._2)))
+      AttributeMap(attributeToExtractValuesAndAliases.transform((_, v) => 
v.map(_._2)))
 
     plan match {
       case Project(projectList, child) =>
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
index decef766ae9..a461bf529eb 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala
@@ -1058,8 +1058,7 @@ object CollapseProject extends Rule[LogicalPlan] with 
AliasHelper {
       .filter(_.references.exists(producerMap.contains))
       .flatMap(collectReferences)
       .groupBy(identity)
-      .view
-      .mapValues(_.size)
+      .transform((_, v) => v.size)
       .forall {
         case (reference, count) =>
           val producer = producerMap.getOrElse(reference, reference)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
index 57fe6ae346f..49bde72c48d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala
@@ -1084,8 +1084,8 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
     // window w1 as (partition by p_mfgr order by p_name
     //               range between 2 preceding and 2 following),
     //        w2 as w1
-    val windowMapView = baseWindowMap.view.mapValues {
-      case WindowSpecReference(name) =>
+    val windowMapView = baseWindowMap.transform {
+      case (_, WindowSpecReference(name)) =>
         baseWindowMap.get(name) match {
           case Some(spec: WindowSpecDefinition) =>
             spec
@@ -1094,12 +1094,12 @@ class AstBuilder extends DataTypeAstBuilder with 
SQLConfHelper with Logging {
           case None =>
             throw QueryParsingErrors.cannotResolveWindowReferenceError(name, 
ctx)
         }
-      case spec: WindowSpecDefinition => spec
+      case (_, spec: WindowSpecDefinition) => spec
     }
 
     // Note that mapValues creates a view instead of materialized map. We 
force materialization by
     // mapping over identity.
-    WithWindowDefinition(windowMapView.map(identity).toMap, query)
+    WithWindowDefinition(windowMapView.map(identity), query)
   }
 
   /**
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
index cb11ec4b5f1..917a59d826d 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala
@@ -363,8 +363,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       case s: Seq[_] =>
         s.map(mapChild)
       case m: Map[_, _] =>
-        // `mapValues` is lazy and we need to force it to materialize by 
converting to Map
-        m.view.mapValues(mapChild).toMap
+        m.toMap.transform((_, v) => mapChild(v))
       case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg)
       case Some(child) => Some(mapChild(child))
       case nonChild: AnyRef => nonChild
@@ -784,11 +783,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
       case Some(arg: TreeNode[_]) if containsChild(arg) =>
         Some(arg.asInstanceOf[BaseType].clone())
       // `mapValues` is lazy and we need to force it to materialize by 
converting to Map
-      case m: Map[_, _] => m.view.mapValues {
-        case arg: TreeNode[_] if containsChild(arg) =>
+      case m: Map[_, _] => m.toMap.transform {
+        case (_, arg: TreeNode[_]) if containsChild(arg) =>
           arg.asInstanceOf[BaseType].clone()
-        case other => other
-      }.toMap
+        case (_, other) => other
+      }
       case d: DataType => d // Avoid unpacking Structs
       case args: LazyList[_] => args.map(mapChild).force // Force 
materialization on stream
       case args: Iterable[_] => args.map(mapChild)
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
index 1274751ac94..b66658467c1 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala
@@ -334,7 +334,7 @@ class ToNumberParser(numberFormat: String, errorOnFail: 
Boolean) extends Seriali
       )
     }
     // Make sure that the format string does not contain any prohibited 
duplicate tokens.
-    val inputTokenCounts = 
formatTokens.groupBy(identity).view.mapValues(_.size)
+    val inputTokenCounts = formatTokens.groupBy(identity).transform((_, v) => 
v.size)
     Seq(DecimalPoint(),
       OptionalPlusOrMinusSign(),
       OptionalMinusSign(),
diff --git 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index 603de520b18..0c0b940a588 100644
--- 
a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ 
b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -229,9 +229,8 @@ private[sql] object QueryCompilationErrors extends 
QueryErrorsBase with Compilat
       case seq => Some(CreateStruct(seq)).map(e => Alias(e, e.sql)()).get
     }
       .groupBy(_.dataType)
-      .view
-      .mapValues(values => values.map(value => toSQLId(value.name)).sorted)
-      .mapValues(values => if (values.length > 3) values.take(3) :+ "..." else 
values)
+      .transform((_, values) => values.map(value => 
toSQLId(value.name)).sorted)
+      .transform((_, values) => if (values.length > 3) values.take(3) :+ "..." 
else values)
       .toList.sortBy(_._1.sql)
       .map { case (dataType, values) => s"${toSQLType(dataType)} 
(${values.mkString(", ")})" }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala 
b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index 24497add04f..9bc60f067dd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -700,7 +700,7 @@ class SparkSession private(
       val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
         val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
         if (args.nonEmpty) {
-          NameParameterizedQuery(parsedPlan, 
args.view.mapValues(lit(_).expr).toMap)
+          NameParameterizedQuery(parsedPlan, args.transform((_, v) => 
lit(v).expr))
         } else {
           parsedPlan
         }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index 7fe4c73abf9..98a851f19f0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -61,7 +61,7 @@ case class AnalyzePartitionCommand(
         tableId.table, tableId.database.get, schemaColumns, specColumns)
     }
 
-    val filteredSpec = 
normalizedPartitionSpec.filter(_._2.isDefined).view.mapValues(_.get)
+    val filteredSpec = 
normalizedPartitionSpec.filter(_._2.isDefined).transform((_, v) => v.get)
     if (filteredSpec.isEmpty) {
       None
     } else {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 959a99c95cc..9905e9af9b0 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -407,7 +407,7 @@ object PartitioningUtils extends SQLConfHelper {
     val distinctPartColNames = 
pathWithPartitionValues.map(_._2.columnNames).distinct
 
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
-      seq.groupBy { case (key, _) => key }.view.mapValues(_.map { case (_, 
value) => value }).toMap
+      seq.groupBy { case (key, _) => key }.transform((_, v) => v.map { case 
(_, value) => value })
 
     val partColNamesToPaths = groupByKey(pathWithPartitionValues.map {
       case (path, partValues) => partValues.columnNames -> path
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index 4a608a47b38..c4e12d5c4ae 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -82,8 +82,7 @@ trait OrcFiltersBase {
       val dedupPrimitiveFields = primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
-        .view
-        .mapValues(_.head._2)
+        .transform((_, v) => v.head._2)
       CaseInsensitiveMap(dedupPrimitiveFields.toMap)
     }
   }
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index 02cf34aee3e..5f6890b94f4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -111,9 +111,8 @@ class ParquetFilters(
       primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
-        .view
-        .mapValues(_.head._2)
-      CaseInsensitiveMap(dedupPrimitiveFields.toMap)
+        .transform((_, v) => v.head._2)
+      CaseInsensitiveMap(dedupPrimitiveFields)
     }
   }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index bddf6d02f1f..2a7c1206bb4 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -491,8 +491,7 @@ case class EnsureRequirements(
               val numExpectedPartitions = partValues
                 .map(InternalRowComparableWrapper(_, partitionExprs))
                 .groupBy(identity)
-                .view
-                .mapValues(_.size)
+                .transform((_, v) => v.size)
 
               mergedPartValues = mergedPartValues.map { case (partVal, 
numParts) =>
                 (partVal, numExpectedPartitions.getOrElse(
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index ffdf9da6e58..2a9aa82a148 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -136,9 +136,9 @@ trait ProgressReporter extends Logging {
       from: StreamProgress,
       to: StreamProgress,
       latest: StreamProgress): Unit = {
-    currentTriggerStartOffsets = from.view.mapValues(_.json).toMap
-    currentTriggerEndOffsets = to.view.mapValues(_.json).toMap
-    currentTriggerLatestOffsets = latest.view.mapValues(_.json).toMap
+    currentTriggerStartOffsets = from.transform((_, v) => v.json)
+    currentTriggerEndOffsets = to.transform((_, v) => v.json)
+    currentTriggerLatestOffsets = latest.transform((_, v) => v.json)
     latestStreamProgress = to
   }
 
@@ -243,7 +243,7 @@ trait ProgressReporter extends Logging {
       batchId = currentBatchId,
       batchDuration = processingTimeMills,
       durationMs =
-        new 
java.util.HashMap(currentDurationsMs.toMap.view.mapValues(long2Long).toMap.asJava),
+        new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => 
long2Long(v)).asJava),
       eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
@@ -297,17 +297,17 @@ trait ProgressReporter extends Logging {
         Map(
           "max" -> stats.max,
           "min" -> stats.min,
-          "avg" -> stats.avg.toLong).view.mapValues(formatTimestamp)
+          "avg" -> stats.avg.toLong).transform((_, v) => formatTimestamp(v))
     }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
 
-    ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap)
+    ExecutionStats(numInputRows, stateOperators, eventTimeStats)
   }
 
   /** Extract number of input sources for each streaming source in plan */
   private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {
 
     def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, 
Long] = {
-      tuples.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // sum up 
rows for each source
+      tuples.groupBy(_._1).transform((_, v) => v.map(_._2).sum) // sum up rows 
for each source
     }
 
     def unrollCTE(plan: LogicalPlan): LogicalPlan = {
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index feb745b0402..3a42f9e2ccb 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -507,7 +507,7 @@ class RocksDB(
       "put" -> DB_WRITE,
       "compaction" -> COMPACTION_TIME
     ).toMap
-    val nativeOpsLatencyMicros = nativeOpsHistograms.view.mapValues { typ =>
+    val nativeOpsLatencyMicros = nativeOpsHistograms.transform { (_, typ) =>
       RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
     }
     val nativeOpsMetricTickers = Seq(
@@ -530,7 +530,7 @@ class RocksDB(
       /** Number of bytes written during flush */
       "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES
     ).toMap
-    val nativeOpsMetrics = nativeOpsMetricTickers.view.mapValues { typ =>
+    val nativeOpsMetrics = nativeOpsMetricTickers.transform { (_, typ) =>
       nativeStats.getTickerCount(typ)
     }
 
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index b36aa264e28..57db193a4c8 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,7 +156,7 @@ trait StateStoreWriter extends StatefulOperator with 
PythonSQLMetrics { self: Sp
       .map(entry => entry._1 -> longMetric(entry._1).value)
 
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
-      new 
java.util.HashMap(customMetrics.view.mapValues(long2Long).toMap.asJava)
+      new java.util.HashMap(customMetrics.transform((_, v) => 
long2Long(v)).asJava)
 
     // We now don't report number of shuffle partitions inside the state 
operator. Instead,
     // it will be filled when the stream query progress is reported
diff --git 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index f66b99a154e..c9090cacdb3 100644
--- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ 
b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -79,7 +79,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
             request,
             "running",
             running.toSeq,
-            executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
+            executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq),
             currentTime,
             showErrorMessage = false,
             showRunningJobs = true,
@@ -105,7 +105,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
           request,
           "completed",
           completed.toSeq,
-          executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
+          executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq),
           currentTime,
           showErrorMessage = false,
           showRunningJobs = false,
@@ -132,7 +132,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends 
WebUIPage("") with L
             request,
             "failed",
             failed.toSeq,
-            executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
+            executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq),
             currentTime,
             showErrorMessage = true,
             showRunningJobs = false,
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 20ac2a9e946..8b53edb0d3c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -347,7 +347,7 @@ class DataFrameStatSuite extends QueryTest with 
SharedSparkSession {
       val columnNames = crosstab.schema.fieldNames
       assert(columnNames(0) === "a_b")
       // reduce by key
-      val expected = data.map(t => (t, 
1)).groupBy(_._1).view.mapValues(_.length)
+      val expected = data.map(t => (t, 1)).groupBy(_._1).transform((_, v) => 
v.length)
       val rows = crosstab.collect()
       rows.foreach { row =>
         val i = row.getString(0).toInt
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index d9f4c685163..03296476679 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -426,7 +426,7 @@ class SQLQueryTestSuite extends QueryTest with 
SharedSparkSession with SQLHelper
       // We need to do cartesian product for all the config dimensions, to get 
a list of
       // config sets, and run the query once for each config set.
       val configDimLines = 
comments.filter(_.startsWith("--CONFIG_DIM")).map(_.substring(12))
-      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' 
')).view.mapValues { lines =>
+      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).transform 
{ (_, lines) =>
         lines.map(_.dropWhile(_ != ' ').substring(1)).map(_.split(",").map { 
kv =>
           val (conf, value) = kv.span(_ != '=')
           conf.trim -> value.substring(1).trim
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index c5e65d2e65f..6a597d7ab4f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -527,7 +527,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       sparkContext.parallelize(Seq(Row(Map("a" -> new 
BigDecimal("2011000000000002456556"))))),
       StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30, 
0))))))
     val udf2 = org.apache.spark.sql.functions.udf((map: Map[String, 
BigDecimal]) => {
-      map.view.mapValues(value => if (value == null) null else 
value.toBigInteger.toString).toMap
+      map.transform((_, value) => if (value == null) null else 
value.toBigInteger.toString)
     })
     checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" -> 
"2011000000000002456556"))))
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index b7907b99495..d2740f9eac7 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -630,7 +630,7 @@ class PersistedViewTestSuite extends SQLViewTestSuite with 
SharedSparkSession {
         val meta = catalog.getTableRawMetadata(TableIdentifier("test_view", 
Some("default")))
         // simulate a view meta with incompatible schema change
         val newProp = meta.properties
-          .view.mapValues(_.replace("col_i", "col_j")).toMap
+          .transform((_, v) => v.replace("col_i", "col_j"))
         val newSchema = StructType(Seq(StructField("col_j", IntegerType)))
         catalog.alterTable(meta.copy(properties = newProp, schema = newSchema))
         val e = intercept[AnalysisException] {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index 199d1b8b4b6..02c9d318bb4 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -47,7 +47,8 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with 
DDLCommandTestUtil
       t: String,
       ifExists: String,
       specs: Map[String, Any]*): Unit = {
-    checkPartitions(t, specs.map(_.view.mapValues(_.toString).toMap): _*)
+    checkPartitions(t,
+      specs.map(_.map { case (k, v) => (k, v.toString) }.toMap): _*)
     val specStr = specs.map(partSpecToString).mkString(", ")
     sql(s"ALTER TABLE $t DROP $ifExists $specStr")
     checkPartitions(t)
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index 1efa8221e41..374a8a8078e 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -392,7 +392,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest 
with SharedSparkSession
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
-        Row(m.view.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+        Row(m.transform((_, struct) => Row(struct.productIterator.toSeq: _*)))
       })
     }
   }
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index 30315c12b58..72aa607591d 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -216,13 +216,13 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])],
       enableWholeStage: Boolean = false): Unit = {
-    val expectedMetricsPredicates = expectedMetrics.view.mapValues { case 
(nodeName, nodeMetrics) =>
-      (nodeName, nodeMetrics.view.mapValues(expectedMetricValue =>
+    val expectedMetricsPredicates = expectedMetrics.transform { case (_, 
(nodeName, nodeMetrics)) =>
+      (nodeName, nodeMetrics.transform((_, expectedMetricValue) =>
         (actualMetricValue: Any) => {
           actualMetricValue.toString.matches(expectedMetricValue.toString)
-        }).toMap)
+        }))
     }
-    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, 
expectedMetricsPredicates.toMap,
+    testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, 
expectedMetricsPredicates,
       enableWholeStage)
   }
 
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 5affc9ef3b2..17e77cf8d8f 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -220,23 +220,23 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 2).toMap)
+      accumulatorUpdates.transform((_, v) => v * 2))
 
     // Driver accumulator updates don't belong to this execution should be 
filtered and no
     // exception will be thrown.
     listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 2).toMap)
+      accumulatorUpdates.transform((_, v) => v * 2))
 
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", 
Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.view.mapValues(_ * 
2).toMap))
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.transform((_, v) => 
v * 2)))
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 3).toMap)
+      accumulatorUpdates.transform((_, v) => v * 3))
 
     // Retrying a stage should reset the metrics
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 
1)))
@@ -250,7 +250,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 2).toMap)
+      accumulatorUpdates.transform((_, v) => v * 2))
 
     // Ignore the task end for the first attempt
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -258,12 +258,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
100).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 
100)),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 2).toMap)
+      accumulatorUpdates.transform((_, v) => v * 2))
 
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -271,7 +271,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
2).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 
2)),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -279,12 +279,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.transform((_, v) => v * 
3)),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 5).toMap)
+      accumulatorUpdates.transform((_, v) => v * 5))
 
     // Summit a new stage
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 
0)))
@@ -298,7 +298,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     )))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 7).toMap)
+      accumulatorUpdates.transform((_, v) => v * 7))
 
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -306,7 +306,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 
3)),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -314,12 +314,12 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 
3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.transform((_, v) => v * 
3)),
       new ExecutorMetrics,
       null))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 11).toMap)
+      accumulatorUpdates.transform((_, v) => v * 11))
 
     assertJobs(statusStore.execution(executionId), running = Seq(0))
 
@@ -334,7 +334,7 @@ abstract class SQLAppStatusListenerSuite extends 
SharedSparkSession with JsonTes
     assertJobs(statusStore.execution(executionId), completed = Seq(0))
 
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.view.mapValues(_ * 11).toMap)
+      accumulatorUpdates.transform((_, v) => v * 11))
   }
 
   test("control a plan explain mode in listeners via SQLConf") {
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index a0f102109b1..dce27bdc5d1 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -436,7 +436,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 
0L).view.mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => 
long2Long(v)).asJava),
     eventTime = new java.util.HashMap(Map(
       "max" -> "2016-12-05T20:54:20.827Z",
       "min" -> "2016-12-05T20:54:20.827Z",
@@ -448,7 +448,7 @@ object StreamingQueryStatusAndProgressSuite {
       numShufflePartitions = 2, numStateStoreInstances = 2,
       customMetrics = new 
java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
-        .view.mapValues(long2Long).toMap.asJava)
+        .transform((_, v) => long2Long(v)).asJava)
     )),
     sources = Array(
       new SourceProgress(
@@ -474,7 +474,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 
0L).view.mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => 
long2Long(v)).asJava),
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(operatorName = "op2",
diff --git 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index e5e873cca12..f1a63ac0f78 100644
--- 
a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ 
b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
       c.committedOffsets.lastOption.map { case (_, offset) =>
         offset match {
           case o: RateStreamOffset =>
-            o.partitionToValueAndRunTimeMs.view.mapValues(_.value).toMap
+            o.partitionToValueAndRunTimeMs.transform((_, v) => v.value)
         }
       }
     }
diff --git 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index f52ff8cf9a1..32526f1d18f 100644
--- 
a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ 
b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1261,7 +1261,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, 
hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[String] = 
withClient {
     val catalogTable = getTable(db, table)
-    val partColNameMap = 
buildLowerCasePartColNameMap(catalogTable).view.mapValues(escapePathName)
+    val partColNameMap =
+      buildLowerCasePartColNameMap(catalogTable).transform((_, v) => 
escapePathName(v))
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, 
partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index 5a124be5679..2f981454de4 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -65,7 +65,7 @@ private[streaming] class 
JavaStreamingListenerWrapper(javaStreamingListener: Jav
   private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
     JavaBatchInfo(
       batchInfo.batchTime,
-      
batchInfo.streamIdToInputInfo.view.mapValues(toJavaStreamInputInfo).toMap.asJava,
+      batchInfo.streamIdToInputInfo.transform((_, v) => 
toJavaStreamInputInfo(v)).asJava,
       batchInfo.submissionTime,
       batchInfo.processingStartTime.getOrElse(-1),
       batchInfo.processingEndTime.getOrElse(-1),
@@ -73,7 +73,7 @@ private[streaming] class 
JavaStreamingListenerWrapper(javaStreamingListener: Jav
       batchInfo.processingDelay.getOrElse(-1),
       batchInfo.totalDelay.getOrElse(-1),
       batchInfo.numRecords,
-      
batchInfo.outputOperationInfos.view.mapValues(toJavaOutputOperationInfo).toMap.asJava
+      batchInfo.outputOperationInfos.transform((_, v) => 
toJavaOutputOperationInfo(v)).asJava
     )
   }
 
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 5c47cde0459..79b1d0c545b 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -184,7 +184,7 @@ private[streaming] class ReceiverSchedulingPolicy {
 
     val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
       
receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
-        .groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // Sum weights 
for each executor
+        .groupBy(_._1).transform((_, v) => v.map(_._2).sum) // Sum weights for 
each executor
     }
 
     val idleExecutors = executors.toSet -- executorWeights.keys
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index d5bc658b4b5..52ea8fdfbfe 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -245,11 +245,11 @@ class ReceiverTracker(ssc: StreamingContext, 
skipReceiverLaunch: Boolean = false
    */
   def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
     if (isTrackerStarted) {
-      endpoint.askSync[Map[Int, 
ReceiverTrackingInfo]](GetAllReceiverInfo).view.mapValues {
-        _.runningExecutor.map {
+      endpoint.askSync[Map[Int, 
ReceiverTrackingInfo]](GetAllReceiverInfo).transform { (_, v) =>
+        v.runningExecutor.map {
           _.executorId
         }
-      }.toMap
+      }
     } else {
       Map.empty
     }
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 08de8d46c31..02f6dd9fcac 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -98,7 +98,8 @@ private[ui] object BatchUIData {
 
   def apply(batchInfo: BatchInfo): BatchUIData = {
     val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
-    outputOperations ++= 
batchInfo.outputOperationInfos.view.mapValues(OutputOperationUIData.apply)
+    outputOperations ++= batchInfo.outputOperationInfos
+      .transform((_, v) => OutputOperationUIData(v))
     new BatchUIData(
       batchInfo.batchTime,
       batchInfo.streamIdToInputInfo,
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 140eb866f6f..bbe99f64707 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -217,7 +217,7 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
     val _retainedBatches = retainedBatches
     val latestBatches = _retainedBatches.map { batchUIData =>
       (batchUIData.batchTime.milliseconds,
-        batchUIData.streamIdToInputInfo.view.mapValues(_.numRecords))
+        batchUIData.streamIdToInputInfo.transform((_, v) => v.numRecords))
     }
     streamIds.map { streamId =>
       val recordRates = latestBatches.map {
@@ -231,7 +231,7 @@ private[spark] class StreamingJobProgressListener(ssc: 
StreamingContext)
 
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
     val lastReceivedBlockInfoOption =
-      lastReceivedBatch.map(_.streamIdToInputInfo.view.mapValues(_.numRecords))
+      lastReceivedBatch.map(_.streamIdToInputInfo.transform((_, v) => 
v.numRecords))
     lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
       streamIds.map { streamId =>
         (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))


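For readers skimming the hunks above, here is a minimal, self-contained Scala 2.13 sketch of the rewrite pattern they apply; the object and value names below are illustrative only and do not appear anywhere in the patch.

```scala
// Illustrative sketch (not part of the commit above): the lazy
// `view.mapValues(...).toMap` pattern and its eager `transform` replacement.
object MapValuesToTransformSketch {
  def main(args: Array[String]): Unit = {
    val m: Map[String, Seq[Int]] = Map("a" -> Seq(1, 2, 3), "b" -> Seq(4, 5))

    // Old form: `mapValues` on a view is lazy in Scala 2.13, so `.toMap`
    // is needed to materialize the result as an immutable Map.
    val viaView: Map[String, Seq[Int]] = m.view.mapValues(_.take(2)).toMap

    // New form: `immutable.MapOps#transform` is eager and already returns
    // a Map, so no extra materialization step is required.
    val viaTransform: Map[String, Seq[Int]] = m.transform((_, v) => v.take(2))

    assert(viaView == viaTransform)
    println(viaTransform) // Map(a -> List(1, 2), b -> List(4, 5))
  }
}
```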