This is an automated email from the ASF dual-hosted git repository.

srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
     new 89ca8b6065e [SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`
89ca8b6065e is described below

commit 89ca8b6065e9f690a492c778262080741d50d94d
Author: yangjie01 <yangji...@baidu.com>
AuthorDate: Sun Oct 29 09:19:30 2023 -0500

[SPARK-45605][CORE][SQL][SS][CONNECT][MLLIB][GRAPHX][DSTREAM][PROTOBUF][EXAMPLES] Replace `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`

### What changes were proposed in this pull request?
This PR replaces `s.c.MapOps.mapValues` with `s.c.MapOps.view.mapValues`, because `s.c.MapOps.mapValues` has been marked as deprecated since Scala 2.13.0:
https://github.com/scala/scala/blob/bf45e199e96383b96a6955520d7d2524c78e6e12/src/library/scala/collection/Map.scala#L256-L262

```scala
@deprecated("Use .view.mapValues(f). A future version will include a strict version of this method (for now, .view.mapValues(f).toMap).", "2.13.0")
def mapValues[W](f: V => W): MapView[K, W] = new MapView.MapValues(this, f)
```

### Why are the changes needed?
Clean up deprecated API usage.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
- Pass GitHub Actions
- Packaged the client, manually tested `DFSReadWriteTest/MiniReadWriteTest/PowerIterationClusteringExample`.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43448 from LuciferYang/SPARK-45605.

Lead-authored-by: yangjie01 <yangji...@baidu.com>
Co-authored-by: YangJie <yangji...@baidu.com>
Signed-off-by: Sean Owen <sro...@gmail.com>
---
 .../spark/util/sketch/CountMinSketchSuite.scala | 2 +-
 .../org/apache/spark/sql/avro/AvroUtils.scala | 1 +
 .../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 .../spark/sql/ClientDataFrameStatSuite.scala | 2 +-
 .../org/apache/spark/sql/connect/dsl/package.scala | 2 +-
 .../sql/connect/planner/SparkConnectPlanner.scala | 13 ++++++----
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala | 3 ++-
 .../apache/spark/sql/kafka010/KafkaTestUtils.scala | 2 +-
 .../streaming/kafka010/ConsumerStrategy.scala | 6 ++---
 .../kafka010/DirectKafkaInputDStream.scala | 2 +-
 .../kafka010/DirectKafkaStreamSuite.scala | 2 +-
 .../spark/streaming/kafka010/KafkaTestUtils.scala | 2 +-
 .../spark/streaming/kinesis/KinesisTestUtils.scala | 2 +-
 .../kinesis/KPLBasedKinesisTestUtils.scala | 2 +-
 .../kinesis/KinesisBackedBlockRDDSuite.scala | 4 +--
 .../spark/sql/protobuf/utils/ProtobufUtils.scala | 1 +
 .../org/apache/spark/api/java/JavaPairRDD.scala | 4 +--
 .../apache/spark/api/java/JavaSparkContext.scala | 2 +-
 .../spark/api/python/PythonWorkerFactory.scala | 2 +-
 .../apache/spark/scheduler/InputFormatInfo.scala | 2 +-
 .../apache/spark/scheduler/TaskSchedulerImpl.scala | 2 +-
 .../cluster/CoarseGrainedSchedulerBackend.scala | 2 +-
 ...plicationEnvironmentInfoWrapperSerializer.scala | 5 ++--
 .../ExecutorSummaryWrapperSerializer.scala | 3 ++-
 .../status/protobuf/JobDataWrapperSerializer.scala | 2 +-
 .../protobuf/StageDataWrapperSerializer.scala | 6 ++---
 .../org/apache/spark/SparkThrowableSuite.scala | 2 +-
 .../apache/spark/rdd/PairRDDFunctionsSuite.scala | 2 +-
 .../test/scala/org/apache/spark/rdd/RDDSuite.scala | 1 +
 .../scheduler/ExecutorResourceInfoSuite.scala | 1 +
 .../BlockManagerDecommissionIntegrationSuite.scala | 2 +-
 .../storage/ShuffleBlockFetcherIteratorSuite.scala | 2 +-
 .../util/collection/ExternalSorterSuite.scala | 2 +-
 .../apache/spark/examples/DFSReadWriteTest.scala | 1 +
 .../apache/spark/examples/MiniReadWriteTest.scala | 1 +
 .../mllib/PowerIterationClusteringExample.scala | 2 +-
 .../spark/graphx/lib/ShortestPathsSuite.scala | 2 +-
 .../spark/ml/evaluation/ClusteringMetrics.scala | 1 +
 .../apache/spark/ml/feature/VectorIndexer.scala | 2 +-
 .../org/apache/spark/ml/feature/Word2Vec.scala | 2 +-
 .../apache/spark/ml/tree/impl/RandomForest.scala | 4 +--
 .../spark/mllib/clustering/BisectingKMeans.scala | 2 +-
 .../mllib/linalg/distributed/BlockMatrix.scala | 4 +--
 .../apache/spark/mllib/stat/test/ChiSqTest.scala | 1 +
 .../apache/spark/ml/recommendation/ALSSuite.scala | 8 +++---
 .../apache/spark/mllib/feature/Word2VecSuite.scala | 12 ++++-----
 .../org/apache/spark/sql/types/Metadata.scala | 2 +-
 .../spark/sql/catalyst/analysis/Analyzer.scala | 3 ++-
 .../catalyst/catalog/ExternalCatalogUtils.scala | 2 +-
 .../sql/catalyst/catalog/SessionCatalog.scala | 2 +-
 .../spark/sql/catalyst/expressions/package.scala | 2 +-
 .../catalyst/optimizer/NestedColumnAliasing.scala | 2 +-
 .../spark/sql/catalyst/optimizer/Optimizer.scala | 1 +
 .../spark/sql/catalyst/parser/AstBuilder.scala | 2 +-
 .../apache/spark/sql/catalyst/trees/TreeNode.scala | 4 +--
 .../spark/sql/catalyst/util/ToNumberParser.scala | 2 +-
 .../spark/sql/errors/QueryCompilationErrors.scala | 1 +
 .../scala/org/apache/spark/sql/SparkSession.scala | 2 +-
 .../command/AnalyzePartitionCommand.scala | 2 +-
 .../execution/datasources/PartitioningUtils.scala | 2 +-
 .../execution/datasources/orc/OrcFiltersBase.scala | 1 +
 .../datasources/parquet/ParquetFilters.scala | 1 +
 .../execution/exchange/EnsureRequirements.scala | 1 +
 .../sql/execution/streaming/ProgressReporter.scala | 12 ++++-----
 .../sql/execution/streaming/state/RocksDB.scala | 4 +--
 .../execution/streaming/statefulOperators.scala | 2 +-
 .../spark/sql/execution/ui/AllExecutionsPage.scala | 6 ++---
 .../org/apache/spark/sql/DataFrameStatSuite.scala | 2 +-
 .../org/apache/spark/sql/SQLQueryTestSuite.scala | 2 +-
 .../test/scala/org/apache/spark/sql/UDFSuite.scala | 2 +-
 .../spark/sql/execution/SQLViewTestSuite.scala | 2 +-
 .../command/AlterTableDropPartitionSuiteBase.scala | 2 +-
 .../datasources/parquet/ParquetIOSuite.scala | 2 +-
 .../sql/execution/metric/SQLMetricsTestUtils.scala | 4 +--
 .../execution/ui/SQLAppStatusListenerSuite.scala | 30 +++++++++++-----------
 .../StreamingQueryStatusAndProgressSuite.scala | 6 ++---
 .../sql/streaming/continuous/ContinuousSuite.scala | 2 +-
 .../spark/sql/hive/HiveExternalCatalog.scala | 2 +-
 .../api/java/JavaStreamingListenerWrapper.scala | 4 +--
 .../scheduler/ReceiverSchedulingPolicy.scala | 2 +-
 .../streaming/scheduler/ReceiverTracker.scala | 2 +-
 .../apache/spark/streaming/ui/BatchUIData.scala | 2 +-
 .../ui/StreamingJobProgressListener.scala | 5 ++--
 83 files changed, 140 insertions(+), 119 deletions(-)

diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala index 087dae26047..689452caa32 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/CountMinSketchSuite.scala @@ -56,7 +56,7 @@ class CountMinSketchSuite extends AnyFunSuite { // scalastyle:ignore funsuite val exactFreq = { val sampledItems = sampledItemIndices.map(allItems) - sampledItems.groupBy(identity).mapValues(_.length.toLong) + sampledItems.groupBy(identity).view.mapValues(_.length.toLong) } val sketch = CountMinSketch.create(epsOfTotalCount,
confidence, seed) diff --git a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala index 6a1655a91c9..e738f541ca7 100644 --- a/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala +++ b/connector/avro/src/main/scala/org/apache/spark/sql/avro/AvroUtils.scala @@ -242,6 +242,7 @@ private[sql] object AvroUtils extends Logging { private[this] val avroFieldArray = avroSchema.getFields.asScala.toArray private[this] val fieldMap = avroSchema.getFields.asScala .groupBy(_.name.toLowerCase(Locale.ROOT)) + .view .mapValues(_.toSeq) // toSeq needed for scala 2.13 /** The fields which have matching equivalents in both Avro and Catalyst schemas. */ diff --git a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala index 5e640bea570..78daaa5d3f5 100644 --- a/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala +++ b/connector/connect/client/jvm/src/main/scala/org/apache/spark/sql/SparkSession.scala @@ -306,7 +306,7 @@ class SparkSession private[sql] ( proto.SqlCommand .newBuilder() .setSql(sqlText) - .putAllNamedArguments(args.asScala.mapValues(lit(_).expr).toMap.asJava))) + .putAllNamedArguments(args.asScala.view.mapValues(lit(_).expr).toMap.asJava))) val plan = proto.Plan.newBuilder().setCommand(cmd) // .toBuffer forces that the iterator is consumed and closed val responseSeq = client.execute(plan.build()).toBuffer.toSeq diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala index 069d8ec502f..747ca45d10d 100644 --- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala +++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientDataFrameStatSuite.scala @@ -120,7 +120,7 @@ class ClientDataFrameStatSuite extends RemoteSparkSession { val columnNames = crosstab.schema.fieldNames assert(columnNames(0) === "a_b") // reduce by key - val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length) + val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length) val rows = crosstab.collect() rows.foreach { row => val i = row.getString(0).toInt diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala index 7c41491ba06..24fa2324f66 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala +++ b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/dsl/package.scala @@ -361,7 +361,7 @@ package object dsl { } def fillValueMap(valueMap: Map[String, Any]): Relation = { - val (cols, values) = valueMap.mapValues(toLiteralProto).toSeq.unzip + val (cols, values) = valueMap.view.mapValues(toLiteralProto).toSeq.unzip Relation .newBuilder() .setFillNa( diff --git a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala index f5d83b81999..17c10e63301 100644 --- a/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala +++ 
b/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/planner/SparkConnectPlanner.scala @@ -287,11 +287,11 @@ class SparkConnectPlanner( if (!namedArguments.isEmpty) { NameParameterizedQuery( parsedPlan, - namedArguments.asScala.mapValues(transformExpression).toMap) + namedArguments.asScala.view.mapValues(transformExpression).toMap) } else if (!posArguments.isEmpty) { PosParameterizedQuery(parsedPlan, posArguments.asScala.map(transformExpression).toSeq) } else if (!args.isEmpty) { - NameParameterizedQuery(parsedPlan, args.asScala.mapValues(transformLiteral).toMap) + NameParameterizedQuery(parsedPlan, args.asScala.view.mapValues(transformLiteral).toMap) } else if (!posArgs.isEmpty) { PosParameterizedQuery(parsedPlan, posArgs.asScala.map(transformLiteral).toSeq) } else { @@ -2518,7 +2518,7 @@ class SparkConnectPlanner( val df = if (!namedArguments.isEmpty) { session.sql( getSqlCommand.getSql, - namedArguments.asScala.mapValues(e => Column(transformExpression(e))).toMap, + namedArguments.asScala.view.mapValues(e => Column(transformExpression(e))).toMap, tracker) } else if (!posArguments.isEmpty) { session.sql( @@ -2526,7 +2526,10 @@ class SparkConnectPlanner( posArguments.asScala.map(e => Column(transformExpression(e))).toArray, tracker) } else if (!args.isEmpty) { - session.sql(getSqlCommand.getSql, args.asScala.mapValues(transformLiteral).toMap, tracker) + session.sql( + getSqlCommand.getSql, + args.asScala.view.mapValues(transformLiteral).toMap, + tracker) } else if (!posArgs.isEmpty) { session.sql(getSqlCommand.getSql, posArgs.asScala.map(transformLiteral).toArray, tracker) } else { @@ -3262,7 +3265,7 @@ class SparkConnectPlanner( proto.GetResourcesCommandResult .newBuilder() .putAllResources( - session.sparkContext.resources + session.sparkContext.resources.view .mapValues(resource => proto.ResourceInformation .newBuilder() diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala index 4e8da137a47..54e05ff95c9 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala @@ -2294,7 +2294,8 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest { Execute { q => // wait to reach the last offset in every partition q.awaitOffset(0, - KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L).toMap), streamingTimeout.toMillis) + KafkaSourceOffset(partitionOffsets.view.mapValues(_ => 3L).toMap), + streamingTimeout.toMillis) }, CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), StopStream, diff --git a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala index c54afc6290b..1624f7320bb 100644 --- a/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala +++ b/connector/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala @@ -377,7 +377,7 @@ class KafkaTestUtils( } def getAllTopicsAndPartitionSize(): Seq[(String, Int)] = { - zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).mapValues(_.size).toSeq + zkClient.getPartitionsForTopics(zkClient.getAllTopicsInCluster()).view.mapValues(_.size).toSeq } /** Create a Kafka topic and wait until it is propagated 
to the whole cluster */ diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala index b96a2597f5d..1dd66675b91 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/ConsumerStrategy.scala @@ -241,7 +241,7 @@ object ConsumerStrategies { new Subscribe[K, V]( new ju.ArrayList(topics.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) } /** @@ -320,7 +320,7 @@ object ConsumerStrategies { new SubscribePattern[K, V]( pattern, new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) } /** @@ -404,7 +404,7 @@ object ConsumerStrategies { new Assign[K, V]( new ju.ArrayList(topicPartitions.asJavaCollection), new ju.HashMap[String, Object](kafkaParams.asJava), - new ju.HashMap[TopicPartition, jl.Long](offsets.mapValues(jl.Long.valueOf).toMap.asJava)) + new ju.HashMap[TopicPartition, jl.Long](offsets.view.mapValues(jl.Long.valueOf).toMap.asJava)) } /** diff --git a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala index d8037269961..f5967a74ad3 100644 --- a/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala +++ b/connector/kafka-0-10/src/main/scala/org/apache/spark/streaming/kafka010/DirectKafkaInputDStream.scala @@ -71,7 +71,7 @@ private[spark] class DirectKafkaInputDStream[K, V]( def consumer(): Consumer[K, V] = this.synchronized { if (null == kc) { kc = consumerStrategy.onStart( - currentOffsets.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava) + currentOffsets.view.mapValues(l => java.lang.Long.valueOf(l)).toMap.asJava) } kc } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala index 7b2cac4a68b..faf114108fa 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala +++ b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/DirectKafkaStreamSuite.scala @@ -726,7 +726,7 @@ class DirectKafkaStreamSuite /** Get the generated offset ranges from the DirectKafkaStream */ private def getOffsetRanges[K, V]( kafkaStream: DStream[ConsumerRecord[K, V]]): Seq[(Time, Array[OffsetRange])] = { - kafkaStream.generatedRDDs.mapValues { rdd => + kafkaStream.generatedRDDs.view.mapValues { rdd => rdd.asInstanceOf[HasOffsetRanges].offsetRanges }.toSeq.sortBy { _._1 } } diff --git a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala index 6a9ef52e990..66c253172e7 100644 --- a/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala +++ 
b/connector/kafka-0-10/src/test/scala/org/apache/spark/streaming/kafka010/KafkaTestUtils.scala @@ -206,7 +206,7 @@ private[kafka010] class KafkaTestUtils extends Logging { /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) + sendMessages(topic, Map(messageToFreq.asScala.view.mapValues(_.intValue()).toSeq: _*)) } /** Send the messages to the Kafka broker */ diff --git a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index 50dfd50aa23..10d24df7b61 100644 --- a/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/connector/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -289,6 +289,6 @@ private[kinesis] class SimpleDataGenerator( sentSeqNumbers += ((num, seqNumber)) } - shardIdToSeqNumbers.mapValues(_.toSeq).toMap + shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap } } diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala index f95bee1d98f..2799965a8fc 100644 --- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala +++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KPLBasedKinesisTestUtils.scala @@ -72,6 +72,6 @@ private[kinesis] class KPLDataGenerator(regionName: String) extends KinesisDataG Futures.addCallback(future, kinesisCallBack, ThreadUtils.sameThreadExecutorService()) } producer.flushSync() - shardIdToSeqNumbers.mapValues(_.toSeq).toMap + shardIdToSeqNumbers.view.mapValues(_.toSeq).toMap } } diff --git a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala index 4b3b7454b86..c9d1498a5a4 100644 --- a/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala +++ b/connector/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDDSuite.scala @@ -47,8 +47,8 @@ abstract class KinesisBackedBlockRDDTests(aggregateTestData: Boolean) require(shardIdToDataAndSeqNumbers.size > 1, "Need data to be sent to multiple shards") shardIds = shardIdToDataAndSeqNumbers.keySet.toSeq - shardIdToData = shardIdToDataAndSeqNumbers.mapValues(_.map(_._1)).toMap - shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.mapValues(_.map(_._2)).toMap + shardIdToData = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._1)).toMap + shardIdToSeqNumbers = shardIdToDataAndSeqNumbers.view.mapValues(_.map(_._2)).toMap shardIdToRange = shardIdToSeqNumbers.map { case (shardId, seqNumbers) => val seqNumRange = SequenceNumberRange( testUtils.streamName, shardId, seqNumbers.head, seqNumbers.last, seqNumbers.size) diff --git a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala index 2388668f66a..6c18a8863af 100644 --- a/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala +++ 
b/connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/ProtobufUtils.scala @@ -75,6 +75,7 @@ private[sql] object ProtobufUtils extends Logging { private[this] val protoFieldArray = descriptor.getFields.asScala.toArray private[this] val fieldMap = descriptor.getFields.asScala .groupBy(_.getName.toLowerCase(Locale.ROOT)) + .view .mapValues(_.toSeq) // toSeq needed for scala 2.13 /** The fields which have matching equivalents in both Protobuf and Catalyst schemas. */ diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 7d11e4b157a..aa8e1b1520e 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -147,7 +147,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKey( withReplacement, - fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize + fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize seed)) /** @@ -179,7 +179,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) seed: Long): JavaPairRDD[K, V] = new JavaPairRDD[K, V](rdd.sampleByKeyExact( withReplacement, - fractions.asScala.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize + fractions.asScala.view.mapValues(_.toDouble).toMap, // map to Scala Double; toMap to serialize seed)) /** diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 6ead36971a9..c016910ed76 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -782,7 +782,7 @@ class JavaSparkContext(val sc: SparkContext) extends Closeable { * @note This does not necessarily mean the caching or computation was successful. 
*/ def getPersistentRDDs: JMap[java.lang.Integer, JavaRDD[_]] = { - sc.getPersistentRDDs.mapValues(s => JavaRDD.fromRDD(s)).toMap + sc.getPersistentRDDs.view.mapValues(s => JavaRDD.fromRDD(s)).toMap .asJava.asInstanceOf[JMap[java.lang.Integer, JavaRDD[_]]] } diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index 2c8f4f2ca2a..14265f03795 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -369,7 +369,7 @@ private[spark] class PythonWorkerFactory( daemon = null daemonPort = 0 } else { - simpleWorkers.mapValues(_.destroy()) + simpleWorkers.view.mapValues(_.destroy()) } } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index 4b3fd580341..e04c5b2de7d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -178,6 +178,6 @@ object InputFormatInfo { } } - nodeToSplit.mapValues(_.toSet).toMap + nodeToSplit.view.mapValues(_.toSet).toMap } } diff --git a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala index d00578e73e9..41f6b3ad64b 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala @@ -155,7 +155,7 @@ private[spark] class TaskSchedulerImpl( .build[String, ExecutorDecommissionState]() def runningTasksByExecutors: Map[String, Int] = synchronized { - executorIdToRunningTaskIds.toMap.mapValues(_.size).toMap + executorIdToRunningTaskIds.toMap.view.mapValues(_.size).toMap } // The set of executors we have on each host; this is used to compute hostsAlive, which diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala index dd53757fe85..c49b2411e76 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala @@ -736,7 +736,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, val rpcEnv: Rp } def getExecutorsWithRegistrationTs(): Map[String, Long] = synchronized { - executorDataMap.mapValues(v => v.registrationTs).toMap + executorDataMap.view.mapValues(v => v.registrationTs).toMap } override def isExecutorActive(id: String): Boolean = synchronized { diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala index baedccd9b92..4ee864f00e8 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ApplicationEnvironmentInfoWrapperSerializer.scala @@ -130,9 +130,10 @@ private[protobuf] class ApplicationEnvironmentInfoWrapperSerializer new ResourceProfileInfo( id = info.getId, executorResources = - info.getExecutorResourcesMap.asScala.mapValues(deserializeExecutorResourceRequest).toMap, + 
info.getExecutorResourcesMap.asScala.view.mapValues(deserializeExecutorResourceRequest) + .toMap, taskResources = - info.getTaskResourcesMap.asScala.mapValues(deserializeTaskResourceRequest).toMap) + info.getTaskResourcesMap.asScala.view.mapValues(deserializeTaskResourceRequest).toMap) } private def deserializeExecutorResourceRequest(info: StoreTypes.ExecutorResourceRequest): diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala index 97daf37995d..188187c9b75 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/ExecutorSummaryWrapperSerializer.scala @@ -137,7 +137,8 @@ private[protobuf] class ExecutorSummaryWrapperSerializer blacklistedInStages = binary.getBlacklistedInStagesList.asScala.map(_.toInt).toSet, peakMemoryMetrics = peakMemoryMetrics, attributes = binary.getAttributesMap.asScala.toMap, - resources = binary.getResourcesMap.asScala.mapValues(deserializeResourceInformation).toMap, + resources = + binary.getResourcesMap.asScala.view.mapValues(deserializeResourceInformation).toMap, resourceProfileId = binary.getResourceProfileId, isExcluded = binary.getIsExcluded, excludedInStages = binary.getExcludedInStagesList.asScala.map(_.toInt).toSet) diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala index 41d1fee1608..5252b8b8c01 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/JobDataWrapperSerializer.scala @@ -107,6 +107,6 @@ private[protobuf] class JobDataWrapperSerializer extends ProtobufSerDe[JobDataWr numCompletedStages = info.getNumCompletedStages, numSkippedStages = info.getNumSkippedStages, numFailedStages = info.getNumFailedStages, - killedTasksSummary = info.getKillTasksSummaryMap.asScala.mapValues(_.toInt).toMap) + killedTasksSummary = info.getKillTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap) } } diff --git a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala index 8793ca7a12c..4fbaff0327d 100644 --- a/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala +++ b/core/src/main/scala/org/apache/spark/status/protobuf/StageDataWrapperSerializer.scala @@ -382,7 +382,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa new StageDataWrapper( info = info, jobIds = binary.getJobIdsList.asScala.map(_.toInt).toSet, - locality = binary.getLocalityMap.asScala.mapValues(_.toLong).toMap + locality = binary.getLocalityMap.asScala.view.mapValues(_.toLong).toMap ) } @@ -402,7 +402,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa entry => (entry._1.toLong, deserializeTaskData(entry._2))).toMap) } else None val executorSummary = if (MapUtils.isNotEmpty(binary.getExecutorSummaryMap)) { - Some(binary.getExecutorSummaryMap.asScala.mapValues( + Some(binary.getExecutorSummaryMap.asScala.view.mapValues( ExecutorStageSummarySerializer.deserialize).toMap) } else None val speculationSummary = @@ -475,7 +475,7 @@ private[protobuf] class StageDataWrapperSerializer extends ProtobufSerDe[StageDa 
tasks = tasks, executorSummary = executorSummary, speculationSummary = speculationSummary, - killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.mapValues(_.toInt).toMap, + killedTasksSummary = binary.getKilledTasksSummaryMap.asScala.view.mapValues(_.toInt).toMap, resourceProfileId = binary.getResourceProfileId, peakExecutorMetrics = peakExecutorMetrics, taskMetricsDistributions = taskMetricsDistributions, diff --git a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala index fb82714949b..9f32d81f1ae 100644 --- a/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkThrowableSuite.scala @@ -65,7 +65,7 @@ class SparkThrowableSuite extends SparkFunSuite { } def checkIfUnique(ss: Seq[Any]): Unit = { - val dups = ss.groupBy(identity).mapValues(_.size).filter(_._2 > 1).keys.toSeq + val dups = ss.groupBy(identity).view.mapValues(_.size).filter(_._2 > 1).keys.toSeq assert(dups.isEmpty, s"Duplicate error classes: ${dups.mkString(", ")}") } diff --git a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala index a669993352f..c30b4ca4dae 100644 --- a/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala @@ -805,7 +805,7 @@ class PairRDDFunctionsSuite extends SparkFunSuite with SharedSparkContext { seed: Long, n: Long): Unit = { val trials = stratifiedData.countByKey() - val expectedSampleSize = stratifiedData.countByKey().mapValues(count => + val expectedSampleSize = stratifiedData.countByKey().view.mapValues(count => math.ceil(count * samplingRate).toInt) val fractions = Map("1" -> samplingRate, "0" -> samplingRate) val sample = if (exact) { diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala index 046017af3a3..f925a8b8b71 100644 --- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala +++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala @@ -1254,6 +1254,7 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually { .getPartitions .map(coalescedRDD.getPreferredLocations(_).head) .groupBy(identity) + .view .mapValues(_.size) // Make sure the coalesced partitions are distributed fairly evenly between the two locations. 
diff --git a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala index e392ff53e02..3f99e2b4598 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/ExecutorResourceInfoSuite.scala @@ -101,6 +101,7 @@ class ExecutorResourceInfoSuite extends SparkFunSuite { // assert that each address was assigned `slots` times info.assignedAddrs .groupBy(identity) + .view .mapValues(_.size) .foreach(x => assert(x._2 == slots)) diff --git a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala index 5ab9f644be6..59ebc750af9 100644 --- a/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/BlockManagerDecommissionIntegrationSuite.scala @@ -299,7 +299,7 @@ class BlockManagerDecommissionIntegrationSuite extends SparkFunSuite with LocalS val blockLocs = rddUpdates.map { update => (update.blockUpdatedInfo.blockId.name, update.blockUpdatedInfo.blockManagerId)} - val blocksToManagers = blockLocs.groupBy(_._1).mapValues(_.size) + val blocksToManagers = blockLocs.groupBy(_._1).view.mapValues(_.size) assert(blocksToManagers.exists(_._2 > 1), s"We should have a block that has been on multiple BMs in rdds:\n ${rddUpdates} from:\n" + s"${blocksUpdated}\n but instead we got:\n ${blocksToManagers}") diff --git a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala index c16dae77b83..769939a0a22 100644 --- a/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala +++ b/core/src/test/scala/org/apache/spark/storage/ShuffleBlockFetcherIteratorSuite.scala @@ -988,7 +988,7 @@ class ShuffleBlockFetcherIteratorSuite extends SparkFunSuite with PrivateMethodT ShuffleBlockId(0, 1, 0) -> createMockManagedBuffer() ) - configureMockTransfer(blocks.mapValues(_ => createMockManagedBuffer(0)).toMap) + configureMockTransfer(blocks.view.mapValues(_ => createMockManagedBuffer(0)).toMap) val iterator = createShuffleBlockIteratorWithDefaults( Map(remoteBmId -> toBlockList(blocks.keys, 1L, 0)) diff --git a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala index 7bec9612187..02cc2bb35af 100644 --- a/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala +++ b/core/src/test/scala/org/apache/spark/util/collection/ExternalSorterSuite.scala @@ -544,7 +544,7 @@ class ExternalSorterSuite extends SparkFunSuite with LocalSparkContext { val expected = (0 until 3).map { p => var v = (0 until size).map { i => (i / 4, i) }.filter { case (k, _) => k % 3 == p }.toSet if (withPartialAgg) { - v = v.groupBy(_._1).mapValues { s => s.map(_._2).sum }.toSet + v = v.groupBy(_._1).view.mapValues { s => s.map(_._2).sum }.toSet } (p, v.toSet) }.toSet diff --git a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala index 5dfb8ec898c..4cad0f16426 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala +++ 
b/examples/src/main/scala/org/apache/spark/examples/DFSReadWriteTest.scala @@ -89,6 +89,7 @@ object DFSReadWriteTest { .flatMap(_.split("\t")) .filter(_.nonEmpty) .groupBy(w => w) + .view .mapValues(_.size) .values .sum diff --git a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala index c003dc8ba38..9095c9b75af 100644 --- a/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/MiniReadWriteTest.scala @@ -81,6 +81,7 @@ object MiniReadWriteTest { .flatMap(_.split("\t")) .filter(_.nonEmpty) .groupBy(w => w) + .view .mapValues(_.size) .values .sum diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala index 2a77702e223..d26c9a3a056 100644 --- a/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala +++ b/examples/src/main/scala/org/apache/spark/examples/mllib/PowerIterationClusteringExample.scala @@ -101,7 +101,7 @@ object PowerIterationClusteringExample { .setInitializationMode("degree") .run(circlesRdd) - val clusters = model.assignments.collect().groupBy(_.cluster).mapValues(_.map(_.id)) + val clusters = model.assignments.collect().groupBy(_.cluster).view.mapValues(_.map(_.id)) val assignments = clusters.toList.sortBy { case (k, v) => v.length } val assignmentsStr = assignments .map { case (k, v) => diff --git a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala index 527187d51ae..1f158808215 100644 --- a/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala +++ b/graphx/src/test/scala/org/apache/spark/graphx/lib/ShortestPathsSuite.scala @@ -34,7 +34,7 @@ class ShortestPathsSuite extends SparkFunSuite with LocalSparkContext { val graph = Graph.fromEdgeTuples(edges, 1) val landmarks = Seq(1, 4).map(_.toLong) val results = ShortestPaths.run(graph, landmarks).vertices.collect().map { - case (v, spMap) => (v, spMap.mapValues(i => i).toMap) + case (v, spMap) => (v, spMap.view.mapValues(i => i).toMap) } assert(results.toSet === shortestPaths) } diff --git a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala index b8563bed601..ee87f49806a 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/evaluation/ClusteringMetrics.scala @@ -331,6 +331,7 @@ private[evaluation] object SquaredEuclideanSilhouette extends Silhouette { clustersStatsRDD .collectAsMap() + .view .mapValues { case (featureSum: DenseVector, squaredNormSum: Double, weightSum: Double) => SquaredEuclideanSilhouette.ClusterStats(featureSum, squaredNormSum, weightSum) diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala index cf1751f86f9..ff997194b42 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/VectorIndexer.scala @@ -300,7 +300,7 @@ class VectorIndexerModel private[ml] ( /** Java-friendly version of [[categoryMaps]] */ @Since("1.4.0") def javaCategoryMaps: JMap[JInt, JMap[JDouble, JInt]] = { - 
categoryMaps.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]] + categoryMaps.view.mapValues(_.asJava).toMap.asJava.asInstanceOf[JMap[JInt, JMap[JDouble, JInt]]] } /** diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala index 6a22c3580e3..053aaac742b 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala @@ -217,7 +217,7 @@ class Word2VecModel private[ml] ( @Since("1.5.0") @transient lazy val getVectors: DataFrame = { val spark = SparkSession.builder().getOrCreate() - val wordVec = wordVectors.getVectors.mapValues(vec => Vectors.dense(vec.map(_.toDouble))) + val wordVec = wordVectors.getVectors.view.mapValues(vec => Vectors.dense(vec.map(_.toDouble))) spark.createDataFrame(wordVec.toSeq).toDF("word", "vector") } diff --git a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala index 012e942a60a..4e5b8cd5efe 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/tree/impl/RandomForest.scala @@ -1292,8 +1292,8 @@ private[spark] object RandomForest extends Logging with Serializable { } // Convert mutable maps to immutable ones. val nodesForGroup: Map[Int, Array[LearningNode]] = - mutableNodesForGroup.mapValues(_.toArray).toMap - val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.mapValues(_.toMap).toMap + mutableNodesForGroup.view.mapValues(_.toArray).toMap + val treeToNodeToIndexInfo = mutableTreeToNodeToIndexInfo.view.mapValues(_.toMap).toMap (nodesForGroup, treeToNodeToIndexInfo) } diff --git a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala index a53581e8a60..fce2537b761 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/clustering/BisectingKMeans.scala @@ -220,7 +220,7 @@ class BisectingKMeans private ( divisibleIndices.contains(parentIndex(index)) } newClusters = summarize(d, newAssignments, dMeasure) - newClusterCenters = newClusters.mapValues(_.center).map(identity).toMap + newClusterCenters = newClusters.view.mapValues(_.center).map(identity).toMap } if (preIndices != null) { preIndices.unpersist() diff --git a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala index 8ab6aba641a..af0d9e48a3b 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/linalg/distributed/BlockMatrix.scala @@ -443,7 +443,7 @@ class BlockMatrix @Since("1.3.0") ( val leftMatrix = blockInfo.keys.collect() val rightMatrix = other.blockInfo.keys.collect() - val rightCounterpartsHelper = rightMatrix.groupBy(_._1).mapValues(_.map(_._2)) + val rightCounterpartsHelper = rightMatrix.groupBy(_._1).view.mapValues(_.map(_._2)) val leftDestinations = leftMatrix.map { case (rowIndex, colIndex) => val rightCounterparts = rightCounterpartsHelper.getOrElse(colIndex, Array.emptyIntArray) val partitions = rightCounterparts.map(b => partitioner.getPartition((rowIndex, b))) @@ -452,7 +452,7 @@ class BlockMatrix @Since("1.3.0") ( partitions.toSet.map((pid: Int) => 
pid * midDimSplitNum + midDimSplitIndex)) }.toMap - val leftCounterpartsHelper = leftMatrix.groupBy(_._2).mapValues(_.map(_._1)) + val leftCounterpartsHelper = leftMatrix.groupBy(_._2).view.mapValues(_.map(_._1)) val rightDestinations = rightMatrix.map { case (rowIndex, colIndex) => val leftCounterparts = leftCounterpartsHelper.getOrElse(rowIndex, Array.emptyIntArray) val partitions = leftCounterparts.map(b => partitioner.getPartition((b, colIndex))) diff --git a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala index 9c761824134..ead9f887fe8 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/stat/test/ChiSqTest.scala @@ -162,6 +162,7 @@ private[spark] object ChiSqTest extends Logging { .map { case ((label, _), c) => (label, c) } .toArray .groupBy(_._1) + .view .mapValues(_.map(_._2).sum) labelCounts.foreach { case (label, countByLabel) => val nnzByLabel = labelNNZ.getOrElse(label, 0L) diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala index d206b5fd280..08202c7f1f3 100644 --- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala @@ -855,7 +855,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 4, 6).foreach { k => val n = math.min(k, numItems).toInt - val expectedUpToN = expected.mapValues(_.slice(0, n)) + val expectedUpToN = expected.view.mapValues(_.slice(0, n)) val topItems = model.recommendForAllUsers(k) assert(topItems.count() == numUsers) assert(topItems.columns.contains("user")) @@ -876,7 +876,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 3, 4).foreach { k => val n = math.min(k, numUsers).toInt - val expectedUpToN = expected.mapValues(_.slice(0, n)) + val expectedUpToN = expected.view.mapValues(_.slice(0, n)) val topUsers = getALSModel.recommendForAllItems(k) assert(topUsers.count() == numItems) assert(topUsers.columns.contains("item")) @@ -898,7 +898,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 4, 6).foreach { k => val n = math.min(k, numItems).toInt - val expectedUpToN = expected.mapValues(_.slice(0, n)) + val expectedUpToN = expected.view.mapValues(_.slice(0, n)) val topItems = model.recommendForUserSubset(userSubset, k) assert(topItems.count() == numUsersSubset) assert(topItems.columns.contains("user")) @@ -920,7 +920,7 @@ class ALSSuite extends MLTest with DefaultReadWriteTest with Logging { Seq(2, 3, 4).foreach { k => val n = math.min(k, numUsers).toInt - val expectedUpToN = expected.mapValues(_.slice(0, n)) + val expectedUpToN = expected.view.mapValues(_.slice(0, n)) val topUsers = model.recommendForItemSubset(itemSubset, k) assert(topUsers.count() == numItemsSubset) assert(topUsers.columns.contains("item")) diff --git a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala index e4cd492be3d..1973f306441 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/feature/Word2VecSuite.scala @@ -43,8 +43,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { // and a Word2VecMap give the same values. 
val word2VecMap = model.getVectors val newModel = new Word2VecModel(word2VecMap) - assert(newModel.getVectors.mapValues(_.toSeq).toMap === - word2VecMap.mapValues(_.toSeq).toMap) + assert(newModel.getVectors.view.mapValues(_.toSeq).toMap === + word2VecMap.view.mapValues(_.toSeq).toMap) } test("Word2Vec throws exception when vocabulary is empty") { @@ -103,8 +103,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { try { model.save(sc, path) val sameModel = Word2VecModel.load(sc, path) - assert(sameModel.getVectors.mapValues(_.toSeq).toMap === - model.getVectors.mapValues(_.toSeq).toMap) + assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap === + model.getVectors.view.mapValues(_.toSeq).toMap) } finally { Utils.deleteRecursively(tempDir) } @@ -138,8 +138,8 @@ class Word2VecSuite extends SparkFunSuite with MLlibTestSparkContext { try { model.save(sc, path) val sameModel = Word2VecModel.load(sc, path) - assert(sameModel.getVectors.mapValues(_.toSeq).toMap === - model.getVectors.mapValues(_.toSeq).toMap) + assert(sameModel.getVectors.view.mapValues(_.toSeq).toMap === + model.getVectors.view.mapValues(_.toSeq).toMap) } catch { case t: Throwable => fail("exception thrown persisting a model " + diff --git a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala index 3677927b9a5..dc5e49fc35b 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/types/Metadata.scala @@ -210,7 +210,7 @@ object Metadata { // `map.mapValues` return `Map` in Scala 2.12 and return `MapView` in Scala 2.13, call // `toMap` for Scala version compatibility. case map: Map[_, _] => - map.mapValues(hash).toMap.## + map.view.mapValues(hash).toMap.## case arr: Array[_] => // Seq.empty[T] has the same hashCode regardless of T. arr.toSeq.map(hash).## diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala index 65338f9917b..3a5f60eb376 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala @@ -1314,7 +1314,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor val partCols = partitionColumnNames(r.table) validatePartitionSpec(partCols, i.partitionSpec) - val staticPartitions = i.partitionSpec.filter(_._2.isDefined).mapValues(_.get).toMap + val staticPartitions = i.partitionSpec.filter(_._2.isDefined).view.mapValues(_.get).toMap val query = addStaticPartitionColumns(r, projectByName.getOrElse(i.query), staticPartitions, isByName) @@ -3616,6 +3616,7 @@ class Analyzer(override val catalogManager: CatalogManager) extends RuleExecutor // `GetStructField` without the name property. 
.collect { case g: GetStructField if g.name.isEmpty => g } .groupBy(_.child) + .view .mapValues(_.map(_.ordinal).distinct.sorted) structChildToOrdinals.foreach { case (expr, ordinals) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala index 67c57ec2787..0f235e0977b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/ExternalCatalogUtils.scala @@ -207,7 +207,7 @@ object ExternalCatalogUtils { } def convertNullPartitionValues(spec: TablePartitionSpec): TablePartitionSpec = { - spec.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap + spec.view.mapValues(v => if (v == null) DEFAULT_PARTITION_NAME else v).map(identity).toMap } } diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala index f48ff23f4ad..e71865df94d 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala @@ -968,7 +968,7 @@ class SessionCatalog( } else { _.toLowerCase(Locale.ROOT) } - val nameToCounts = viewColumnNames.groupBy(normalizeColName).mapValues(_.length) + val nameToCounts = viewColumnNames.groupBy(normalizeColName).view.mapValues(_.length) val nameToCurrentOrdinal = scala.collection.mutable.HashMap.empty[String, Int] val viewDDL = buildViewDDL(metadata, isTempView) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala index b32ef3d95aa..ea985ae5f30 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/package.scala @@ -126,7 +126,7 @@ package object expressions { } private def unique[T](m: Map[T, Seq[Attribute]]): Map[T, Seq[Attribute]] = { - m.mapValues(_.distinct).toMap + m.view.mapValues(_.distinct).toMap } /** Map to use for direct case insensitive attribute lookups. */ diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala index 5d4fcf772b8..710c07fab7e 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/NestedColumnAliasing.scala @@ -150,7 +150,7 @@ object NestedColumnAliasing { // A reference attribute can have multiple aliases for nested fields. 
val attrToAliases = - AttributeMap(attributeToExtractValuesAndAliases.mapValues(_.map(_._2))) + AttributeMap(attributeToExtractValuesAndAliases.view.mapValues(_.map(_._2))) plan match { case Project(projectList, child) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index 6f65afada17..48ecb9aee21 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -1055,6 +1055,7 @@ object CollapseProject extends Rule[LogicalPlan] with AliasHelper { .filter(_.references.exists(producerMap.contains)) .flatMap(collectReferences) .groupBy(identity) + .view .mapValues(_.size) .forall { case (reference, count) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala index 2b5f542f22b..d9d04db9ab0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/parser/AstBuilder.scala @@ -1083,7 +1083,7 @@ class AstBuilder extends DataTypeAstBuilder with SQLConfHelper with Logging { // window w1 as (partition by p_mfgr order by p_name // range between 2 preceding and 2 following), // w2 as w1 - val windowMapView = baseWindowMap.mapValues { + val windowMapView = baseWindowMap.view.mapValues { case WindowSpecReference(name) => baseWindowMap.get(name) match { case Some(spec: WindowSpecDefinition) => diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala index e7be8a7e29b..0e5dd301953 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala @@ -365,7 +365,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13 // `mapValues` is lazy and we need to force it to materialize - m.mapValues(mapChild).view.force.toMap + m.view.mapValues(mapChild).view.force.toMap case arg: TreeNode[_] if containsChild(arg) => mapTreeNode(arg) case Some(child) => Some(mapChild(child)) case nonChild: AnyRef => nonChild @@ -786,7 +786,7 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]] Some(arg.asInstanceOf[BaseType].clone()) // `map.mapValues().view.force` return `Map` in Scala 2.12 but return `IndexedSeq` in Scala // 2.13, call `toMap` method manually to compatible with Scala 2.12 and Scala 2.13 - case m: Map[_, _] => m.mapValues { + case m: Map[_, _] => m.view.mapValues { case arg: TreeNode[_] if containsChild(arg) => arg.asInstanceOf[BaseType].clone() case other => other diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala index d56bca30a05..1274751ac94 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/util/ToNumberParser.scala @@ -334,7 +334,7 @@ class ToNumberParser(numberFormat: String, 
errorOnFail: Boolean) extends Seriali
       )
     }
     // Make sure that the format string does not contain any prohibited duplicate tokens.
-    val inputTokenCounts = formatTokens.groupBy(identity).mapValues(_.size)
+    val inputTokenCounts = formatTokens.groupBy(identity).view.mapValues(_.size)
     Seq(DecimalPoint(), OptionalPlusOrMinusSign(), OptionalMinusSign(),
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
index ae5ebd6a974..5fcd71d8bf9 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryCompilationErrors.scala
@@ -228,6 +228,7 @@ private[sql] object QueryCompilationErrors extends QueryErrorsBase with Compilat
         case seq => Some(CreateStruct(seq)).map(e => Alias(e, e.sql)()).get
       }
       .groupBy(_.dataType)
+      .view
       .mapValues(values => values.map(value => toSQLId(value.name)).sorted)
       .mapValues(values => if (values.length > 3) values.take(3) :+ "..." else values)
       .toList.sortBy(_._1.sql)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
index aa3fe8dfb7c..59aa17baa7f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala
@@ -683,7 +683,7 @@ class SparkSession private(
     val plan = tracker.measurePhase(QueryPlanningTracker.PARSING) {
       val parsedPlan = sessionState.sqlParser.parsePlan(sqlText)
       if (args.nonEmpty) {
-        NameParameterizedQuery(parsedPlan, args.mapValues(lit(_).expr).toMap)
+        NameParameterizedQuery(parsedPlan, args.view.mapValues(lit(_).expr).toMap)
       } else {
         parsedPlan
       }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
index dc0857383e7..c2b227d6cad 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -64,7 +64,7 @@ case class AnalyzePartitionCommand(
         tableId.table, tableId.database.get, schemaColumns, specColumns)
     }
-    val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+    val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).view.mapValues(_.get)
     if (filteredSpec.isEmpty) {
       None
     } else {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
index 8011cc00743..9b38155851e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/PartitioningUtils.scala
@@ -406,7 +406,7 @@ object PartitioningUtils extends SQLConfHelper {
     val distinctPartColNames = pathWithPartitionValues.map(_._2.columnNames).distinct
     def groupByKey[K, V](seq: Seq[(K, V)]): Map[K, Iterable[V]] =
-      seq.groupBy { case (key, _) => key }.mapValues(_.map { case (_, value) => value }).toMap
+      seq.groupBy { case (key, _) => key }.view.mapValues(_.map { case (_, value) => value }).toMap
     val partColNamesToPaths = groupByKey(pathWithPartitionValues.map { case (path, partValues) =>
       partValues.columnNames -> path
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
index b7de20ae293..e5444094673 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/orc/OrcFiltersBase.scala
@@ -81,6 +81,7 @@ trait OrcFiltersBase {
       val dedupPrimitiveFields = primitiveFields
         .groupBy(_._1.toLowerCase(Locale.ROOT))
         .filter(_._2.size == 1)
+        .view
         .mapValues(_.head._2)
       CaseInsensitiveMap(dedupPrimitiveFields.toMap)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
index c5360c9a04f..c1d02ba5a22 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFilters.scala
@@ -110,6 +110,7 @@ class ParquetFilters(
     primitiveFields
       .groupBy(_._1.toLowerCase(Locale.ROOT))
       .filter(_._2.size == 1)
+      .view
       .mapValues(_.head._2)
     CaseInsensitiveMap(dedupPrimitiveFields.toMap)
   }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
index 8552c950f67..bddf6d02f1f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala
@@ -491,6 +491,7 @@ case class EnsureRequirements(
           val numExpectedPartitions = partValues
             .map(InternalRowComparableWrapper(_, partitionExprs))
             .groupBy(identity)
+            .view
             .mapValues(_.size)
           mergedPartValues = mergedPartValues.map { case (partVal, numParts) =>
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
index 702d3ea09b6..e70e94001ee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ProgressReporter.scala
@@ -136,9 +136,9 @@ trait ProgressReporter extends Logging {
       from: StreamProgress,
       to: StreamProgress,
       latest: StreamProgress): Unit = {
-    currentTriggerStartOffsets = from.mapValues(_.json).toMap
-    currentTriggerEndOffsets = to.mapValues(_.json).toMap
-    currentTriggerLatestOffsets = latest.mapValues(_.json).toMap
+    currentTriggerStartOffsets = from.view.mapValues(_.json).toMap
+    currentTriggerEndOffsets = to.view.mapValues(_.json).toMap
+    currentTriggerLatestOffsets = latest.view.mapValues(_.json).toMap
     latestStreamProgress = to
   }
@@ -243,7 +243,7 @@ trait ProgressReporter extends Logging {
       batchId = currentBatchId,
       batchDuration = processingTimeMills,
       durationMs =
-        new java.util.HashMap(currentDurationsMs.toMap.mapValues(long2Long).toMap.asJava),
+        new java.util.HashMap(currentDurationsMs.toMap.view.mapValues(long2Long).toMap.asJava),
       eventTime = new java.util.HashMap(executionStats.eventTimeStats.asJava),
       stateOperators = executionStats.stateOperators.toArray,
       sources = sourceProgress.toArray,
@@ -297,7 +297,7 @@ trait ProgressReporter extends Logging {
       Map(
         "max" -> stats.max,
         "min" -> stats.min,
-        "avg" -> stats.avg.toLong).mapValues(formatTimestamp)
+        "avg" -> stats.avg.toLong).view.mapValues(formatTimestamp)
     }.headOption.getOrElse(Map.empty) ++ watermarkTimestamp
     ExecutionStats(numInputRows, stateOperators, eventTimeStats.toMap)
@@ -307,7 +307,7 @@ trait ProgressReporter extends Logging {
   private def extractSourceToNumInputRows(): Map[SparkDataStream, Long] = {
     def sumRows(tuples: Seq[(SparkDataStream, Long)]): Map[SparkDataStream, Long] = {
-      tuples.groupBy(_._1).mapValues(_.map(_._2).sum).toMap // sum up rows for each source
+      tuples.groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // sum up rows for each source
     }
     def unrollCTE(plan: LogicalPlan): LogicalPlan = {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index 40f63c86a6a..3dfc27b14c2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -507,7 +507,7 @@ class RocksDB(
       "put" -> DB_WRITE,
       "compaction" -> COMPACTION_TIME
     ).toMap
-    val nativeOpsLatencyMicros = nativeOpsHistograms.mapValues { typ =>
+    val nativeOpsLatencyMicros = nativeOpsHistograms.view.mapValues { typ =>
       RocksDBNativeHistogram(nativeStats.getHistogramData(typ))
     }
     val nativeOpsMetricTickers = Seq(
@@ -530,7 +530,7 @@ class RocksDB(
       /** Number of bytes written during flush */
       "totalBytesWrittenByFlush" -> FLUSH_WRITE_BYTES
     ).toMap
-    val nativeOpsMetrics = nativeOpsMetricTickers.mapValues { typ =>
+    val nativeOpsMetrics = nativeOpsMetricTickers.view.mapValues { typ =>
       nativeStats.getTickerCount(typ)
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
index def4a4104f7..b36aa264e28 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/statefulOperators.scala
@@ -156,7 +156,7 @@ trait StateStoreWriter extends StatefulOperator with PythonSQLMetrics { self: Sp
       .map(entry => entry._1 -> longMetric(entry._1).value)
     val javaConvertedCustomMetrics: java.util.HashMap[String, java.lang.Long] =
-      new java.util.HashMap(customMetrics.mapValues(long2Long).toMap.asJava)
+      new java.util.HashMap(customMetrics.view.mapValues(long2Long).toMap.asJava)
     // We now don't report number of shuffle partitions inside the state operator. Instead,
     // it will be filled when the stream query progress is reported
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
index 2e088ec8e4b..f66b99a154e 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ui/AllExecutionsPage.scala
@@ -79,7 +79,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
           request,
           "running",
           running.toSeq,
-          executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+          executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
           currentTime,
           showErrorMessage = false,
           showRunningJobs = true,
@@ -105,7 +105,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
           request,
          "completed",
          completed.toSeq,
-          executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+          executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
           currentTime,
           showErrorMessage = false,
           showRunningJobs = false,
@@ -132,7 +132,7 @@ private[ui] class AllExecutionsPage(parent: SQLTab) extends WebUIPage("") with L
           request,
           "failed",
           failed.toSeq,
-          executionIdToSubExecutions.mapValues(_.toSeq).toMap,
+          executionIdToSubExecutions.view.mapValues(_.toSeq).toMap,
           currentTime,
           showErrorMessage = true,
           showRunningJobs = false,
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
index 47ff942e5ca..1dece5c8285 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameStatSuite.scala
@@ -343,7 +343,7 @@ class DataFrameStatSuite extends QueryTest with SharedSparkSession {
     val columnNames = crosstab.schema.fieldNames
     assert(columnNames(0) === "a_b")
     // reduce by key
-    val expected = data.map(t => (t, 1)).groupBy(_._1).mapValues(_.length)
+    val expected = data.map(t => (t, 1)).groupBy(_._1).view.mapValues(_.length)
     val rows = crosstab.collect()
     rows.foreach { row =>
       val i = row.getString(0).toInt
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index 226d5098d42..d6dc5165fbb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -425,7 +425,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSparkSession with SQLHelper
       // We need to do cartesian product for all the config dimensions, to get a list of
       // config sets, and run the query once for each config set.
       val configDimLines = comments.filter(_.startsWith("--CONFIG_DIM")).map(_.substring(12))
-      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).mapValues { lines =>
+      val configDims = configDimLines.groupBy(_.takeWhile(_ != ' ')).view.mapValues { lines =>
         lines.map(_.dropWhile(_ != ' ').substring(1)).map(_.split(",").map { kv =>
           val (conf, value) = kv.span(_ != '=')
           conf.trim -> value.substring(1).trim
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
index 814cf2f33ff..60d82fd1ac3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/UDFSuite.scala
@@ -527,7 +527,7 @@ class UDFSuite extends QueryTest with SharedSparkSession {
       sparkContext.parallelize(Seq(Row(Map("a" -> new BigDecimal("2011000000000002456556"))))),
       StructType(Seq(StructField("col1", MapType(StringType, DecimalType(30, 0))))))
     val udf2 = org.apache.spark.sql.functions.udf((map: Map[String, BigDecimal]) => {
-      map.mapValues(value => if (value == null) null else value.toBigInteger.toString).toMap
+      map.view.mapValues(value => if (value == null) null else value.toBigInteger.toString).toMap
     })
     checkAnswer(df2.select(udf2($"col1")), Seq(Row(Map("a" -> "2011000000000002456556"))))
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
index 6563a7698e2..057cb527cf0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SQLViewTestSuite.scala
@@ -629,7 +629,7 @@ class PersistedViewTestSuite extends SQLViewTestSuite with SharedSparkSession {
       val meta = catalog.getTableRawMetadata(TableIdentifier("test_view", Some("default")))
       // simulate a view meta with incompatible schema change
       val newProp = meta.properties
-        .mapValues(_.replace("col_i", "col_j")).toMap
+        .view.mapValues(_.replace("col_i", "col_j")).toMap
       val newSchema = StructType(Seq(StructField("col_j", IntegerType)))
       catalog.alterTable(meta.copy(properties = newProp, schema = newSchema))
       val e = intercept[AnalysisException] {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
index eaf305414f1..1e786c8e578 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/AlterTableDropPartitionSuiteBase.scala
@@ -47,7 +47,7 @@ trait AlterTableDropPartitionSuiteBase extends QueryTest with DDLCommandTestUtil
       t: String,
       ifExists: String,
       specs: Map[String, Any]*): Unit = {
-    checkPartitions(t, specs.map(_.mapValues(_.toString).toMap): _*)
+    checkPartitions(t, specs.map(_.view.mapValues(_.toString).toMap): _*)
     val specStr = specs.map(partSpecToString).mkString(", ")
     sql(s"ALTER TABLE $t DROP $ifExists $specStr")
     checkPartitions(t)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
index a5d5f8ce30f..726ae87b5ce 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala
@@ -392,7 +392,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSparkSession
     withParquetDataFrame(data) { df =>
       // Structs are converted to `Row`s
       checkAnswer(df, data.map { case Tuple1(m) =>
-        Row(m.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
+        Row(m.view.mapValues(struct => Row(struct.productIterator.toSeq: _*)))
       })
     }
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
index cae008af6f0..30315c12b58 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/metric/SQLMetricsTestUtils.scala
@@ -216,8 +216,8 @@ trait SQLMetricsTestUtils extends SQLTestUtils {
       expectedNumOfJobs: Int,
       expectedMetrics: Map[Long, (String, Map[String, Any])],
       enableWholeStage: Boolean = false): Unit = {
-    val expectedMetricsPredicates = expectedMetrics.mapValues { case (nodeName, nodeMetrics) =>
-      (nodeName, nodeMetrics.mapValues(expectedMetricValue =>
+    val expectedMetricsPredicates = expectedMetrics.view.mapValues { case (nodeName, nodeMetrics) =>
+      (nodeName, nodeMetrics.view.mapValues(expectedMetricValue =>
        (actualMetricValue: Any) => {
          actualMetricValue.toString.matches(expectedMetricValue.toString)
        }).toMap)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
index 9a3313b8069..5affc9ef3b2 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ui/SQLAppStatusListenerSuite.scala
@@ -220,23 +220,23 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     )))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
     // Driver accumulator updates don't belong to this execution should be filtered and no
     // exception will be thrown.
     listener.onOtherEvent(SparkListenerDriverAccumUpdates(0, Seq((999L, 2L))))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
     listener.onExecutorMetricsUpdate(SparkListenerExecutorMetricsUpdate("", Seq(
       // (task id, stage id, stage attempt, accum updates)
       (0L, 0, 0, createAccumulatorInfos(accumulatorUpdates)),
-      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.mapValues(_ * 2).toMap))
+      (1L, 0, 0, createAccumulatorInfos(accumulatorUpdates.view.mapValues(_ * 2).toMap))
     )))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 3).toMap)
+      accumulatorUpdates.view.mapValues(_ * 3).toMap)
     // Retrying a stage should reset the metrics
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(0, 1)))
@@ -250,7 +250,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     )))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
     // Ignore the task end for the first attempt
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -258,12 +258,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 100).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 100).toMap),
       new ExecutorMetrics,
       null))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 2).toMap)
+      accumulatorUpdates.view.mapValues(_ * 2).toMap)
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -271,7 +271,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 2).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 2).toMap),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -279,12 +279,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       stageAttemptId = 1,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap),
       new ExecutorMetrics,
       null))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 5).toMap)
+      accumulatorUpdates.view.mapValues(_ * 5).toMap)
     // Summit a new stage
     listener.onStageSubmitted(SparkListenerStageSubmitted(createStageInfo(1, 0)))
@@ -298,7 +298,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     )))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 7).toMap)
+      accumulatorUpdates.view.mapValues(_ * 7).toMap)
     // Finish two tasks
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -306,7 +306,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(0, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(0, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap),
       new ExecutorMetrics,
       null))
     listener.onTaskEnd(SparkListenerTaskEnd(
@@ -314,12 +314,12 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
       stageAttemptId = 0,
       taskType = "",
       reason = null,
-      createTaskInfo(1, 0, accums = accumulatorUpdates.mapValues(_ * 3).toMap),
+      createTaskInfo(1, 0, accums = accumulatorUpdates.view.mapValues(_ * 3).toMap),
       new ExecutorMetrics,
       null))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 11).toMap)
+      accumulatorUpdates.view.mapValues(_ * 11).toMap)
     assertJobs(statusStore.execution(executionId), running = Seq(0))
@@ -334,7 +334,7 @@ abstract class SQLAppStatusListenerSuite extends SharedSparkSession with JsonTes
     assertJobs(statusStore.execution(executionId), completed = Seq(0))
     checkAnswer(statusStore.executionMetrics(executionId),
-      accumulatorUpdates.mapValues(_ * 11).toMap)
+      accumulatorUpdates.view.mapValues(_ * 11).toMap)
   }
   test("control a plan explain mode in listeners via SQLConf") {
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
index 4fa49064faa..4c478486c6b 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryStatusAndProgressSuite.scala
@@ -391,7 +391,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 0L).view.mapValues(long2Long).toMap.asJava),
     eventTime = new java.util.HashMap(Map(
       "max" -> "2016-12-05T20:54:20.827Z",
       "min" -> "2016-12-05T20:54:20.827Z",
@@ -403,7 +403,7 @@ object StreamingQueryStatusAndProgressSuite {
       numShufflePartitions = 2, numStateStoreInstances = 2,
       customMetrics = new java.util.HashMap(Map("stateOnCurrentVersionSizeBytes" -> 2L,
         "loadedMapCacheHitCount" -> 1L, "loadedMapCacheMissCount" -> 0L)
-        .mapValues(long2Long).toMap.asJava)
+        .view.mapValues(long2Long).toMap.asJava)
     )),
     sources = Array(
       new SourceProgress(
@@ -429,7 +429,7 @@ object StreamingQueryStatusAndProgressSuite {
     timestamp = "2016-12-05T20:54:20.827Z",
     batchId = 2L,
     batchDuration = 0L,
-    durationMs = new java.util.HashMap(Map("total" -> 0L).mapValues(long2Long).toMap.asJava),
+    durationMs = new java.util.HashMap(Map("total" -> 0L).view.mapValues(long2Long).toMap.asJava),
     // empty maps should be handled correctly
     eventTime = new java.util.HashMap(Map.empty[String, String].asJava),
     stateOperators = Array(new StateOperatorProgress(operatorName = "op2",
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
index 44de7af0b4a..e5e873cca12 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/continuous/ContinuousSuite.scala
@@ -61,7 +61,7 @@ class ContinuousSuiteBase extends StreamTest {
       c.committedOffsets.lastOption.map { case (_, offset) =>
         offset match {
           case o: RateStreamOffset =>
-            o.partitionToValueAndRunTimeMs.mapValues(_.value).toMap
+            o.partitionToValueAndRunTimeMs.view.mapValues(_.value).toMap
         }
       }
     }
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 5c340522f91..f52ff8cf9a1 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -1261,7 +1261,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       partialSpec: Option[TablePartitionSpec] = None): Seq[String] = withClient {
     val catalogTable = getTable(db, table)
-    val partColNameMap = buildLowerCasePartColNameMap(catalogTable).mapValues(escapePathName)
+    val partColNameMap = buildLowerCasePartColNameMap(catalogTable).view.mapValues(escapePathName)
     val clientPartitionNames =
       client.getPartitionNames(catalogTable, partialSpec.map(toMetaStorePartitionSpec))
     clientPartitionNames.map { partitionPath =>
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
index af884914ad8..5a124be5679 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/api/java/JavaStreamingListenerWrapper.scala
@@ -65,7 +65,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
   private def toJavaBatchInfo(batchInfo: BatchInfo): JavaBatchInfo = {
     JavaBatchInfo(
       batchInfo.batchTime,
-      batchInfo.streamIdToInputInfo.mapValues(toJavaStreamInputInfo).toMap.asJava,
+      batchInfo.streamIdToInputInfo.view.mapValues(toJavaStreamInputInfo).toMap.asJava,
       batchInfo.submissionTime,
       batchInfo.processingStartTime.getOrElse(-1),
       batchInfo.processingEndTime.getOrElse(-1),
@@ -73,7 +73,7 @@ private[streaming] class JavaStreamingListenerWrapper(javaStreamingListener: Jav
       batchInfo.processingDelay.getOrElse(-1),
       batchInfo.totalDelay.getOrElse(-1),
       batchInfo.numRecords,
-      batchInfo.outputOperationInfos.mapValues(toJavaOutputOperationInfo).toMap.asJava
+      batchInfo.outputOperationInfos.view.mapValues(toJavaOutputOperationInfo).toMap.asJava
     )
   }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
index 02eee36d4be..5c47cde0459 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverSchedulingPolicy.scala
@@ -184,7 +184,7 @@ private[streaming] class ReceiverSchedulingPolicy {
     val executorWeights: Map[ExecutorCacheTaskLocation, Double] = {
       receiverTrackingInfoMap.values.flatMap(convertReceiverTrackingInfoToExecutorWeights)
-        .groupBy(_._1).mapValues(_.map(_._2).sum).toMap // Sum weights for each executor
+        .groupBy(_._1).view.mapValues(_.map(_._2).sum).toMap // Sum weights for each executor
     }
     val idleExecutors = executors.toSet -- executorWeights.keys
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
index 342a0a43b50..685ddf67237 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
@@ -244,7 +244,7 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false
    */
  def allocatedExecutors(): Map[Int, Option[String]] = synchronized {
    if (isTrackerStarted) {
-      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).mapValues {
+      endpoint.askSync[Map[Int, ReceiverTrackingInfo]](GetAllReceiverInfo).view.mapValues {
        _.runningExecutor.map {
          _.executorId
        }
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
index 1af60857bc7..08de8d46c31 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/BatchUIData.scala
@@ -98,7 +98,7 @@ private[ui] object BatchUIData {
   def apply(batchInfo: BatchInfo): BatchUIData = {
     val outputOperations = mutable.HashMap[OutputOpId, OutputOperationUIData]()
-    outputOperations ++= batchInfo.outputOperationInfos.mapValues(OutputOperationUIData.apply)
+    outputOperations ++= batchInfo.outputOperationInfos.view.mapValues(OutputOperationUIData.apply)
     new BatchUIData(
       batchInfo.batchTime,
       batchInfo.streamIdToInputInfo,
diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
index 9abf018584c..140eb866f6f 100644
--- a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
+++ b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingJobProgressListener.scala
@@ -216,7 +216,8 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   def receivedRecordRateWithBatchTime: Map[Int, Seq[(Long, Double)]] = synchronized {
     val _retainedBatches = retainedBatches
     val latestBatches = _retainedBatches.map { batchUIData =>
-      (batchUIData.batchTime.milliseconds, batchUIData.streamIdToInputInfo.mapValues(_.numRecords))
+      (batchUIData.batchTime.milliseconds,
+        batchUIData.streamIdToInputInfo.view.mapValues(_.numRecords))
     }
     streamIds.map { streamId =>
       val recordRates = latestBatches.map {
@@ -230,7 +231,7 @@ private[spark] class StreamingJobProgressListener(ssc: StreamingContext)
   def lastReceivedBatchRecords: Map[Int, Long] = synchronized {
     val lastReceivedBlockInfoOption =
-      lastReceivedBatch.map(_.streamIdToInputInfo.mapValues(_.numRecords))
+      lastReceivedBatch.map(_.streamIdToInputInfo.view.mapValues(_.numRecords))
     lastReceivedBlockInfoOption.map { lastReceivedBlockInfo =>
       streamIds.map { streamId =>
         (streamId, lastReceivedBlockInfo.getOrElse(streamId, 0L))
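In Scala 2.13, `view.mapValues` returns a lazy `scala.collection.MapView` rather than a strict `Map`, which is why most of the call sites above follow it with `.toMap` (and, where Java interop is needed, `.asJava`). The sketch below is illustrative only and not part of the patch; it assumes nothing beyond the standard library, and the object name `MapValuesLazinessSketch` is invented for the example.

```scala
import scala.jdk.CollectionConverters._

object MapValuesLazinessSketch {
  def main(args: Array[String]): Unit = {
    val counts = Map("a" -> 1, "b" -> 2)

    // Lazy: a MapView whose values are recomputed on every access.
    val lazyDoubled: scala.collection.MapView[String, Int] = counts.view.mapValues(_ * 2)

    // Strict: materialized once, matching what callers of the old strict mapValues expected.
    val strictDoubled: Map[String, Int] = counts.view.mapValues(_ * 2).toMap

    // For APIs that need a java.util.Map (as in the streaming progress hunks above),
    // materialize first and then convert.
    val javaMap: java.util.Map[String, Int] = counts.view.mapValues(_ * 2).toMap.asJava

    println(lazyDoubled("a"))   // 2, computed on demand
    println(strictDoubled("a")) // 2, computed once when .toMap ran
    println(javaMap.get("b"))   // 4
  }
}
```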
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org