This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 938b7f58051 [SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`
938b7f58051 is described below

commit 938b7f580519e3da64004185f7083ae63cf99bc0
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sat Dec 2 21:39:14 2023 -0800

[SPARK-46212][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Use other functions to simplify the code pattern of `s.c.MapOps#view.mapValues`

### What changes were proposed in this pull request?

This PR simplifies the `s.c.MapOps#view.mapValues` pattern using the following approach:

- For the `s.c.immutable.MapOps` type, replace it with the `s.c.immutable.MapOps#transform` function:

  ```scala
  def transform[W](f: (K, V) => W): CC[K, W] = map { case (k, v) => (k, f(k, v)) }
  ```

  Like the case in `CountMinSketchSuite`:
  https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala#L59

- For the `s.c.MapOps` type, where no `transform` function exists, replace it directly with the `map` function:

  ```scala
  def map[K2, V2](f: ((K, V)) => (K2, V2)): CC[K2, V2] = mapFactory.from(new View.Map(this, f))
  ```

  Like the case in `KafkaTestUtils`:
  https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala#L381

- For the `s.c.mutable.MapOps` type, the `transform` function has also been deprecated, and the signatures of `transform` and its replacement `mapValuesInPlace` are as follows:

  ```scala
  @deprecated("Use mapValuesInPlace instead", "2.13.0")
  @inline final def transform(f: (K, V) => V): this.type = mapValuesInPlace(f)

  def mapValuesInPlace(f: (K, V) => V): this.type = {...}
  ```

  Here the function maps values back to the same type `V`, unlike `s.c.immutable.MapOps#transform`, which maps them to a new type `W`, so it does not meet the requirement. This scenario is therefore split into two sub-cases:

  1. If the `mutable.Map` being used eventually needs to be converted to an `immutable.Map`, first convert it to an `immutable.Map` and then replace the pattern with the `transform` function. Like the case in `SparkConnectPlanner`:
     https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala#L292

  2. If the `mutable.Map` being used does not need to be converted to an `immutable.Map` in the end, replace the pattern directly with the `map` function from `scala.collection.MapOps`. Like the case in `SparkSession`:
     https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala#L313

In addition, there is a special case in `PythonWorkerFactory`:
https://github.com/apache/spark/blob/0d40b1aea758b95a4416c8653599af8713a4aa16/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala#L381

This case only needs to `destroy` each `Process` in `values` and does not return a value, so it has been rewritten with `.values.foreach`.

### Why are the changes needed?

The `s.c.MapOps#view.mapValues` coding pattern is verbose; it can be simplified with other functions, as the short sketch below illustrates.
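For illustration, here is a minimal, self-contained sketch of the replacement patterns described above, targeting Scala 2.13. The object name, maps, and values are made up for demonstration and do not come from the Spark code base:

```scala
import scala.collection.{immutable, mutable}

object MapValuesPatternsSketch {
  def main(args: Array[String]): Unit = {
    val m: immutable.Map[String, Seq[Int]] = Map("a" -> Seq(1, 2), "b" -> Seq(3))

    // The pattern being replaced: a lazy MapView that usually needs a trailing .toMap.
    val before: Map[String, Int] = m.view.mapValues(_.size).toMap

    // s.c.immutable.MapOps: transform keeps the key and eagerly maps the value.
    val after: Map[String, Int] = m.transform((_, v) => v.size)
    assert(before == after)

    // s.c.MapOps (no transform available): map over the (key, value) pairs instead.
    val generic: scala.collection.Map[String, Seq[Int]] = m
    val mapped: scala.collection.Map[String, Int] = generic.map { case (k, v) => (k, v.size) }
    assert(mapped == after)

    // s.c.mutable.MapOps, sub-case 1: the result is needed as an immutable.Map,
    // so convert first and then use transform.
    val mm = mutable.Map("a" -> Seq(1, 2), "b" -> Seq(3))
    val asImmutable: Map[String, Int] = mm.toMap.transform((_, v) => v.size)
    assert(asImmutable == after)

    // s.c.mutable.MapOps, sub-case 2 (the PythonWorkerFactory-style case):
    // no return value is needed, so just iterate over the values.
    mm.values.foreach(v => println(s"size = ${v.size}"))
  }
}
```

The asserts are only there to show that the eager forms produce the same result as the old view-based pattern.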
### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #44122 from LuciferYang/SPARK-46212.

Authored-by: yangjie01 <yangji...@baidu.com>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../spark/util/sketch/CountMinSketchSuite.scala | 2 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala | 3 +--
 .../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 .../spark/sql/ClientDataFrameStatSuite.scala | 2 +-
 .../org/apache/spark/sql/connect/dsl/package.scala | 2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala | 15 ++++++-----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 2 +-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 3 ++-
 .../streaming/kafka010/ConsumerStrategy.scala | 9 ++++---
 .../kafka010/DirectKafkaInputDStream.scala | 2 +-
 .../kafka010/DirectKafkaStreamSuite.scala | 4 +--
 .../spark/streaming/kafka010/KafkaTestUtils.scala | 2 +-
 .../spark/streaming/kinesis/KinesisTestUtils.scala | 2 +-
 .../kinesis/KPLBasedKinesisTestUtils.scala | 2 +-
 .../kinesis/KinesisBackedBlockRDDSuite.scala | 4 +--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala | 3 +--
 .../org/apache/spark/api/java/JavaPairRDD.scala | 6 +++--
 .../apache/spark/api/java/JavaSparkContext.scala | 2 +-
 .../spark/api/python/PythonWorkerFactory.scala | 2 +-
 .../apache/spark/scheduler/InputFormatInfo.scala | 2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 ...plicationEnvironmentInfoWrapperSerializer.scala | 10 ++++----
 .../ExecutorSummaryWrapperSerializer.scala | 2 +-
 .../status/protobuf/JobDataWrapperSerializer.scala | 2 +-
 .../protobuf/StageDataWrapperSerializer.scala | 9 ++++---
 .../org/apache/spark/SparkThrowableSuite.scala | 2 +-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 5 ++--
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 3 +--
 .../scheduler/ExecutorResourceInfoSuite.scala | 3 +--
 .../BlockManagerDecommissionIntegrationSuite.scala | 2 +-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 2 +-
 .../util/collection/ExternalSorterSuite.scala | 2 +-
 .../apache/spark/examples/DFSReadWriteTest.scala | 3 +--
 .../apache/spark/examples/MiniReadWriteTest.scala | 3 +--
 .../mllib/PowerIterationClusteringExample.scala | 2 +-
 .../spark/graphx/lib/ShortestPathsSuite.scala | 2 +-
 .../spark/ml/evaluation/ClusteringMetrics.scala | 7 +++--
 .../apache/spark/ml/feature/VectorIndexer.scala | 3 ++-
 .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala | 4 +--
 .../spark/mllib/clustering/BisectingKMeans.scala | 2 +-
 .../mllib/linalg/distributed/BlockMatrix.scala | 4 +--
 .../apache/spark/mllib/stat/test/ChiSqTest.scala | 3 +--
 .../apache/spark/ml/recommendation/ALSSuite.scala | 8 +++---
 .../apache/spark/mllib/feature/Word2VecSuite.scala | 12 ++++-----
 .../org/apache/spark/sql/types/Metadata.scala | 2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala | 5 ++--
 .../spark/sql/catalyst/analysis/unresolved.scala | 2 +-
 .../catalyst/catalog/ExternalCatalogUtils.scala | 2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 .../spark/sql/catalyst/expressions/package.scala | 2 +-
 .../catalyst/optimizer/NestedColumnAliasing.scala | 2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala | 3 +--
 .../spark/sql/catalyst/parser/AstBuilder.scala | 8 +++---
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 11 ++++----
 .../spark/sql/catalyst/util/ToNumberParser.scala | 2 +-
.../spark/sql/errors/QueryCompilationErrors.scala | 5 ++-- .../scala/org/apache/spark/sql/SparkSession.scala | 2 +- .../command/AnalyzePartitionCommand.scala | 2 +- .../execution/datasources/PartitioningUtils.scala | 2 +- .../execution/datasources/orc/OrcFiltersBase.scala | 3 +-- .../datasources/parquet/ParquetFilters.scala | 5 ++-- .../execution/exchange/EnsureRequirements.scala | 3 +-- .../sql/execution/streaming/ProgressReporter.scala | 14 +++++----- .../sql/execution/streaming/state/RocksDB.scala | 4 +-- .../execution/streaming/statefulOperators.scala | 2 +- .../spark/sql/execution/ui/AllExecutionsPage.scala | 6 ++--- .../org/apache/spark/sql/DataFrameStatSuite.scala | 2 +- .../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +- .../test/scala/org/apache/spark/sql/UDFSuite.scala | 2 +- .../spark/sql/execution/SQLViewTestSuite.scala | 2 +- .../command/AlterTableDropPartitionSuiteBase.scala | 3 ++- .../datasources/parquet/ParquetIOSuite.scala | 2 +- .../sql/execution/metric/SQLMetricsTestUtils.scala | 8 +++--- .../execution/ui/SQLAppStatusListenerSuite.scala | 30 +++++++++++----------- .../StreamingQueryStatusAndProgressSuite.scala | 6 ++--- .../sql/streaming/continuous/ContinuousSuite.scala | 2 +- .../spark/sql/hive/HiveExternalCatalog.scala | 3 ++- .../api/java/JavaStreamingListenerWrapper.scala | 4 +-- .../scheduler/ReceiverSchedulingPolicy.scala | 2 +- .../streaming/scheduler/ReceiverTracker.scala | 6 ++--- .../apache/spark/streaming/ui/BatchUIData.scala | 3 ++- .../ui/StreamingJobProgressListener.scala | 4 +-- 84 files changed, 170 insertions(+), 172 deletions(-) diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 689452caa32..3d89eaf4daf 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { // scalastyle:ignore funsuite val exactFreq = { val sampledItems = sampledItemIndices.map(allItems) - sampledItems.groupBy(identity).view.mapValues(_.length.toLong) + sampledItems.groupBy(identity).transform((_, v) => v.length.toLong) } val sketch = CountMinSketch.create(epsOfTotalCount, confidence, seed) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index e235c13d413..27a5b918fc9 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -244,8 +244,7 @@ private[sql] object AvroUtils extends Logging { private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray private[this] val fieldMap = avroSchema.getFields.asScala .groupBy(_.name.toLowerCase(Locale.ROOT)) - .view - .mapValues(_.toSeq) // toSeq needed for scala 2.13 + .transform((_, v) => v.toSeq) // toSeq needed for scala 2.13 /** The fields which have matching equivalents in both Avro and Catalyst schemas. 
*/ val matchedFields: Seq[AvroMatchedField] = catalystSchema.zipWithIndex.flatMap { diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index ca692d2d4f8..daa172e215a 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -310,7 +310,7 @@ class SparkSession private[sql] ( proto.SqlCommand .newBuilder() .setSql(sqlText) - .putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava))) + .putAllNamedArguments(args.asScala.map { case (k, v) => (k, lit(v).expr) }.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) val responseIter = client.execute(plan.build()) diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala index 747ca45d10d..d0a89f672f7 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala @@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession { val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") // reduce by key - val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length) + val expected = data.map(t => (t, 1)).groupBy(_._1).transform((_, v) => v.length) val rows = crosstab.collect() rows.foreach { row => val i = row.getString(0).toInt diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index ccdae26bd35..6aadb6c34b7 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -362,7 +362,7 @@ package object dsl { } def fillValueMap(valueMap: Map[String, Any]): Relation = { - val (cols, values) = valueMap.view.mapValues(toLiteralProto).toSeq.unzip + val (cols, values) = valueMap.transform((_, v) => toLiteralProto(v)).toSeq.unzip Relation .newBuilder() .setFillNa( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index 4f8b7c00888..e2b4a3c782e 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -289,11 +289,13 @@ class SparkConnectPlanner( if (!namedArguments.isEmpty) { NameParameterizedQuery( parsedPlan, - namedArguments.asScala.view.mapValues(transformExpression).toMap) + namedArguments.asScala.toMap.transform((_, v) => transformExpression(v))) } else if (!posArguments.isEmpty) { PosParameterizedQuery(parsedPlan, posArguments.asScala.map(transformExpression).toSeq) } else if (!args.isEmpty) { - NameParameterizedQuery(parsedPlan, args.asScala.view.mapValues(transformLiteral).toMap) + NameParameterizedQuery( + parsedPlan, + args.asScala.toMap.transform((_, v) => transformLiteral(v))) } else if (!posArgs.isEmpty) { 
PosParameterizedQuery(parsedPlan, posArgs.asScala.map(transformLiteral).toSeq) } else { @@ -2535,7 +2537,7 @@ class SparkConnectPlanner( val df = if (!namedArguments.isEmpty) { session.sql( getSqlCommand.getSql, - namedArguments.asScala.view.mapValues(e => Column(transformExpression(e))).toMap, + namedArguments.asScala.toMap.transform((_, e) => Column(transformExpression(e))), tracker) } else if (!posArguments.isEmpty) { session.sql( @@ -2545,7 +2547,7 @@ class SparkConnectPlanner( } else if (!args.isEmpty) { session.sql( getSqlCommand.getSql, - args.asScala.view.mapValues(transformLiteral).toMap, + args.asScala.toMap.transform((_, v) => transformLiteral(v)), tracker) } else if (!posArgs.isEmpty) { session.sql(getSqlCommand.getSql, posArgs.asScala.map(transformLiteral).toArray, tracker) @@ -3298,14 +3300,13 @@ class SparkConnectPlanner( proto.GetResourcesCommandResult .newBuilder() .putAllResources( - session.sparkContext.resources.view - .mapValues(resource => + session.sparkContext.resources.toMap + .transform((_, resource) => proto.ResourceInformation .newBuilder() .setName(resource.name) .addAllAddresses(resource.addresses.toImmutableArraySeq.asJava) .build()) - .toMap .asJava) .build()) .build()) diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 27904ebb3c5..02e4e909734 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -2294,7 +2294,7 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { Execute { q => // wait to reach the last offset in every partition q.awaitOffset(0, - KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap), + KafkaSourceOffset(partitionOffsets.transform((_, _) => 3L)), streamingTimeout.toMillis) }, CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index d7e04962925..1fa1dda9faf 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -378,7 +378,8 @@ class KafkaTestUtils( } def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = { - zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq + zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()) + .map { case (k, v) => (k, v.size) }.toSeq } /** Create a Kafka topic and wait until it is propagated to the whole cluster */ diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index 1dd66675b91..a0b0e92666e 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -241,7 +241,8 @@ object ConsumerStrategies { new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, 
jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long]( + offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava)) } /** @@ -320,7 +321,8 @@ object ConsumerStrategies { new SubscribePattern[K, V]( pattern, new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long]( + offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava)) } /** @@ -404,7 +406,8 @@ object ConsumerStrategies { new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long]( + offsets.map { case (k, v) => (k, jl.Long.valueOf(v)) }.asJava)) } /** diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index c412486ce19..d3795a194dd 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { kc = consumerStrategy.onStart( - currentOffsets.view.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava) + currentOffsets.transform((_, l) => java.lang.Long.valueOf(l)).asJava) } kc } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 33e6ccc7443..74c5e070b7a 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -726,8 +726,8 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = { - kafkaStream.generatedRDDs.view.mapValues { rdd => - rdd.asInstanceOf[HasOffsetRanges].offsetRanges + kafkaStream.generatedRDDs.map { case (t, rdd) => + (t, rdd.asInstanceOf[HasOffsetRanges].offsetRanges) }.toSeq.sortBy { _._1 } } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 66c253172e7..46a6fbcf2c3 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*)) + sendMessages(topic, messageToFreq.asScala.toMap.transform((_, v) => v.intValue())) } /** Send the messages to the Kafka broker */ diff --git 
a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 10d24df7b61..d0eee9c83c2 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator( sentSeqNumbers += ((num, seqNumber)) } - shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap + shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq) } } diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala index 2799965a8fc..58b64ba11d3 100644 --- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala @@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService()) } producer.flushSync() - shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap + shardIdToSeqNumbers.toMap.transform((_, v) => v.toSeq) } } diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 7c6cf7c3a22..2088a9deafe 100644 --- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -48,8 +48,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap + shardIdToData = shardIdToDataAndSeqNumbers.transform((_, v) => v.map(_._1)) + shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.transform((_, v) => v.map(_._2)) shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => val seqNumRange = SequenceNumberRange( testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index 6c18a8863af..fee1bcdc967 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -75,8 +75,7 @@ private[sql] object ProtobufUtils extends Logging { private[this] val protoFieldArray = descriptor.getFields.asScala.toArray private[this] val fieldMap = descriptor.getFields.asScala .groupBy(_.getName.toLowerCase(Locale.ROOT)) - .view - .mapValues(_.toSeq) // toSeq needed for scala 2.13 + .transform((_, v) => v.toSeq) // toSeq needed for scala 2.13 /** The fields which have matching equivalents in both Protobuf and Catalyst 
schemas. */ val matchedFields: Seq[ProtoMatchedField] = catalystSchema.zipWithIndex.flatMap { diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index aa8e1b1520e..f16c0be75c6 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -147,7 +147,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKey( withReplacement, - fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize + fractions.asScala.toMap + .transform((_, v) => v.toDouble), // map to Scala Double; toMap to serialize seed)) /** @@ -179,7 +180,8 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKeyExact( withReplacement, - fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize + fractions.asScala.toMap + .transform((_, v) => v.toDouble), // map to Scala Double; toMap to serialize seed)) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 1c6c44e773d..08c1134e30d 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -784,7 +784,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { * @note This does not necessarily mean the caching or computation was successful. */ def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = { - sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap + sc.getPersistentRDDs.toMap.transform((_, s) => JavaRDD.fromRDD(s)) .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]] } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index f6dbeadd96f..e385bc685bf 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -378,7 +378,7 @@ private[spark] class PythonWorkerFactory( daemon = null daemonPort = 0 } else { - simpleWorkers.view.mapValues(_.destroy()) + simpleWorkers.values.foreach(_.destroy()) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index e04c5b2de7d..7f56dc251db 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -178,6 +178,6 @@ object InputFormatInfo { } } - nodeToSplit.view.mapValues(_.toSet).toMap + nodeToSplit.toMap.transform((_, v) => v.toSet) } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index 15ae2fef221..454e7ed3ce6 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl( .build[String, ExecutorDecommissionState]() def runningTasksByExecutors: Map[String, Int] = synchronized { - executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap + 
executorIdToRunningTaskIds.toMap.transform((_, v) => v.size) } // The set of executors we have on each host; this is used to compute hostsAlive, which diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index e8519b4bb4d..4f1503f1d8c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -739,7 +739,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized { - executorDataMap.view.mapValues(v => v.registrationTs).toMap + executorDataMap.toMap.transform((_, v) => v.registrationTs) } override def isExecutorActive(id: String): Boolean = synchronized { diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala index 4ee864f00e8..0f192a9d67b 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala @@ -129,11 +129,11 @@ private[protobuf] class ApplicationEnvironmentInfoWrapperSerializer new ResourceProfileInfo( id = info.getId, - executorResources = - info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest) - .toMap, - taskResources = - info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap) + executorResources = info.getExecutorResourcesMap.asScala.toMap + .transform((_, v) => deserializeExecutorResourceRequest(v)), + taskResources = info.getTaskResourcesMap.asScala.toMap + .transform((_, v) => deserializeTaskResourceRequest(v)) + ) } private def deserializeExecutorResourceRequest(info: StoreTypes.ExecutorResourceRequest): diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala index 188187c9b75..3927764eaa3 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala @@ -138,7 +138,7 @@ private[protobuf] class ExecutorSummaryWrapperSerializer peakMemoryMetrics = peakMemoryMetrics, attributes = binary.getAttributesMap.asScala.toMap, resources = - binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap, + binary.getResourcesMap.asScala.toMap.transform((_, v) => deserializeResourceInformation(v)), resourceProfileId = binary.getResourceProfileId, isExcluded = binary.getIsExcluded, excludedInStages = binary.getExcludedInStagesList.asScala.map(_.toInt).toSet) diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala index 5252b8b8c01..69a97ba6c46 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer 
extends ProtobufSerDe[JobDataWr numCompletedStages = info.getNumCompletedStages, numSkippedStages = info.getNumSkippedStages, numFailedStages = info.getNumFailedStages, - killedTasksSummary = info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap) + killedTasksSummary = info.getKillTasksSummaryMap.asScala.toMap.transform((_, v) => v.toInt)) } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala index 4fbaff0327d..d83cff5f23a 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala @@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa new StageDataWrapper( info = info, jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet, - locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap + locality = binary.getLocalityMap.asScala.toMap.transform((_, v) => v.toLong) ) } @@ -402,8 +402,8 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap) } else None val executorSummary = if (MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) { - Some(binary.getExecutorSummaryMap.asScala.view.mapValues( - ExecutorStageSummarySerializer.deserialize).toMap) + Some(binary.getExecutorSummaryMap.asScala.toMap + .transform((_, v) => ExecutorStageSummarySerializer.deserialize(v))) } else None val speculationSummary = getOptional(binary.hasSpeculationSummary, @@ -475,7 +475,8 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa tasks = tasks, executorSummary = executorSummary, speculationSummary = speculationSummary, - killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap, + killedTasksSummary = + binary.getKilledTasksSummaryMap.asScala.toMap.transform((_, v) => v.toInt), resourceProfileId = binary.getResourceProfileId, peakExecutorMetrics = peakExecutorMetrics, taskMetricsDistributions = taskMetricsDistributions, diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index 0206205c353..c012613c2ee 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite { } def checkIfUnique(ss: Seq[Any]): Unit = { - val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 1).keys.toSeq + val dups = ss.groupBy(identity).transform((_, v) => v.size).filter(_._2 > 1).keys.toSeq assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}") } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index e436d988434..c2ada90e7f6 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -806,8 +806,9 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { seed: Long, n: Long): Unit = { val trials = stratifiedData.countByKey() - val expectedSampleSize = stratifiedData.countByKey().view.mapValues(count => - math.ceil(count * samplingRate).toInt) + val expectedSampleSize 
= stratifiedData.countByKey().map { case (k, count) => + (k, math.ceil(count * samplingRate).toInt) + } val fractions = Map("1" -> samplingRate, "0" -> samplingRate) val sample = if (exact) { stratifiedData.sampleByKeyExact(true, fractions, seed) diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 706ebfa9364..55f6b5043fb 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1255,8 +1255,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { .getPartitions .map(coalescedRDD.getPreferredLocations(_).head) .groupBy(identity) - .view - .mapValues(_.length) + .transform((_, v) => v.length) // Make sure the coalesced partitions are distributed fairly evenly between the two locations. // This should not become flaky since the DefaultPartitionsCoalescer uses a fixed seed. diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index 8676efe3140..203e30cac1a 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -102,8 +102,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { // assert that each address was assigned `slots` times info.assignedAddrs .groupBy(identity) - .view - .mapValues(_.size) + .transform((_, v) => v.size) .foreach(x => assert(x._2 == slots)) addresses.foreach { addr => diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 46099254545..ba665600a1c 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -300,7 +300,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val blockLocs = rddUpdates.map { update => (update.blockUpdatedInfo.blockId.name, update.blockUpdatedInfo.blockManagerId)} - val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size) + val blocksToManagers = blockLocs.groupBy(_._1).transform((_, v) => v.size) assert(blocksToManagers.exists(_._2 > 1), s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index 75a450f4358..02900e14b1f 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer() ) - configureMockTransfer(blocks.view.mapValues(_ => createMockManagedBuffer(0)).toMap) + configureMockTransfer(blocks.transform((_, _) => createMockManagedBuffer(0))) val iterator = createShuffleBlockIteratorWithDefaults( Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)) diff --git 
a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 02cc2bb35af..5dc08396220 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val expected = (0 until 3).map { p => var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k % 3 == p }.toSet if (withPartialAgg) { - v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet + v = v.groupBy(_._1).transform((_, s) => s.map(_._2).sum).toSet } (p, v.toSet) }.toSet diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 4cad0f16426..03178342b89 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -89,8 +89,7 @@ object DFSReadWriteTest { .flatMap(_.split("\t")) .filter(_.nonEmpty) .groupBy(w => w) - .view - .mapValues(_.size) + .transform((_, v) => v.size) .values .sum } diff --git a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala index 9095c9b75af..6e9088a1628 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala @@ -81,8 +81,7 @@ object MiniReadWriteTest { .flatMap(_.split("\t")) .filter(_.nonEmpty) .groupBy(w => w) - .view - .mapValues(_.size) + .transform((_, v) => v.size) .values .sum } diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index d26c9a3a056..5b87cb1559a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -101,7 +101,7 @@ object PowerIterationClusteringExample { .setInitializationMode("degree") .run(circlesRdd) - val clusters = model.assignments.collect().groupBy(_.cluster).view.mapValues(_.map(_.id)) + val clusters = model.assignments.collect().groupBy(_.cluster).transform((_, v) => v.map(_.id)) val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignmentsStr = assignments .map { case (k, v) => diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala index 1f158808215..03e2ac7eeb6 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala @@ -34,7 +34,7 @@ class ShortestPathsSuite extends SparkFunSuite with LocalSparkContext { val graph = Graph.fromEdgeTuples(edges, 1) val landmarks = Seq(1, 4).map(_.toLong) val results = ShortestPaths.run(graph, landmarks).vertices.collect().map { - case (v, spMap) => (v, spMap.view.mapValues(i => i).toMap) + case (v, spMap) => (v, spMap.toMap.transform((_, i) => i)) } assert(results.toSet === shortestPaths) } diff --git 
a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala index a433f5a6e56..98fbe471f29 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala @@ -332,12 +332,11 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { clustersStatsRDD .collectAsMap() - .view - .mapValues { - case (featureSum: DenseVector, squaredNormSum: Double, weightSum: Double) => + .toMap + .transform { + case (_, (featureSum: DenseVector, squaredNormSum: Double, weightSum: Double)) => SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, weightSum) } - .toMap } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index ff997194b42..8eb8f81227c 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -300,7 +300,8 @@ class VectorIndexerModel private[ml] ( /** Java-friendly version of [[categoryMaps]] */ @Since("1.4.0") def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = { - categoryMaps.view.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]] + categoryMaps.map { case (k, v) => (k, v.asJava) } + .asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]] } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index 638d8463b9d..66b56f8b88e 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -218,7 +218,7 @@ class Word2VecModel private[ml] ( @Since("1.5.0") @transient lazy val getVectors: DataFrame = { val spark = SparkSession.builder().getOrCreate() - val wordVec = wordVectors.getVectors.view.mapValues(vec => Vectors.dense(vec.map(_.toDouble))) + val wordVec = wordVectors.getVectors.transform((_, vec) => Vectors.dense(vec.map(_.toDouble))) spark.createDataFrame(wordVec.toSeq).toDF("word", "vector") } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index 4e5b8cd5efe..440b6635a52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -1292,8 +1292,8 @@ private[spark] object RandomForest extends Logging with Serializable { } // Convert mutable maps to immutable ones. 
val nodesForGroup: Map[Int, Array[LearningNode]] = - mutableNodesForGroup.view.mapValues(_.toArray).toMap - val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.view.mapValues(_.toMap).toMap + mutableNodesForGroup.toMap.transform((_, v) => v.toArray) + val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.toMap.transform((_, v) => v.toMap) (nodesForGroup, treeToNodeToIndexInfo) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index fce2537b761..6e5c026cd01 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -220,7 +220,7 @@ class BisectingKMeans private ( divisibleIndices.contains(parentIndex(index)) } newClusters = summarize(d, newAssignments, dMeasure) - newClusterCenters = newClusters.view.mapValues(_.center).map(identity).toMap + newClusterCenters = newClusters.transform((_, v) => v.center).map(identity) } if (preIndices != null) { preIndices.unpersist() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index af0d9e48a3b..4e9952e6d76 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -443,7 +443,7 @@ class BlockMatrix @Since("1.3.0") ( val leftMatrix = blockInfo.keys.collect() val rightMatrix = other.blockInfo.keys.collect() - val rightCounterpartsHelper = rightMatrix.groupBy(_._1).view.mapValues(_.map(_._2)) + val rightCounterpartsHelper = rightMatrix.groupBy(_._1).transform((_, v) => v.map(_._2)) val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) => val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex, Array.emptyIntArray) val partitions = rightCounterparts.map(b => partitioner.getPartition((rowIndex, b))) @@ -452,7 +452,7 @@ class BlockMatrix @Since("1.3.0") ( partitions.toSet.map((pid: Int) => pid * midDimSplitNum + midDimSplitIndex)) }.toMap - val leftCounterpartsHelper = leftMatrix.groupBy(_._2).view.mapValues(_.map(_._1)) + val leftCounterpartsHelper = leftMatrix.groupBy(_._2).transform((_, v) => v.map(_._1)) val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) => val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex, Array.emptyIntArray) val partitions = leftCounterparts.map(b => partitioner.getPartition((b, colIndex))) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index d42df3e2f0d..9aeab65e25d 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -162,8 +162,7 @@ private[spark] object ChiSqTest extends Logging { .map { case ((label, _), c) => (label, c) } .toArray .groupBy(_._1) - .view - .mapValues(_.map(_._2).sum) + .transform((_, v) => v.map(_._2).sum) labelCounts.foreach { case (label, countByLabel) => val nnzByLabel = labelNNZ.getOrElse(label, 0L) val nzByLabel = countByLabel - nnzByLabel diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index dafdd06a3a2..ff85831a7a6 100644 --- 
a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -856,7 +856,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 4, 6).foreach { k => val n = math.min(k, numItems).toInt - val expectedUpToN = expected.view.mapValues(_.slice(0, n)) + val expectedUpToN = expected.transform((_, v) => v.slice(0, n)) val topItems = model.recommendForAllUsers(k) assert(topItems.count() == numUsers) assert(topItems.columns.contains("user")) @@ -877,7 +877,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 3, 4).foreach { k => val n = math.min(k, numUsers).toInt - val expectedUpToN = expected.view.mapValues(_.slice(0, n)) + val expectedUpToN = expected.transform((_, v) => v.slice(0, n)) val topUsers = getALSModel.recommendForAllItems(k) assert(topUsers.count() == numItems) assert(topUsers.columns.contains("item")) @@ -899,7 +899,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 4, 6).foreach { k => val n = math.min(k, numItems).toInt - val expectedUpToN = expected.view.mapValues(_.slice(0, n)) + val expectedUpToN = expected.transform((_, v) => v.slice(0, n)) val topItems = model.recommendForUserSubset(userSubset, k) assert(topItems.count() == numUsersSubset) assert(topItems.columns.contains("user")) @@ -921,7 +921,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 3, 4).foreach { k => val n = math.min(k, numUsers).toInt - val expectedUpToN = expected.view.mapValues(_.slice(0, n)) + val expectedUpToN = expected.transform((_, v) => v.slice(0, n)) val topUsers = model.recommendForItemSubset(itemSubset, k) assert(topUsers.count() == numItemsSubset) assert(topUsers.columns.contains("item")) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index 1973f306441..0b81a7c8aaf 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -43,8 +43,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { // and a Word2VecMap give the same values. 
val word2VecMap = model.getVectors val newModel = new Word2VecModel(word2VecMap) - assert(newModel.getVectors.view.mapValues(_.toSeq).toMap === - word2VecMap.view.mapValues(_.toSeq).toMap) + assert(newModel.getVectors.transform((_, v) => v.toSeq) === + word2VecMap.transform((_, v) => v.toSeq)) } test("Word2Vec throws exception when vocabulary is empty") { @@ -103,8 +103,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { try { model.save(sc, path) val sameModel = Word2VecModel.load(sc, path) - assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap === - model.getVectors.view.mapValues(_.toSeq).toMap) + assert(sameModel.getVectors.transform((_, v) => v.toSeq) === + model.getVectors.transform((_, v) => v.toSeq)) } finally { Utils.deleteRecursively(tempDir) } @@ -138,8 +138,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { try { model.save(sc, path) val sameModel = Word2VecModel.load(sc, path) - assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap === - model.getVectors.view.mapValues(_.toSeq).toMap) + assert(sameModel.getVectors.transform((_, v) => v.toSeq) === + model.getVectors.transform((_, v) => v.toSeq)) } catch { case t: Throwable => fail("exception thrown persisting a model " + diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala index a04b1ec6650..dcb80221b0e 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -211,7 +211,7 @@ object Metadata { // `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in Scala 2.13, call // `toMap` for Scala version compatibility. case map: Map[_, _] => - map.view.mapValues(hash).toMap.## + map.transform((_, v) => hash(v)).## case arr: Array[_] => // Seq.empty[T] has the same hashCode regardless of T. arr.toImmutableArraySeq.map(hash).## diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 7e687d93091..6ddc3cbbd80 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1321,7 +1321,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor val partCols = partitionColumnNames(r.table) validatePartitionSpec(partCols, i.partitionSpec) - val staticPartitions = i.partitionSpec.filter(_._2.isDefined).view.mapValues(_.get).toMap + val staticPartitions = i.partitionSpec.filter(_._2.isDefined).transform((_, v) => v.get) val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions, isByName) @@ -3636,8 +3636,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // `GetStructField` without the name property. 
.collect { case g: GetStructField if g.name.isEmpty => g } .groupBy(_.child) - .view - .mapValues(_.map(_.ordinal).distinct.sorted) + .transform((_, v) => v.map(_.ordinal).distinct.sorted) structChildToOrdinals.foreach { case (expr, ordinals) => val schema = expr.dataType.asInstanceOf[StructType] diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala index eff29a78dad..9342d29245a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/unresolved.scala @@ -539,7 +539,7 @@ case class UnresolvedStarExcept(target: Option[Seq[String]], excepts: Seq[Seq[St : Seq[NamedExpression] = { // group the except pairs by the column they refer to. NOTE: no groupMap until scala 2.13 val groupedExcepts: AttributeMap[Seq[Seq[String]]] = - AttributeMap(excepts.groupBy(_._1.toAttribute).view.mapValues(v => v.map(_._2))) + AttributeMap(excepts.groupBy(_._1.toAttribute).transform((_, v) => v.map(_._2))) // map input columns while searching for the except entry corresponding to the current column columns.map(col => col -> groupedExcepts.get(col.toAttribute)).collect { diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 0f235e0977b..b20c6243898 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -207,7 +207,7 @@ object ExternalCatalogUtils { } def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = { - spec.view.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap + spec.transform((_, v) => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity) } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index b8d52311d9b..f1373b2e593 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -969,7 +969,7 @@ class SessionCatalog( } else { _.toLowerCase(Locale.ROOT) } - val nameToCounts = viewColumnNames.groupBy(normalizeColName).view.mapValues(_.length) + val nameToCounts = viewColumnNames.groupBy(normalizeColName).transform((_, v) => v.length) val nameToCurrentOrdinal = scala.collection.mutable.HashMap.empty[String, Int] val viewDDL = buildViewDDL(metadata, isTempView) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index ea985ae5f30..36fde4da262 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -126,7 +126,7 @@ package object expressions { } private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { - m.view.mapValues(_.distinct).toMap + m.transform((_, v) => v.distinct) } /** Map to use for direct case insensitive attribute lookups. 
*/ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 710c07fab7e..7ed68218f14 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -150,7 +150,7 @@ object NestedColumnAliasing { // A reference attribute can have multiple aliases for nested fields. val attrToAliases = - AttributeMap(attributeToExtractValuesAndAliases.view.mapValues(_.map(_._2))) + AttributeMap(attributeToExtractValuesAndAliases.transform((_, v) => v.map(_._2))) plan match { case Project(projectList, child) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index decef766ae9..a461bf529eb 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1058,8 +1058,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { .filter(_.references.exists(producerMap.contains)) .flatMap(collectReferences) .groupBy(identity) - .view - .mapValues(_.size) + .transform((_, v) => v.size) .forall { case (reference, count) => val producer = producerMap.getOrElse(reference, reference) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 57fe6ae346f..49bde72c48d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1084,8 +1084,8 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { // window w1 as (partition by p_mfgr order by p_name // range between 2 preceding and 2 following), // w2 as w1 - val windowMapView = baseWindowMap.view.mapValues { - case WindowSpecReference(name) => + val windowMapView = baseWindowMap.transform { + case (_, WindowSpecReference(name)) => baseWindowMap.get(name) match { case Some(spec: WindowSpecDefinition) => spec @@ -1094,12 +1094,12 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { case None => throw QueryParsingErrors.cannotResolveWindowReferenceError(name, ctx) } - case spec: WindowSpecDefinition => spec + case (_, spec: WindowSpecDefinition) => spec } // Note that mapValues creates a view instead of materialized map. We force materialization by // mapping over identity. 
- WithWindowDefinition(windowMapView.map(identity).toMap, query) + WithWindowDefinition(windowMapView.map(identity), query) } /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index cb11ec4b5f1..917a59d826d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -363,8 +363,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] case s: Seq[_] => s.map(mapChild) case m: Map[_, _] => - // `mapValues` is lazy and we need to force it to materialize by converting to Map - m.view.mapValues(mapChild).toMap + m.toMap.transform((_, v) => mapChild(v)) case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg) case Some(child) => Some(mapChild(child)) case nonChild: AnyRef => nonChild @@ -784,11 +783,11 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] case Some(arg: TreeNode[_]) if containsChild(arg) => Some(arg.asInstanceOf[BaseType].clone()) // `mapValues` is lazy and we need to force it to materialize by converting to Map - case m: Map[_, _] => m.view.mapValues { - case arg: TreeNode[_] if containsChild(arg) => + case m: Map[_, _] => m.toMap.transform { + case (_, arg: TreeNode[_]) if containsChild(arg) => arg.asInstanceOf[BaseType].clone() - case other => other - }.toMap + case (_, other) => other + } case d: DataType => d // Avoid unpacking Structs case args: LazyList[_] => args.map(mapChild).force // Force materialization on stream case args: Iterable[_] => args.map(mapChild) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala index 1274751ac94..b66658467c1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala @@ -334,7 +334,7 @@ class ToNumberParser(numberFormat: String, errorOnFail: Boolean) extends Seriali ) } // Make sure that the format string does not contain any prohibited duplicate tokens. - val inputTokenCounts = formatTokens.groupBy(identity).view.mapValues(_.size) + val inputTokenCounts = formatTokens.groupBy(identity).transform((_, v) => v.size) Seq(DecimalPoint(), OptionalPlusOrMinusSign(), OptionalMinusSign(), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala index 603de520b18..0c0b940a588 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala @@ -229,9 +229,8 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat case seq => Some(CreateStruct(seq)).map(e => Alias(e, e.sql)()).get } .groupBy(_.dataType) - .view - .mapValues(values => values.map(value => toSQLId(value.name)).sorted) - .mapValues(values => if (values.length > 3) values.take(3) :+ "..." else values) + .transform((_, values) => values.map(value => toSQLId(value.name)).sorted) + .transform((_, values) => if (values.length > 3) values.take(3) :+ "..." 
else values) .toList.sortBy(_._1.sql) .map { case (dataType, values) => s"${toSQLType(dataType)} (${values.mkString(", ")})" } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala index 24497add04f..9bc60f067dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -700,7 +700,7 @@ class SparkSession private( val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) { val parsedPlan = sessionState.sqlParser.parsePlan(sqlText) if (args.nonEmpty) { - NameParameterizedQuery(parsedPlan, args.view.mapValues(lit(_).expr).toMap) + NameParameterizedQuery(parsedPlan, args.transform((_, v) => lit(v).expr)) } else { parsedPlan } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala index 7fe4c73abf9..98a851f19f0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala @@ -61,7 +61,7 @@ case class AnalyzePartitionCommand( tableId.table, tableId.database.get, schemaColumns, specColumns) } - val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).view.mapValues(_.get) + val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).transform((_, v) => v.get) if (filteredSpec.isEmpty) { None } else { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala index 959a99c95cc..9905e9af9b0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala @@ -407,7 +407,7 @@ object PartitioningUtils extends SQLConfHelper { val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] = - seq.groupBy { case (key, _) => key }.view.mapValues(_.map { case (_, value) => value }).toMap + seq.groupBy { case (key, _) => key }.transform((_, v) => v.map { case (_, value) => value }) val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { case (path, partValues) => partValues.columnNames -> path diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala index 4a608a47b38..c4e12d5c4ae 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala @@ -82,8 +82,7 @@ trait OrcFiltersBase { val dedupPrimitiveFields = primitiveFields .groupBy(_._1.toLowerCase(Locale.ROOT)) .filter(_._2.size == 1) - .view - .mapValues(_.head._2) + .transform((_, v) => v.head._2) CaseInsensitiveMap(dedupPrimitiveFields.toMap) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala index 02cf34aee3e..5f6890b94f4 100644 --- 
a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala @@ -111,9 +111,8 @@ class ParquetFilters( primitiveFields .groupBy(_._1.toLowerCase(Locale.ROOT)) .filter(_._2.size == 1) - .view - .mapValues(_.head._2) - CaseInsensitiveMap(dedupPrimitiveFields.toMap) + .transform((_, v) => v.head._2) + CaseInsensitiveMap(dedupPrimitiveFields) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala index bddf6d02f1f..2a7c1206bb4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala @@ -491,8 +491,7 @@ case class EnsureRequirements( val numExpectedPartitions = partValues .map(InternalRowComparableWrapper(_, partitionExprs)) .groupBy(identity) - .view - .mapValues(_.size) + .transform((_, v) => v.size) mergedPartValues = mergedPartValues.map { case (partVal, numParts) => (partVal, numExpectedPartitions.getOrElse( diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala index ffdf9da6e58..2a9aa82a148 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala @@ -136,9 +136,9 @@ trait ProgressReporter extends Logging { from: StreamProgress, to: StreamProgress, latest: StreamProgress): Unit = { - currentTriggerStartOffsets = from.view.mapValues(_.json).toMap - currentTriggerEndOffsets = to.view.mapValues(_.json).toMap - currentTriggerLatestOffsets = latest.view.mapValues(_.json).toMap + currentTriggerStartOffsets = from.transform((_, v) => v.json) + currentTriggerEndOffsets = to.transform((_, v) => v.json) + currentTriggerLatestOffsets = latest.transform((_, v) => v.json) latestStreamProgress = to } @@ -243,7 +243,7 @@ trait ProgressReporter extends Logging { batchId = currentBatchId, batchDuration = processingTimeMills, durationMs = - new java.util.HashMap(currentDurationsMs.toMap.view.mapValues(long2Long).toMap.asJava), + new java.util.HashMap(currentDurationsMs.toMap.transform((_, v) => long2Long(v)).asJava), eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava), stateOperators = executionStats.stateOperators.toArray, sources = sourceProgress.toArray, @@ -297,17 +297,17 @@ trait ProgressReporter extends Logging { Map( "max" -> stats.max, "min" -> stats.min, - "avg" -> stats.avg.toLong).view.mapValues(formatTimestamp) + "avg" -> stats.avg.toLong).transform((_, v) => formatTimestamp(v)) }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp - ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap) + ExecutionStats(numInputRows, stateOperators, eventTimeStats) } /** Extract number of input sources for each streaming source in plan */ private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = { def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = { - tuples.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // sum up rows for each source + tuples.groupBy(_._1).transform((_, v) => v.map(_._2).sum) // sum up rows for each source } def unrollCTE(plan: 
LogicalPlan): LogicalPlan = { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala index feb745b0402..3a42f9e2ccb 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala @@ -507,7 +507,7 @@ class RocksDB( "put" -> DB_WRITE, "compaction" -> COMPACTION_TIME ).toMap - val nativeOpsLatencyMicros = nativeOpsHistograms.view.mapValues { typ => + val nativeOpsLatencyMicros = nativeOpsHistograms.transform { (_, typ) => RocksDBNativeHistogram(nativeStats.getHistogramData(typ)) } val nativeOpsMetricTickers = Seq( @@ -530,7 +530,7 @@ class RocksDB( /** Number of bytes written during flush */ "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES ).toMap - val nativeOpsMetrics = nativeOpsMetricTickers.view.mapValues { typ => + val nativeOpsMetrics = nativeOpsMetricTickers.transform { (_, typ) => nativeStats.getTickerCount(typ) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala index b36aa264e28..57db193a4c8 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala @@ -156,7 +156,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp .map(entry => entry._1 -> longMetric(entry._1).value) val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] = - new java.util.HashMap(customMetrics.view.mapValues(long2Long).toMap.asJava) + new java.util.HashMap(customMetrics.transform((_, v) => long2Long(v)).asJava) // We now don't report number of shuffle partitions inside the state operator. 
Instead, // it will be filled when the stream query progress is reported diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala index f66b99a154e..c9090cacdb3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala @@ -79,7 +79,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L request, "running", running.toSeq, - executionIdToSubExecutions.view.mapValues(_.toSeq).toMap, + executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq), currentTime, showErrorMessage = false, showRunningJobs = true, @@ -105,7 +105,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L request, "completed", completed.toSeq, - executionIdToSubExecutions.view.mapValues(_.toSeq).toMap, + executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq), currentTime, showErrorMessage = false, showRunningJobs = false, @@ -132,7 +132,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L request, "failed", failed.toSeq, - executionIdToSubExecutions.view.mapValues(_.toSeq).toMap, + executionIdToSubExecutions.toMap.transform((_, v) => v.toSeq), currentTime, showErrorMessage = true, showRunningJobs = false, diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala index 20ac2a9e946..8b53edb0d3c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala @@ -347,7 +347,7 @@ class DataFrameStatSuite extends QueryTest with SharedSparkSession { val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") // reduce by key - val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length) + val expected = data.map(t => (t, 1)).groupBy(_._1).transform((_, v) => v.length) val rows = crosstab.collect() rows.foreach { row => val i = row.getString(0).toInt diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala index d9f4c685163..03296476679 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala @@ -426,7 +426,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper // We need to do cartesian product for all the config dimensions, to get a list of // config sets, and run the query once for each config set. 
val configDimLines = comments.filter(_.startsWith("--CONFIG_DIM")).map(_.substring(12)) - val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).view.mapValues { lines => + val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).transform { (_, lines) => lines.map(_.dropWhile(_ != ' ').substring(1)).map(_.split(",").map { kv => val (conf, value) = kv.span(_ != '=') conf.trim -> value.substring(1).trim diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala index c5e65d2e65f..6a597d7ab4f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala @@ -527,7 +527,7 @@ class UDFSuite extends QueryTest with SharedSparkSession { sparkContext.parallelize(Seq(Row(Map("a" -> new BigDecimal("2011000000000002456556"))))), StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30, 0)))))) val udf2 = org.apache.spark.sql.functions.udf((map: Map[String, BigDecimal]) => { - map.view.mapValues(value => if (value == null) null else value.toBigInteger.toString).toMap + map.transform((_, value) => if (value == null) null else value.toBigInteger.toString) }) checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" -> "2011000000000002456556")))) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala index b7907b99495..d2740f9eac7 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala @@ -630,7 +630,7 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession { val meta = catalog.getTableRawMetadata(TableIdentifier("test_view", Some("default"))) // simulate a view meta with incompatible schema change val newProp = meta.properties - .view.mapValues(_.replace("col_i", "col_j")).toMap + .transform((_, v) => v.replace("col_i", "col_j")) val newSchema = StructType(Seq(StructField("col_j", IntegerType))) catalog.alterTable(meta.copy(properties = newProp, schema = newSchema)) val e = intercept[AnalysisException] { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala index 199d1b8b4b6..02c9d318bb4 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala @@ -47,7 +47,8 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil t: String, ifExists: String, specs: Map[String, Any]*): Unit = { - checkPartitions(t, specs.map(_.view.mapValues(_.toString).toMap): _*) + checkPartitions(t, + specs.map(_.map { case (k, v) => (k, v.toString) }.toMap): _*) val specStr = specs.map(partSpecToString).mkString(", ") sql(s"ALTER TABLE $t DROP $ifExists $specStr") checkPartitions(t) diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index 1efa8221e41..374a8a8078e 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ 
b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -392,7 +392,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession withParquetDataFrame(data) { df => // Structs are converted to `Row`s checkAnswer(df, data.map { case Tuple1(m) => - Row(m.view.mapValues(struct => Row(struct.productIterator.toSeq: _*))) + Row(m.transform((_, struct) => Row(struct.productIterator.toSeq: _*))) }) } } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala index 30315c12b58..72aa607591d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala @@ -216,13 +216,13 @@ trait SQLMetricsTestUtils extends SQLTestUtils { expectedNumOfJobs: Int, expectedMetrics: Map[Long, (String, Map[String, Any])], enableWholeStage: Boolean = false): Unit = { - val expectedMetricsPredicates = expectedMetrics.view.mapValues { case (nodeName, nodeMetrics) => - (nodeName, nodeMetrics.view.mapValues(expectedMetricValue => + val expectedMetricsPredicates = expectedMetrics.transform { case (_, (nodeName, nodeMetrics)) => + (nodeName, nodeMetrics.transform((_, expectedMetricValue) => (actualMetricValue: Any) => { actualMetricValue.toString.matches(expectedMetricValue.toString) - }).toMap) + })) } - testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates.toMap, + testSparkPlanMetricsWithPredicates(df, expectedNumOfJobs, expectedMetricsPredicates, enableWholeStage) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala index 5affc9ef3b2..17e77cf8d8f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala @@ -220,23 +220,23 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes ))) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 2).toMap) + accumulatorUpdates.transform((_, v) => v * 2)) // Driver accumulator updates don't belong to this execution should be filtered and no // exception will be thrown. 
listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L)))) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 2).toMap) + accumulatorUpdates.transform((_, v) => v * 2)) listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq( // (task id, stage id, stage attempt, accum updates) (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)), - (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.view.mapValues(_ * 2).toMap)) + (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.transform((_, v) => v * 2))) ))) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 3).toMap) + accumulatorUpdates.transform((_, v) => v * 3)) // Retrying a stage should reset the metrics listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1))) @@ -250,7 +250,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes ))) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 2).toMap) + accumulatorUpdates.transform((_, v) => v * 2)) // Ignore the task end for the first attempt listener.onTaskEnd(SparkListenerTaskEnd( @@ -258,12 +258,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 100).toMap), + createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 100)), new ExecutorMetrics, null)) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 2).toMap) + accumulatorUpdates.transform((_, v) => v * 2)) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -271,7 +271,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 2).toMap), + createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 2)), new ExecutorMetrics, null)) listener.onTaskEnd(SparkListenerTaskEnd( @@ -279,12 +279,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes stageAttemptId = 1, taskType = "", reason = null, - createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap), + createTaskInfo(1, 0, accums = accumulatorUpdates.transform((_, v) => v * 3)), new ExecutorMetrics, null)) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 5).toMap) + accumulatorUpdates.transform((_, v) => v * 5)) // Summit a new stage listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0))) @@ -298,7 +298,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes ))) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 7).toMap) + accumulatorUpdates.transform((_, v) => v * 7)) // Finish two tasks listener.onTaskEnd(SparkListenerTaskEnd( @@ -306,7 +306,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap), + createTaskInfo(0, 0, accums = accumulatorUpdates.transform((_, v) => v * 3)), new ExecutorMetrics, null)) listener.onTaskEnd(SparkListenerTaskEnd( @@ -314,12 +314,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes 
stageAttemptId = 0, taskType = "", reason = null, - createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap), + createTaskInfo(1, 0, accums = accumulatorUpdates.transform((_, v) => v * 3)), new ExecutorMetrics, null)) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 11).toMap) + accumulatorUpdates.transform((_, v) => v * 11)) assertJobs(statusStore.execution(executionId), running = Seq(0)) @@ -334,7 +334,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes assertJobs(statusStore.execution(executionId), completed = Seq(0)) checkAnswer(statusStore.executionMetrics(executionId), - accumulatorUpdates.view.mapValues(_ * 11).toMap) + accumulatorUpdates.transform((_, v) => v * 11)) } test("control a plan explain mode in listeners via SQLConf") { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala index a0f102109b1..dce27bdc5d1 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala @@ -436,7 +436,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, batchDuration = 0L, - durationMs = new java.util.HashMap(Map("total" -> 0L).view.mapValues(long2Long).toMap.asJava), + durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => long2Long(v)).asJava), eventTime = new java.util.HashMap(Map( "max" -> "2016-12-05T20:54:20.827Z", "min" -> "2016-12-05T20:54:20.827Z", @@ -448,7 +448,7 @@ object StreamingQueryStatusAndProgressSuite { numShufflePartitions = 2, numStateStoreInstances = 2, customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L, "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L) - .view.mapValues(long2Long).toMap.asJava) + .transform((_, v) => long2Long(v)).asJava) )), sources = Array( new SourceProgress( @@ -474,7 +474,7 @@ object StreamingQueryStatusAndProgressSuite { timestamp = "2016-12-05T20:54:20.827Z", batchId = 2L, batchDuration = 0L, - durationMs = new java.util.HashMap(Map("total" -> 0L).view.mapValues(long2Long).toMap.asJava), + durationMs = new java.util.HashMap(Map("total" -> 0L).transform((_, v) => long2Long(v)).asJava), // empty maps should be handled correctly eventTime = new java.util.HashMap(Map.empty[String, String].asJava), stateOperators = Array(new StateOperatorProgress(operatorName = "op2", diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala index e5e873cca12..f1a63ac0f78 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala @@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest { c.committedOffsets.lastOption.map { case (_, offset) => offset match { case o: RateStreamOffset => - o.partitionToValueAndRunTimeMs.view.mapValues(_.value).toMap + o.partitionToValueAndRunTimeMs.transform((_, v) => v.value) } } } diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala index 
f52ff8cf9a1..32526f1d18f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala @@ -1261,7 +1261,8 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat table: String, partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient { val catalogTable = getTable(db, table) - val partColNameMap = buildLowerCasePartColNameMap(catalogTable).view.mapValues(escapePathName) + val partColNameMap = + buildLowerCasePartColNameMap(catalogTable).transform((_, v) => escapePathName(v)) val clientPartitionNames = client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec)) clientPartitionNames.map { partitionPath => diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala index 5a124be5679..2f981454de4 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala @@ -65,7 +65,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = { JavaBatchInfo( batchInfo.batchTime, - batchInfo.streamIdToInputInfo.view.mapValues(toJavaStreamInputInfo).toMap.asJava, + batchInfo.streamIdToInputInfo.transform((_, v) => toJavaStreamInputInfo(v)).asJava, batchInfo.submissionTime, batchInfo.processingStartTime.getOrElse(-1), batchInfo.processingEndTime.getOrElse(-1), @@ -73,7 +73,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav batchInfo.processingDelay.getOrElse(-1), batchInfo.totalDelay.getOrElse(-1), batchInfo.numRecords, - batchInfo.outputOperationInfos.view.mapValues(toJavaOutputOperationInfo).toMap.asJava + batchInfo.outputOperationInfos.transform((_, v) => toJavaOutputOperationInfo(v)).asJava ) } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala index 5c47cde0459..79b1d0c545b 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala @@ -184,7 +184,7 @@ private[streaming] class ReceiverSchedulingPolicy { val executorWeights: Map[ExecutorCacheTaskLocation, Double] = { receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights) - .groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // Sum weights for each executor + .groupBy(_._1).transform((_, v) => v.map(_._2).sum) // Sum weights for each executor } val idleExecutors = executors.toSet -- executorWeights.keys diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala index d5bc658b4b5..52ea8fdfbfe 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala @@ -245,11 +245,11 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false */ def allocatedExecutors(): Map[Int, 
Option[String]] = synchronized { if (isTrackerStarted) { - endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).view.mapValues { - _.runningExecutor.map { + endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).transform { (_, v) => + v.runningExecutor.map { _.executorId } - }.toMap + } } else { Map.empty } diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala index 08de8d46c31..02f6dd9fcac 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala @@ -98,7 +98,8 @@ private[ui] object BatchUIData { def apply(batchInfo: BatchInfo): BatchUIData = { val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]() - outputOperations ++= batchInfo.outputOperationInfos.view.mapValues(OutputOperationUIData.apply) + outputOperations ++= batchInfo.outputOperationInfos + .transform((_, v) => OutputOperationUIData(v)) new BatchUIData( batchInfo.batchTime, batchInfo.streamIdToInputInfo, diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala index 140eb866f6f..bbe99f64707 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala @@ -217,7 +217,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) val _retainedBatches = retainedBatches val latestBatches = _retainedBatches.map { batchUIData => (batchUIData.batchTime.milliseconds, - batchUIData.streamIdToInputInfo.view.mapValues(_.numRecords)) + batchUIData.streamIdToInputInfo.transform((_, v) => v.numRecords)) } streamIds.map { streamId => val recordRates = latestBatches.map { @@ -231,7 +231,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext) def lastReceivedBatchRecords: Map[Int, Long] = synchronized { val lastReceivedBlockInfoOption = - lastReceivedBatch.map(_.streamIdToInputInfo.view.mapValues(_.numRecords)) + lastReceivedBatch.map(_.streamIdToInputInfo.transform((_, v) => v.numRecords)) lastReceivedBlockInfoOption.map { lastReceivedBlockInfo => streamIds.map { streamId => (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
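For readers skimming the hunks above, here is a minimal standalone sketch, not part of the patch, of the replacement pattern they all apply: `view.mapValues(...).toMap` on an immutable `Map` becomes `transform`, which is eager and already returns a `Map`, while a `mutable.Map` whose value type must change falls back to `map`. The object name and sample values below are made up purely for illustration.

```scala
import scala.collection.mutable

object MapValuesVsTransformSketch {
  def main(args: Array[String]): Unit = {
    val counts = Map("a" -> 1, "b" -> 2)

    // Old pattern: view.mapValues is lazy and yields a MapView,
    // so an explicit .toMap is needed to get an immutable Map back.
    val doubledOld: Map[String, Int] = counts.view.mapValues(_ * 2).toMap

    // New pattern: immutable.MapOps#transform maps each value eagerly
    // and already returns a Map, so the trailing .toMap goes away.
    val doubledNew: Map[String, Int] = counts.transform((_, v) => v * 2)
    assert(doubledOld == doubledNew)

    // For mutable maps there is no value-type-changing transform
    // (the mutable transform is deprecated and keeps the value type),
    // so the plain map function is used instead.
    val buffer = mutable.Map("a" -> 1, "b" -> 2)
    val asStrings: mutable.Map[String, String] =
      buffer.map { case (k, v) => (k, v.toString) }

    println(doubledNew) // Map(a -> 2, b -> 4)
    println(asStrings)  // same keys, values now Strings
  }
}
```

The payoff is the same in each hunk: the explicit materialization step (`.toMap` after a lazy `MapView`) disappears because `transform` and `map` build the result collection directly.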