http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala index 91d63d4..a2ab320 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/PollingFlumeTestUtils.scala @@ -18,9 +18,8 @@ package org.apache.spark.streaming.flume import java.util.concurrent._ -import java.util.{List => JList, Map => JMap} +import java.util.{Map => JMap, Collections} -import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import com.google.common.base.Charsets.UTF_8 @@ -77,7 +76,7 @@ private[flume] class PollingFlumeTestUtils { /** * Start 2 sinks and return the ports */ - def startMultipleSinks(): JList[Int] = { + def startMultipleSinks(): Seq[Int] = { channels.clear() sinks.clear() @@ -138,8 +137,7 @@ private[flume] class PollingFlumeTestUtils { /** * A Python-friendly method to assert the output */ - def assertOutput( - outputHeaders: JList[JMap[String, String]], outputBodies: JList[String]): Unit = { + def assertOutput(outputHeaders: Seq[JMap[String, String]], outputBodies: Seq[String]): Unit = { require(outputHeaders.size == outputBodies.size) val eventSize = outputHeaders.size if (eventSize != totalEventsPerChannel * channels.size) { @@ -149,12 +147,12 @@ private[flume] class PollingFlumeTestUtils { var counter = 0 for (k <- 0 until channels.size; i <- 0 until totalEventsPerChannel) { val eventBodyToVerify = s"${channels(k).getName}-$i" - val eventHeaderToVerify: JMap[String, String] = Map[String, String](s"test-$i" -> "header") + val eventHeaderToVerify: JMap[String, String] = Collections.singletonMap(s"test-$i", "header") var found = false var j = 0 while (j < eventSize && !found) { - if (eventBodyToVerify == outputBodies.get(j) && - eventHeaderToVerify == outputHeaders.get(j)) { + if (eventBodyToVerify == outputBodies(j) && + eventHeaderToVerify == outputHeaders(j)) { found = true counter += 1 } @@ -195,7 +193,7 @@ private[flume] class PollingFlumeTestUtils { tx.begin() for (j <- 0 until eventsPerBatch) { channel.put(EventBuilder.withBody(s"${channel.getName}-$t".getBytes(UTF_8), - Map[String, String](s"test-$t" -> "header"))) + Collections.singletonMap(s"test-$t", "header"))) t += 1 } tx.commit()
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala index d5f9a0a..ff2fb8e 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumePollingStreamSuite.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{SynchronizedBuffer, ArrayBuffer} import scala.concurrent.duration._ import scala.language.postfixOps @@ -116,9 +116,9 @@ class FlumePollingStreamSuite extends SparkFunSuite with BeforeAndAfter with Log // The eventually is required to ensure that all data in the batch has been processed. eventually(timeout(10 seconds), interval(100 milliseconds)) { val flattenOutputBuffer = outputBuffer.flatten - val headers = flattenOutputBuffer.map(_.event.getHeaders.map { - case kv => (kv._1.toString, kv._2.toString) - }).map(mapAsJavaMap) + val headers = flattenOutputBuffer.map(_.event.getHeaders.asScala.map { + case (key, value) => (key.toString, value.toString) + }).map(_.asJava) val bodies = flattenOutputBuffer.map(e => new String(e.event.getBody.array(), UTF_8)) utils.assertOutput(headers, bodies) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala index 5bc4cdf..5ffb60b 100644 --- a/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala +++ b/external/flume/src/test/scala/org/apache/spark/streaming/flume/FlumeStreamSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.flume -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, SynchronizedBuffer} import scala.concurrent.duration._ import scala.language.postfixOps http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala index 79a9db4..c9fd715 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaTestUtils.scala @@ -24,6 +24,7 @@ import java.util.concurrent.TimeoutException import java.util.{Map => JMap, Properties} import scala.annotation.tailrec +import scala.collection.JavaConverters._ import scala.language.postfixOps import scala.util.control.NonFatal @@ -159,8 +160,7 @@ private[kafka] class KafkaTestUtils extends Logging { /** Java-friendly function for sending messages to the Kafka broker */ def sendMessages(topic: String, messageToFreq: JMap[String, JInt]): Unit = { - import scala.collection.JavaConversions._ - sendMessages(topic, Map(messageToFreq.mapValues(_.intValue()).toSeq: _*)) + sendMessages(topic, Map(messageToFreq.asScala.mapValues(_.intValue()).toSeq: _*)) } /** Send the messages to the Kafka broker */ http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala ---------------------------------------------------------------------- diff --git a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala index 388dbb8..3128222 100644 --- a/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala +++ b/external/kafka/src/main/scala/org/apache/spark/streaming/kafka/KafkaUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.kafka import java.lang.{Integer => JInt, Long => JLong} import java.util.{List => JList, Map => JMap, Set => JSet} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import kafka.common.TopicAndPartition @@ -96,7 +96,7 @@ object KafkaUtils { groupId: String, topics: JMap[String, JInt] ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*)) + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*)) } /** @@ -115,7 +115,7 @@ object KafkaUtils { topics: JMap[String, JInt], storageLevel: StorageLevel ): JavaPairReceiverInputDStream[String, String] = { - createStream(jssc.ssc, zkQuorum, groupId, Map(topics.mapValues(_.intValue()).toSeq: _*), + createStream(jssc.ssc, zkQuorum, groupId, Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), storageLevel) } @@ -149,7 +149,10 @@ object KafkaUtils { implicit val valueCmd: ClassTag[T] = ClassTag(valueDecoderClass) createStream[K, V, U, T]( - jssc.ssc, kafkaParams.toMap, Map(topics.mapValues(_.intValue()).toSeq: _*), storageLevel) + jssc.ssc, + kafkaParams.asScala.toMap, + Map(topics.asScala.mapValues(_.intValue()).toSeq: _*), + storageLevel) } /** get leaders for the given offset ranges, or throw an exception */ @@ -275,7 +278,7 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) new JavaPairRDD(createRDD[K, V, KD, VD]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges)) + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges)) } /** @@ -311,9 +314,9 @@ object KafkaUtils { implicit val keyDecoderCmt: ClassTag[KD] = ClassTag(keyDecoderClass) implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) implicit val recordCmt: ClassTag[R] = ClassTag(recordClass) - val leaderMap = Map(leaders.toSeq: _*) + val leaderMap = Map(leaders.asScala.toSeq: _*) createRDD[K, V, KD, VD, R]( - jsc.sc, Map(kafkaParams.toSeq: _*), offsetRanges, leaderMap, messageHandler.call _) + jsc.sc, Map(kafkaParams.asScala.toSeq: _*), offsetRanges, leaderMap, messageHandler.call(_)) } /** @@ -476,8 +479,8 @@ object KafkaUtils { val cleanedHandler = jssc.sparkContext.clean(messageHandler.call _) createDirectStream[K, V, KD, VD, R]( jssc.ssc, - Map(kafkaParams.toSeq: _*), - Map(fromOffsets.mapValues { _.longValue() }.toSeq: _*), + Map(kafkaParams.asScala.toSeq: _*), + Map(fromOffsets.asScala.mapValues(_.longValue()).toSeq: _*), cleanedHandler ) } @@ -531,8 +534,8 @@ object KafkaUtils { implicit val valueDecoderCmt: ClassTag[VD] = ClassTag(valueDecoderClass) createDirectStream[K, V, KD, VD]( jssc.ssc, - Map(kafkaParams.toSeq: _*), - Set(topics.toSeq: _*) + Map(kafkaParams.asScala.toSeq: _*), + Set(topics.asScala.toSeq: _*) ) } } @@ -602,10 +605,10 @@ private[kafka] class KafkaUtilsPythonHelper { ): JavaPairInputDStream[Array[Byte], Array[Byte]] = { if (!fromOffsets.isEmpty) { - import scala.collection.JavaConversions._ - val topicsFromOffsets = fromOffsets.keySet().map(_.topic) - if (topicsFromOffsets != topics.toSet) { - throw new IllegalStateException(s"The specified topics: ${topics.toSet.mkString(" ")} " + + val topicsFromOffsets = fromOffsets.keySet().asScala.map(_.topic) + if (topicsFromOffsets != topics.asScala.toSet) { + throw new IllegalStateException( + s"The specified topics: ${topics.asScala.toSet.mkString(" ")} " + s"do not equal to the topic from offsets: ${topicsFromOffsets.mkString(" ")}") } } @@ -663,6 +666,6 @@ private[kafka] class KafkaUtilsPythonHelper { "with this RDD, please call this method only on a Kafka RDD.") val kafkaRDD = kafkaRDDs.head.asInstanceOf[KafkaRDD[_, _, _, _, _]] - kafkaRDD.offsetRanges.toSeq + kafkaRDD.offsetRanges.toSeq.asJava } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala ---------------------------------------------------------------------- diff --git a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala index 0469d0a..4ea218e 100644 --- a/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala +++ b/external/zeromq/src/main/scala/org/apache/spark/streaming/zeromq/ZeroMQUtils.scala @@ -18,15 +18,17 @@ package org.apache.spark.streaming.zeromq import scala.reflect.ClassTag -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ + import akka.actor.{Props, SupervisorStrategy} import akka.util.ByteString import akka.zeromq.Subscribe + import org.apache.spark.api.java.function.{Function => JFunction} import org.apache.spark.storage.StorageLevel import org.apache.spark.streaming.StreamingContext import org.apache.spark.streaming.api.java.{JavaReceiverInputDStream, JavaStreamingContext} -import org.apache.spark.streaming.dstream.{ReceiverInputDStream} +import org.apache.spark.streaming.dstream.ReceiverInputDStream import org.apache.spark.streaming.receiver.ActorSupervisorStrategy object ZeroMQUtils { @@ -75,7 +77,8 @@ object ZeroMQUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel, supervisorStrategy) } @@ -99,7 +102,8 @@ object ZeroMQUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T](jssc.ssc, publisherUrl, subscribe, fn, storageLevel) } @@ -122,7 +126,8 @@ object ZeroMQUtils { ): JavaReceiverInputDStream[T] = { implicit val cm: ClassTag[T] = implicitly[ClassTag[AnyRef]].asInstanceOf[ClassTag[T]] - val fn = (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).toIterator + val fn = + (x: Seq[ByteString]) => bytesToObjects.call(x.map(_.toArray).toArray).iterator().asScala createStream[T](jssc.ssc, publisherUrl, subscribe, fn) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala index a003ddf..5d32fa6 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisBackedBlockRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.streaming.kinesis -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.control.NonFatal import com.amazonaws.auth.{AWSCredentials, DefaultAWSCredentialsProviderChain} @@ -213,7 +213,7 @@ class KinesisSequenceRangeIterator( s"getting records using shard iterator") { client.getRecords(getRecordsRequest) } - (getRecordsResult.getRecords.iterator(), getRecordsResult.getNextShardIterator) + (getRecordsResult.getRecords.iterator().asScala, getRecordsResult.getNextShardIterator) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala index 22324e8..6e0988c 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisReceiver.scala @@ -18,7 +18,7 @@ package org.apache.spark.streaming.kinesis import java.util.UUID -import scala.collection.JavaConversions.asScalaIterator +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.control.NonFatal @@ -202,7 +202,7 @@ private[kinesis] class KinesisReceiver( /** Add records of the given shard to the current block being generated */ private[kinesis] def addRecords(shardId: String, records: java.util.List[Record]): Unit = { if (records.size > 0) { - val dataIterator = records.iterator().map { record => + val dataIterator = records.iterator().asScala.map { record => val byteBuffer = record.getData() val byteArray = new Array[Byte](byteBuffer.remaining()) byteBuffer.get(byteArray) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala index c8eec13..634bf94 100644 --- a/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala +++ b/extras/kinesis-asl/src/main/scala/org/apache/spark/streaming/kinesis/KinesisTestUtils.scala @@ -20,6 +20,7 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer import java.util.concurrent.TimeUnit +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.{Failure, Random, Success, Try} @@ -115,7 +116,7 @@ private[kinesis] class KinesisTestUtils extends Logging { * Expose a Python friendly API. */ def pushData(testData: java.util.List[Int]): Unit = { - pushData(scala.collection.JavaConversions.asScalaBuffer(testData)) + pushData(testData.asScala) } def deleteStream(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala ---------------------------------------------------------------------- diff --git a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala index ceb135e..3d136ae 100644 --- a/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala +++ b/extras/kinesis-asl/src/test/scala/org/apache/spark/streaming/kinesis/KinesisReceiverSuite.scala @@ -17,10 +17,10 @@ package org.apache.spark.streaming.kinesis import java.nio.ByteBuffer +import java.nio.charset.StandardCharsets +import java.util.Arrays -import scala.collection.JavaConversions.seqAsJavaList - -import com.amazonaws.services.kinesis.clientlibrary.exceptions.{InvalidStateException, KinesisClientLibDependencyException, ShutdownException, ThrottlingException} +import com.amazonaws.services.kinesis.clientlibrary.exceptions._ import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownReason import com.amazonaws.services.kinesis.model.Record @@ -47,10 +47,10 @@ class KinesisReceiverSuite extends TestSuiteBase with Matchers with BeforeAndAft val someSeqNum = Some(seqNum) val record1 = new Record() - record1.setData(ByteBuffer.wrap("Spark In Action".getBytes())) + record1.setData(ByteBuffer.wrap("Spark In Action".getBytes(StandardCharsets.UTF_8))) val record2 = new Record() - record2.setData(ByteBuffer.wrap("Learning Spark".getBytes())) - val batch = List[Record](record1, record2) + record2.setData(ByteBuffer.wrap("Learning Spark".getBytes(StandardCharsets.UTF_8))) + val batch = Arrays.asList(record1, record2) var receiverMock: KinesisReceiver = _ var checkpointerMock: IRecordProcessorCheckpointer = _ http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala ---------------------------------------------------------------------- diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala index 87eeb5d..7a1c779 100644 --- a/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala +++ b/mllib/src/main/scala/org/apache/spark/mllib/util/LinearDataGenerator.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.util -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Random import com.github.fommil.netlib.BLAS.{getInstance => blas} @@ -52,7 +52,7 @@ object LinearDataGenerator { nPoints: Int, seed: Int, eps: Double): java.util.List[LabeledPoint] = { - seqAsJavaList(generateLinearInput(intercept, weights, nPoints, seed, eps)) + generateLinearInput(intercept, weights, nPoints, seed, eps).asJava } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/test/java/org/apache/spark/ml/classification/JavaOneVsRestSuite.java ---------------------------------------------------------------------- diff --git a/mllib/src/test/java/org/apache/spark/ml/classification/JavaOneVsRestSuite.java b/mllib/src/test/java/org/apache/spark/ml/classification/JavaOneVsRestSuite.java index a1ee554..2744e02 100644 --- a/mllib/src/test/java/org/apache/spark/ml/classification/JavaOneVsRestSuite.java +++ b/mllib/src/test/java/org/apache/spark/ml/classification/JavaOneVsRestSuite.java @@ -20,7 +20,7 @@ package org.apache.spark.ml.classification; import java.io.Serializable; import java.util.List; -import static scala.collection.JavaConversions.seqAsJavaList; +import scala.collection.JavaConverters; import org.junit.After; import org.junit.Assert; @@ -55,8 +55,9 @@ public class JavaOneVsRestSuite implements Serializable { double[] xMean = {5.843, 3.057, 3.758, 1.199}; double[] xVariance = {0.6856, 0.1899, 3.116, 0.581}; - List<LabeledPoint> points = seqAsJavaList(generateMultinomialLogisticInput( - weights, xMean, xVariance, true, nPoints, 42)); + List<LabeledPoint> points = JavaConverters.asJavaListConverter( + generateMultinomialLogisticInput(weights, xMean, xVariance, true, nPoints, 42) + ).asJava(); datasetRDD = jsc.parallelize(points, 2); dataset = jsql.createDataFrame(datasetRDD, LabeledPoint.class); } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala index 2473510..8d14bb6 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/LogisticRegressionSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.classification -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Random import scala.util.control.Breaks._ @@ -38,7 +38,7 @@ object LogisticRegressionSuite { scale: Double, nPoints: Int, seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateLogisticInput(offset, scale, nPoints, seed)) + generateLogisticInput(offset, scale, nPoints, seed).asJava } // Generate input of the form Y = logistic(offset + scale*X) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala index b1d78cb..ee3c85d 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/classification/SVMSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.classification -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Random import org.jblas.DoubleMatrix @@ -35,7 +35,7 @@ object SVMSuite { weights: Array[Double], nPoints: Int, seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateSVMInput(intercept, weights, nPoints, seed)) + generateSVMInput(intercept, weights, nPoints, seed).asJava } // Generate noisy input of the form Y = signum(x.dot(weights) + intercept + noise) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala index 13b754a..36ac7d2 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/optimization/GradientDescentSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.optimization -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Random import org.scalatest.Matchers @@ -35,7 +35,7 @@ object GradientDescentSuite { scale: Double, nPoints: Int, seed: Int): java.util.List[LabeledPoint] = { - seqAsJavaList(generateGDInput(offset, scale, nPoints, seed)) + generateGDInput(offset, scale, nPoints, seed).asJava } // Generate input of the form Y = logistic(offset + scale * X) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala ---------------------------------------------------------------------- diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala index 05b8772..045135f 100644 --- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala +++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/ALSSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.mllib.recommendation -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.math.abs import scala.util.Random @@ -38,7 +38,7 @@ object ALSSuite { negativeWeights: Boolean): (java.util.List[Rating], DoubleMatrix, DoubleMatrix) = { val (sampledRatings, trueRatings, truePrefs) = generateRatings(users, products, features, samplingRate, implicitPrefs) - (seqAsJavaList(sampledRatings), trueRatings, truePrefs) + (sampledRatings.asJava, trueRatings, truePrefs) } def generateRatings( http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/project/SparkBuild.scala ---------------------------------------------------------------------- diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 04e0d49..ea52bfd 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -18,13 +18,13 @@ import java.io._ import scala.util.Properties -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import sbt._ import sbt.Classpaths.publishTask import sbt.Keys._ import sbtunidoc.Plugin.UnidocKeys.unidocGenjavadocVersion -import com.typesafe.sbt.pom.{loadEffectivePom, PomBuild, SbtPomKeys} +import com.typesafe.sbt.pom.{PomBuild, SbtPomKeys} import net.virtualvoid.sbt.graph.Plugin.graphSettings import spray.revolver.RevolverPlugin._ @@ -120,7 +120,7 @@ object SparkBuild extends PomBuild { case _ => } - override val userPropertiesMap = System.getProperties.toMap + override val userPropertiesMap = System.getProperties.asScala.toMap lazy val MavenCompile = config("m2r") extend(Compile) lazy val publishLocalBoth = TaskKey[Unit]("publish-local", "publish local for m2 and ivy") @@ -559,7 +559,7 @@ object TestSettings { javaOptions in Test += "-Dspark.unsafe.exceptionOnMemoryLeak=true", javaOptions in Test += "-Dsun.io.serialization.extendedDebugInfo=true", javaOptions in Test += "-Dderby.system.durability=test", - javaOptions in Test ++= System.getProperties.filter(_._1 startsWith "spark") + javaOptions in Test ++= System.getProperties.asScala.filter(_._1.startsWith("spark")) .map { case (k,v) => s"-D$k=$v" }.toSeq, javaOptions in Test += "-ea", javaOptions in Test ++= "-Xmx3g -Xss4096k -XX:PermSize=128M -XX:MaxNewSize=256m -XX:MaxPermSize=1g" http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/python/pyspark/sql/column.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/column.py b/python/pyspark/sql/column.py index 8af8637..0948f9b 100644 --- a/python/pyspark/sql/column.py +++ b/python/pyspark/sql/column.py @@ -61,6 +61,18 @@ def _to_seq(sc, cols, converter=None): return sc._jvm.PythonUtils.toSeq(cols) +def _to_list(sc, cols, converter=None): + """ + Convert a list of Column (or names) into a JVM (Scala) List of Column. + + An optional `converter` could be used to convert items in `cols` + into JVM Column objects. + """ + if converter: + cols = [converter(c) for c in cols] + return sc._jvm.PythonUtils.toList(cols) + + def _unary_op(name, doc="unary operator"): """ Create a method for given unary operator """ def _(self): http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/python/pyspark/sql/dataframe.py ---------------------------------------------------------------------- diff --git a/python/pyspark/sql/dataframe.py b/python/pyspark/sql/dataframe.py index 025811f..e269ef4 100644 --- a/python/pyspark/sql/dataframe.py +++ b/python/pyspark/sql/dataframe.py @@ -32,7 +32,7 @@ from pyspark.storagelevel import StorageLevel from pyspark.traceback_utils import SCCallSiteSync from pyspark.sql import since from pyspark.sql.types import _parse_datatype_json_string -from pyspark.sql.column import Column, _to_seq, _to_java_column +from pyspark.sql.column import Column, _to_seq, _to_list, _to_java_column from pyspark.sql.readwriter import DataFrameWriter from pyspark.sql.types import * @@ -494,7 +494,7 @@ class DataFrame(object): if w < 0.0: raise ValueError("Weights must be positive. Found weight value: %s" % w) seed = seed if seed is not None else random.randint(0, sys.maxsize) - rdd_array = self._jdf.randomSplit(_to_seq(self.sql_ctx._sc, weights), long(seed)) + rdd_array = self._jdf.randomSplit(_to_list(self.sql_ctx._sc, weights), long(seed)) return [DataFrame(rdd, self.sql_ctx) for rdd in rdd_array] @property http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/scalastyle-config.xml ---------------------------------------------------------------------- diff --git a/scalastyle-config.xml b/scalastyle-config.xml index b5e2e88..68fdb41 100644 --- a/scalastyle-config.xml +++ b/scalastyle-config.xml @@ -161,6 +161,13 @@ This file is divided into 3 sections: ]]></customMessage> </check> + <!-- As of SPARK-9613 JavaConversions should be replaced with JavaConverters --> + <check customId="javaconversions" level="error" class="org.scalastyle.scalariform.TokenChecker" enabled="true"> + <parameters><parameter name="regex">JavaConversions</parameter></parameters> + <customMessage>Instead of importing implicits in scala.collection.JavaConversions._, import + scala.collection.JavaConverters._ and use .asScala / .asJava methods</customMessage> + </check> + <!-- ================================================================================ --> <!-- rules we'd like to enforce, but haven't cleaned up the codebase yet --> <!-- ================================================================================ --> http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala index ec895af..cfd9cb0 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/Row.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.collection.JavaConverters._ + import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.catalyst.expressions.GenericRow import org.apache.spark.sql.types.StructType @@ -280,9 +282,8 @@ trait Row extends Serializable { * * @throws ClassCastException when data type does not match. */ - def getList[T](i: Int): java.util.List[T] = { - scala.collection.JavaConversions.seqAsJavaList(getSeq[T](i)) - } + def getList[T](i: Int): java.util.List[T] = + getSeq[T](i).asJava /** * Returns the value at position i of map type as a Scala Map. @@ -296,9 +297,8 @@ trait Row extends Serializable { * * @throws ClassCastException when data type does not match. */ - def getJavaMap[K, V](i: Int): java.util.Map[K, V] = { - scala.collection.JavaConversions.mapAsJavaMap(getMap[K, V](i)) - } + def getJavaMap[K, V](i: Int): java.util.Map[K, V] = + getMap[K, V](i).asJava /** * Returns the value at position i of struct type as an [[Row]] object. http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala ---------------------------------------------------------------------- diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala index 503c4f4..4cc9a55 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Catalog.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.catalyst.analysis import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -147,7 +147,7 @@ class SimpleCatalog(val conf: CatalystConf) extends Catalog { override def getTables(databaseName: Option[String]): Seq[(String, Boolean)] = { val result = ArrayBuffer.empty[(String, Boolean)] - for (name <- tables.keySet()) { + for (name <- tables.keySet().asScala) { result += ((name, true)) } result http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala index a4fd4cf..77a42c0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameNaFunctions.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.{lang => jl} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.expressions._ @@ -209,7 +209,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * * @since 1.3.1 */ - def fill(valueMap: java.util.Map[String, Any]): DataFrame = fill0(valueMap.toSeq) + def fill(valueMap: java.util.Map[String, Any]): DataFrame = fill0(valueMap.asScala.toSeq) /** * (Scala-specific) Returns a new [[DataFrame]] that replaces null values. @@ -254,7 +254,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def replace[T](col: String, replacement: java.util.Map[T, T]): DataFrame = { - replace[T](col, replacement.toMap : Map[T, T]) + replace[T](col, replacement.asScala.toMap) } /** @@ -277,7 +277,7 @@ final class DataFrameNaFunctions private[sql](df: DataFrame) { * @since 1.3.1 */ def replace[T](cols: Array[String], replacement: java.util.Map[T, T]): DataFrame = { - replace(cols.toSeq, replacement.toMap) + replace(cols.toSeq, replacement.asScala.toMap) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index 6dc7bfe..97a8b65 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.Properties +import scala.collection.JavaConverters._ + import org.apache.hadoop.fs.Path import org.apache.spark.annotation.Experimental @@ -90,7 +92,7 @@ class DataFrameReader private[sql](sqlContext: SQLContext) extends Logging { * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameReader = { - this.options(scala.collection.JavaConversions.mapAsScalaMap(options)) + this.options(options.asScala) this } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index ce8744b..b2a66dd 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -19,6 +19,8 @@ package org.apache.spark.sql import java.util.Properties +import scala.collection.JavaConverters._ + import org.apache.spark.annotation.Experimental import org.apache.spark.sql.catalyst.{SqlParser, TableIdentifier} import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation @@ -109,7 +111,7 @@ final class DataFrameWriter private[sql](df: DataFrame) { * @since 1.4.0 */ def options(options: java.util.Map[String, String]): DataFrameWriter = { - this.options(scala.collection.JavaConversions.mapAsScalaMap(options)) + this.options(options.asScala) this } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala index 99d557b..ee31d83 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/GroupedData.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions import org.apache.spark.annotation.Experimental @@ -188,7 +188,7 @@ class GroupedData protected[sql]( * @since 1.3.0 */ def agg(exprs: java.util.Map[String, String]): DataFrame = { - agg(exprs.toMap) + agg(exprs.asScala.toMap) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala index e9de14f..e6f7619 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLConf.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql import java.util.Properties import scala.collection.immutable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.parquet.hadoop.ParquetOutputCommitter @@ -531,7 +531,7 @@ private[sql] class SQLConf extends Serializable with CatalystConf { /** Set Spark SQL configuration properties. */ def setConf(props: Properties): Unit = settings.synchronized { - props.foreach { case (k, v) => setConfString(k, v) } + props.asScala.foreach { case (k, v) => setConfString(k, v) } } /** Set the given Spark SQL configuration property using a `string` value. */ @@ -601,24 +601,25 @@ private[sql] class SQLConf extends Serializable with CatalystConf { * Return all the configuration properties that have been set (i.e. not the default). * This creates a new copy of the config properties in the form of a Map. */ - def getAllConfs: immutable.Map[String, String] = settings.synchronized { settings.toMap } + def getAllConfs: immutable.Map[String, String] = + settings.synchronized { settings.asScala.toMap } /** * Return all the configuration definitions that have been defined in [[SQLConf]]. Each * definition contains key, defaultValue and doc. */ def getAllDefinedConfs: Seq[(String, String, String)] = sqlConfEntries.synchronized { - sqlConfEntries.values.filter(_.isPublic).map { entry => + sqlConfEntries.values.asScala.filter(_.isPublic).map { entry => (entry.key, entry.defaultValueString, entry.doc) }.toSeq } private[spark] def unsetConf(key: String): Unit = { - settings -= key + settings.remove(key) } private[spark] def unsetConf(entry: SQLConfEntry[_]): Unit = { - settings -= entry.key + settings.remove(entry.key) } private[spark] def clear(): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala index a1eea09..4e8414a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/SQLContext.scala @@ -21,7 +21,7 @@ import java.beans.Introspector import java.util.Properties import java.util.concurrent.atomic.AtomicReference -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.immutable import scala.reflect.runtime.universe.TypeTag import scala.util.control.NonFatal @@ -225,7 +225,7 @@ class SQLContext(@transient val sparkContext: SparkContext) conf.setConf(properties) // After we have populated SQLConf, we call setConf to populate other confs in the subclass // (e.g. hiveconf in HiveContext). - properties.foreach { + properties.asScala.foreach { case (key, value) => setConf(key, value) } } @@ -567,7 +567,7 @@ class SQLContext(@transient val sparkContext: SparkContext) tableName: String, source: String, options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, options.toMap) + createExternalTable(tableName, source, options.asScala.toMap) } /** @@ -612,7 +612,7 @@ class SQLContext(@transient val sparkContext: SparkContext) source: String, schema: StructType, options: java.util.Map[String, String]): DataFrame = { - createExternalTable(tableName, source, schema, options.toMap) + createExternalTable(tableName, source, schema, options.asScala.toMap) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala index 8fbaf3a..0117244 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/ResolvedDataSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources import java.util.ServiceLoader -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.{existentials, implicitConversions} import scala.util.{Success, Failure, Try} @@ -55,7 +55,7 @@ object ResolvedDataSource extends Logging { val loader = Utils.getContextOrSparkClassLoader val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader) - serviceLoader.iterator().filter(_.shortName().equalsIgnoreCase(provider)).toList match { + serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider)).toList match { /** the provider format did not match any given registered aliases */ case Nil => Try(loader.loadClass(provider)).orElse(Try(loader.loadClass(provider2))) match { case Success(dataSource) => dataSource http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala index 3f8353a..0a6bb44 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystReadSupport.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.util.{Map => JMap} -import scala.collection.JavaConversions.{iterableAsScalaIterable, mapAsJavaMap, mapAsScalaMap} +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.hadoop.api.ReadSupport.ReadContext @@ -44,7 +44,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with val parquetRequestedSchema = readContext.getRequestedSchema val catalystRequestedSchema = - Option(readContext.getReadSupportMetadata).map(_.toMap).flatMap { metadata => + Option(readContext.getReadSupportMetadata).map(_.asScala).flatMap { metadata => metadata // First tries to read requested schema, which may result from projections .get(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA) @@ -123,7 +123,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.fold(context.getFileSchema) { schemaString => val toParquet = new CatalystSchemaConverter(conf) val fileSchema = context.getFileSchema.asGroupType() - val fileFieldNames = fileSchema.getFields.map(_.getName).toSet + val fileFieldNames = fileSchema.getFields.asScala.map(_.getName).toSet StructType // Deserializes the Catalyst schema of requested columns @@ -152,7 +152,7 @@ private[parquet] class CatalystReadSupport extends ReadSupport[InternalRow] with maybeRequestedSchema.map(CatalystReadSupport.SPARK_ROW_REQUESTED_SCHEMA -> _) ++ maybeRowSchema.map(RowWriteSupport.SPARK_ROW_SCHEMA -> _) - new ReadContext(parquetRequestedSchema, metadata) + new ReadContext(parquetRequestedSchema, metadata.asJava) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala index cbf0704..f682ca0 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystRowConverter.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.math.{BigDecimal, BigInteger} import java.nio.ByteOrder -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.parquet.column.Dictionary @@ -183,7 +183,7 @@ private[parquet] class CatalystRowConverter( // those missing fields and create converters for them, although values of these fields are // always null. val paddedParquetFields = { - val parquetFields = parquetType.getFields + val parquetFields = parquetType.getFields.asScala val parquetFieldNames = parquetFields.map(_.getName).toSet val missingFields = catalystType.filterNot(f => parquetFieldNames.contains(f.name)) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala index 535f068..be6c054 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/CatalystSchemaConverter.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.conf.Configuration import org.apache.parquet.schema.OriginalType._ @@ -82,7 +82,7 @@ private[parquet] class CatalystSchemaConverter( def convert(parquetSchema: MessageType): StructType = convert(parquetSchema.asGroupType()) private def convert(parquetSchema: GroupType): StructType = { - val fields = parquetSchema.getFields.map { field => + val fields = parquetSchema.getFields.asScala.map { field => field.getRepetition match { case OPTIONAL => StructField(field.getName, convertField(field), nullable = true) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala index bbf682a..64982f3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRelation.scala @@ -21,7 +21,7 @@ import java.net.URI import java.util.logging.{Logger => JLogger} import java.util.{List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.{Failure, Try} @@ -336,7 +336,7 @@ private[sql] class ParquetRelation( override def getPartitions: Array[SparkPartition] = { val inputFormat = new ParquetInputFormat[InternalRow] { override def listStatus(jobContext: JobContext): JList[FileStatus] = { - if (cacheMetadata) cachedStatuses else super.listStatus(jobContext) + if (cacheMetadata) cachedStatuses.asJava else super.listStatus(jobContext) } } @@ -344,7 +344,8 @@ private[sql] class ParquetRelation( val rawSplits = inputFormat.getSplits(jobContext) Array.tabulate[SparkPartition](rawSplits.size) { i => - new SqlNewHadoopPartition(id, i, rawSplits(i).asInstanceOf[InputSplit with Writable]) + new SqlNewHadoopPartition( + id, i, rawSplits.get(i).asInstanceOf[InputSplit with Writable]) } } }.asInstanceOf[RDD[Row]] // type erasure hack to pass RDD[InternalRow] as RDD[Row] @@ -588,7 +589,7 @@ private[sql] object ParquetRelation extends Logging { val metadata = footer.getParquetMetadata.getFileMetaData val serializedSchema = metadata .getKeyValueMetaData - .toMap + .asScala.toMap .get(CatalystReadSupport.SPARK_METADATA_KEY) if (serializedSchema.isEmpty) { // Falls back to Parquet schema if no Spark SQL schema found. @@ -745,7 +746,7 @@ private[sql] object ParquetRelation extends Logging { // Reads footers in multi-threaded manner within each task val footers = ParquetFileReader.readAllFootersInParallel( - serializedConf.value, fakeFileStatuses, skipRowGroups) + serializedConf.value, fakeFileStatuses.asJava, skipRowGroups).asScala // Converter used to convert Parquet `MessageType` to Spark SQL `StructType` val converter = @@ -772,7 +773,7 @@ private[sql] object ParquetRelation extends Logging { val fileMetaData = footer.getParquetMetadata.getFileMetaData fileMetaData .getKeyValueMetaData - .toMap + .asScala.toMap .get(CatalystReadSupport.SPARK_METADATA_KEY) .flatMap(deserializeSchemaString) .getOrElse(converter.convert(fileMetaData.getSchema)) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala index 42376ef..142301f 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetTypesConverter.scala @@ -18,8 +18,8 @@ package org.apache.spark.sql.execution.datasources.parquet import java.io.IOException +import java.util.{Collections, Arrays} -import scala.collection.JavaConversions._ import scala.util.Try import org.apache.hadoop.conf.Configuration @@ -107,7 +107,7 @@ private[parquet] object ParquetTypesConverter extends Logging { ParquetFileWriter.writeMetadataFile( conf, path, - new Footer(path, new ParquetMetadata(metaData, Nil)) :: Nil) + Arrays.asList(new Footer(path, new ParquetMetadata(metaData, Collections.emptyList())))) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala index ed282f9..d800c74 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashOuterJoin.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.joins -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.annotation.DeveloperApi import org.apache.spark.rdd.RDD @@ -92,9 +92,9 @@ case class ShuffledHashOuterJoin( case FullOuter => // TODO(davies): use UnsafeRow val leftHashTable = - buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output)) + buildHashTable(leftIter, numLeftRows, newProjection(leftKeys, left.output)).asScala val rightHashTable = - buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output)) + buildHashTable(rightIter, numRightRows, newProjection(rightKeys, right.output)).asScala (leftHashTable.keySet ++ rightHashTable.keySet).iterator.flatMap { key => fullOuterIterator(key, leftHashTable.getOrElse(key, EMPTY_LIST), http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala index 59f8b07..5a58d84 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/pythonUDFs.scala @@ -20,7 +20,7 @@ package org.apache.spark.sql.execution import java.io.OutputStream import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import net.razorvine.pickle._ @@ -196,14 +196,15 @@ object EvaluatePython { case (c, BinaryType) if c.getClass.isArray && c.getClass.getComponentType.getName == "byte" => c case (c: java.util.List[_], ArrayType(elementType, _)) => - new GenericArrayData(c.map { e => fromJava(e, elementType)}.toArray) + new GenericArrayData(c.asScala.map { e => fromJava(e, elementType)}.toArray) case (c, ArrayType(elementType, _)) if c.getClass.isArray => new GenericArrayData(c.asInstanceOf[Array[_]].map(e => fromJava(e, elementType))) case (c: java.util.Map[_, _], MapType(keyType, valueType, _)) => - val keys = c.keysIterator.map(fromJava(_, keyType)).toArray - val values = c.valuesIterator.map(fromJava(_, valueType)).toArray + val keyValues = c.asScala.toSeq + val keys = keyValues.map(kv => fromJava(kv._1, keyType)).toArray + val values = keyValues.map(kv => fromJava(kv._2, valueType)).toArray ArrayBasedMapData(keys, values) case (c, StructType(fields)) if c.getClass.isArray => @@ -367,7 +368,7 @@ case class BatchPythonEvaluation(udf: PythonUDF, output: Seq[Attribute], child: val pickle = new Unpickler iter.flatMap { pickedResult => val unpickledBatch = pickle.loads(pickedResult) - unpickledBatch.asInstanceOf[java.util.ArrayList[Any]] + unpickledBatch.asInstanceOf[java.util.ArrayList[Any]].asScala } }.mapPartitions { iter => val row = new GenericMutableRow(1) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java ---------------------------------------------------------------------- diff --git a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java index 7abdd3d..4867ceb 100644 --- a/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java +++ b/sql/core/src/test/java/test/org/apache/spark/sql/JavaDataFrameSuite.java @@ -23,7 +23,7 @@ import java.util.Comparator; import java.util.List; import java.util.Map; -import scala.collection.JavaConversions; +import scala.collection.JavaConverters; import scala.collection.Seq; import com.google.common.collect.ImmutableMap; @@ -96,7 +96,7 @@ public class JavaDataFrameSuite { df.groupBy().agg(countDistinct("key", "value")); df.groupBy().agg(countDistinct(col("key"), col("value"))); df.select(coalesce(col("key"))); - + // Varargs with mathfunctions DataFrame df2 = context.table("testData2"); df2.select(exp("a"), exp("b")); @@ -172,7 +172,7 @@ public class JavaDataFrameSuite { Seq<Integer> outputBuffer = (Seq<Integer>) first.getJavaMap(2).get("hello"); Assert.assertArrayEquals( bean.getC().get("hello"), - Ints.toArray(JavaConversions.seqAsJavaList(outputBuffer))); + Ints.toArray(JavaConverters.seqAsJavaListConverter(outputBuffer).asJava())); Seq<String> d = first.getAs(3); Assert.assertEquals(bean.getD().size(), d.length()); for (int i = 0; i < d.length(); i++) { @@ -206,7 +206,7 @@ public class JavaDataFrameSuite { count++; } } - + @Test public void testFrequentItems() { DataFrame df = context.table("testData2"); http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala index cdaa14a..329ffb6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/DataFrameNaFunctionsSuite.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.test.SharedSQLContext @@ -153,11 +153,11 @@ class DataFrameNaFunctionsSuite extends QueryTest with SharedSQLContext { // Test Java version checkAnswer( - df.na.fill(mapAsJavaMap(Map( + df.na.fill(Map( "a" -> "test", "c" -> 1, "d" -> 2.2 - ))), + ).asJava), Row("test", null, 1, 2.2)) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala index 4adcefb..3649c2a 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql import java.util.{Locale, TimeZone} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.plans._ import org.apache.spark.sql.catalyst.util._ @@ -145,7 +145,7 @@ object QueryTest { } def checkAnswer(df: DataFrame, expectedAnswer: java.util.List[Row]): String = { - checkAnswer(df, expectedAnswer.toSeq) match { + checkAnswer(df, expectedAnswer.asScala) match { case Some(errorMessage) => errorMessage case None => null } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala index 45db619..bd7cf8c 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetAvroCompatibilitySuite.scala @@ -20,8 +20,7 @@ package org.apache.spark.sql.execution.datasources.parquet import java.nio.ByteBuffer import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConverters.seqAsJavaListConverter -import scala.collection.JavaConverters.mapAsJavaMapConverter +import scala.collection.JavaConverters._ import org.apache.avro.Schema import org.apache.avro.generic.IndexedRecord http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala index d85c564..df68432 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetCompatibilityTest.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.{Path, PathFilter} import org.apache.parquet.hadoop.ParquetFileReader @@ -40,8 +40,9 @@ private[sql] abstract class ParquetCompatibilityTest extends QueryTest with Parq override def accept(path: Path): Boolean = pathFilter(path) }).toSeq - val footers = ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles, true) - footers.head.getParquetMetadata.getFileMetaData.getSchema + val footers = + ParquetFileReader.readAllFootersInParallel(configuration, parquetFiles.asJava, true) + footers.iterator().next().getParquetMetadata.getFileMetaData.getSchema } protected def logParquetSchema(path: String): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala index e6b0a2e..08d2b9d 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetIOSuite.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.execution.datasources.parquet -import scala.collection.JavaConversions._ +import java.util.Collections + +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.reflect.runtime.universe.TypeTag @@ -28,7 +30,7 @@ import org.apache.parquet.example.data.simple.SimpleGroup import org.apache.parquet.example.data.{Group, GroupWriter} import org.apache.parquet.hadoop.api.WriteSupport import org.apache.parquet.hadoop.api.WriteSupport.WriteContext -import org.apache.parquet.hadoop.metadata.{CompressionCodecName, FileMetaData, ParquetMetadata} +import org.apache.parquet.hadoop.metadata.{BlockMetaData, CompressionCodecName, FileMetaData, ParquetMetadata} import org.apache.parquet.hadoop.{Footer, ParquetFileWriter, ParquetOutputCommitter, ParquetWriter} import org.apache.parquet.io.api.RecordConsumer import org.apache.parquet.schema.{MessageType, MessageTypeParser} @@ -205,9 +207,8 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { test("compression codec") { def compressionCodecFor(path: String): String = { val codecs = ParquetTypesConverter - .readMetaData(new Path(path), Some(configuration)) - .getBlocks - .flatMap(_.getColumns) + .readMetaData(new Path(path), Some(configuration)).getBlocks.asScala + .flatMap(_.getColumns.asScala) .map(_.getCodec.name()) .distinct @@ -348,14 +349,16 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { """.stripMargin) withTempPath { location => - val extraMetadata = Map(CatalystReadSupport.SPARK_METADATA_KEY -> sparkSchema.toString) + val extraMetadata = Collections.singletonMap( + CatalystReadSupport.SPARK_METADATA_KEY, sparkSchema.toString) val fileMetadata = new FileMetaData(parquetSchema, extraMetadata, "Spark") val path = new Path(location.getCanonicalPath) ParquetFileWriter.writeMetadataFile( sqlContext.sparkContext.hadoopConfiguration, path, - new Footer(path, new ParquetMetadata(fileMetadata, Nil)) :: Nil) + Collections.singletonList( + new Footer(path, new ParquetMetadata(fileMetadata, Collections.emptyList())))) assertResult(sqlContext.read.parquet(path.toString).schema) { StructType( @@ -386,7 +389,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } @@ -410,7 +413,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } @@ -434,7 +437,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } } @@ -481,7 +484,7 @@ class ParquetIOSuite extends QueryTest with ParquetTest with SharedSQLContext { } finally { // Hadoop 1 doesn't have `Configuration.unset` configuration.clear() - clonedConf.foreach(entry => configuration.set(entry.getKey, entry.getValue)) + clonedConf.asScala.foreach(entry => configuration.set(entry.getKey, entry.getValue)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala index 02cc7e5..306f98b 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkExecuteStatementOperation.scala @@ -20,9 +20,9 @@ package org.apache.spark.sql.hive.thriftserver import java.security.PrivilegedExceptionAction import java.sql.{Date, Timestamp} import java.util.concurrent.RejectedExecutionException -import java.util.{Map => JMap, UUID} +import java.util.{Arrays, Map => JMap, UUID} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, Map => SMap} import scala.util.control.NonFatal @@ -126,13 +126,13 @@ private[hive] class SparkExecuteStatementOperation( def getResultSetSchema: TableSchema = { if (result == null || result.queryExecution.analyzed.output.size == 0) { - new TableSchema(new FieldSchema("Result", "string", "") :: Nil) + new TableSchema(Arrays.asList(new FieldSchema("Result", "string", ""))) } else { logInfo(s"Result Schema: ${result.queryExecution.analyzed.output}") val schema = result.queryExecution.analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } - new TableSchema(schema) + new TableSchema(schema.asJava) } } @@ -298,7 +298,7 @@ private[hive] class SparkExecuteStatementOperation( sqlOperationConf = new HiveConf(sqlOperationConf) // apply overlay query specific settings, if any - getConfOverlay().foreach { case (k, v) => + getConfOverlay().asScala.foreach { case (k, v) => try { sqlOperationConf.verifyAndSet(k, v) } catch { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala index 7799704..a29df56 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIDriver.scala @@ -17,11 +17,11 @@ package org.apache.spark.sql.hive.thriftserver -import scala.collection.JavaConversions._ - import java.io._ import java.util.{ArrayList => JArrayList, Locale} +import scala.collection.JavaConverters._ + import jline.console.ConsoleReader import jline.console.history.FileHistory @@ -101,9 +101,9 @@ private[hive] object SparkSQLCLIDriver extends Logging { // Set all properties specified via command line. val conf: HiveConf = sessionState.getConf - sessionState.cmdProperties.entrySet().foreach { item => - val key = item.getKey.asInstanceOf[String] - val value = item.getValue.asInstanceOf[String] + sessionState.cmdProperties.entrySet().asScala.foreach { item => + val key = item.getKey.toString + val value = item.getValue.toString // We do not propagate metastore options to the execution copy of hive. if (key != "javax.jdo.option.ConnectionURL") { conf.set(key, value) @@ -316,15 +316,15 @@ private[hive] class SparkSQLCLIDriver extends CliDriver with Logging { if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_CLI_PRINT_HEADER)) { // Print the column names. - Option(driver.getSchema.getFieldSchemas).map { fields => - out.println(fields.map(_.getName).mkString("\t")) + Option(driver.getSchema.getFieldSchemas).foreach { fields => + out.println(fields.asScala.map(_.getName).mkString("\t")) } } var counter = 0 try { while (!out.checkError() && driver.getResults(res)) { - res.foreach{ l => + res.asScala.foreach { l => counter += 1 out.println(l) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala index 644165a..5ad8c54 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLCLIService.scala @@ -21,6 +21,8 @@ import java.io.IOException import java.util.{List => JList} import javax.security.auth.login.LoginException +import scala.collection.JavaConverters._ + import org.apache.commons.logging.Log import org.apache.hadoop.hive.conf.HiveConf import org.apache.hadoop.hive.shims.Utils @@ -34,8 +36,6 @@ import org.apache.hive.service.{AbstractService, Service, ServiceException} import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.hive.thriftserver.ReflectionUtils._ -import scala.collection.JavaConversions._ - private[hive] class SparkSQLCLIService(hiveServer: HiveServer2, hiveContext: HiveContext) extends CLIService(hiveServer) with ReflectedCompositeService { @@ -76,7 +76,7 @@ private[thriftserver] trait ReflectedCompositeService { this: AbstractService => def initCompositeService(hiveConf: HiveConf) { // Emulating `CompositeService.init(hiveConf)` val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList") - serviceList.foreach(_.init(hiveConf)) + serviceList.asScala.foreach(_.init(hiveConf)) // Emulating `AbstractService.init(hiveConf)` invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala index 77272ae..2619286 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLDriver.scala @@ -17,7 +17,9 @@ package org.apache.spark.sql.hive.thriftserver -import java.util.{ArrayList => JArrayList, List => JList} +import java.util.{Arrays, ArrayList => JArrayList, List => JList} + +import scala.collection.JavaConverters._ import org.apache.commons.lang3.exception.ExceptionUtils import org.apache.hadoop.hive.metastore.api.{FieldSchema, Schema} @@ -27,8 +29,6 @@ import org.apache.hadoop.hive.ql.processors.CommandProcessorResponse import org.apache.spark.Logging import org.apache.spark.sql.hive.{HiveContext, HiveMetastoreTypes} -import scala.collection.JavaConversions._ - private[hive] class SparkSQLDriver( val context: HiveContext = SparkSQLEnv.hiveContext) extends Driver @@ -43,14 +43,14 @@ private[hive] class SparkSQLDriver( private def getResultSetSchema(query: context.QueryExecution): Schema = { val analyzed = query.analyzed logDebug(s"Result Schema: ${analyzed.output}") - if (analyzed.output.size == 0) { - new Schema(new FieldSchema("Response code", "string", "") :: Nil, null) + if (analyzed.output.isEmpty) { + new Schema(Arrays.asList(new FieldSchema("Response code", "string", "")), null) } else { val fieldSchemas = analyzed.output.map { attr => new FieldSchema(attr.name, HiveMetastoreTypes.toMetastoreType(attr.dataType), "") } - new Schema(fieldSchemas, null) + new Schema(fieldSchemas.asJava, null) } } @@ -79,7 +79,7 @@ private[hive] class SparkSQLDriver( if (hiveResponse == null) { false } else { - res.asInstanceOf[JArrayList[String]].addAll(hiveResponse) + res.asInstanceOf[JArrayList[String]].addAll(hiveResponse.asJava) hiveResponse = null true } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala ---------------------------------------------------------------------- diff --git a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala index 1d41c46..bacf6cc 100644 --- a/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala +++ b/sql/hive-thriftserver/src/main/scala/org/apache/spark/sql/hive/thriftserver/SparkSQLEnv.scala @@ -19,7 +19,7 @@ package org.apache.spark.sql.hive.thriftserver import java.io.PrintStream -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.scheduler.StatsReportListener import org.apache.spark.sql.hive.HiveContext @@ -64,7 +64,7 @@ private[hive] object SparkSQLEnv extends Logging { hiveContext.setConf("spark.sql.hive.version", HiveContext.hiveExecutionVersion) if (log.isDebugEnabled) { - hiveContext.hiveconf.getAllProperties.toSeq.sorted.foreach { case (k, v) => + hiveContext.hiveconf.getAllProperties.asScala.toSeq.sorted.foreach { case (k, v) => logDebug(s"HiveConf var: $k=$v") } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala index 17cc830..c0a458f 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveContext.scala @@ -22,7 +22,7 @@ import java.net.{URL, URLClassLoader} import java.sql.Timestamp import java.util.concurrent.TimeUnit -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.HashMap import scala.language.implicitConversions import scala.concurrent.duration._ @@ -194,7 +194,7 @@ class HiveContext(sc: SparkContext) extends SQLContext(sc) with Logging { logInfo("defalt warehouse location is " + defaltWarehouseLocation) // `configure` goes second to override other settings. - val allConfig = metadataConf.iterator.map(e => e.getKey -> e.getValue).toMap ++ configure + val allConfig = metadataConf.asScala.map(e => e.getKey -> e.getValue).toMap ++ configure val isolatedLoader = if (hiveMetastoreJars == "builtin") { if (hiveExecutionVersion != hiveMetastoreVersion) { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala ---------------------------------------------------------------------- diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala index 64fffdb..cfe2bb0 100644 --- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala +++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveInspectors.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.hive +import scala.collection.JavaConverters._ + import org.apache.hadoop.hive.common.`type`.{HiveDecimal, HiveVarchar} import org.apache.hadoop.hive.serde2.objectinspector.primitive._ import org.apache.hadoop.hive.serde2.objectinspector.{StructField => HiveStructField, _} @@ -31,9 +33,6 @@ import org.apache.spark.sql.types._ import org.apache.spark.sql.{AnalysisException, types} import org.apache.spark.unsafe.types.UTF8String -/* Implicit conversions */ -import scala.collection.JavaConversions._ - /** * 1. The Underlying data type in catalyst and in Hive * In catalyst: @@ -290,13 +289,13 @@ private[hive] trait HiveInspectors { DateTimeUtils.fromJavaDate(poi.getWritableConstantValue.get()) case mi: StandardConstantMapObjectInspector => // take the value from the map inspector object, rather than the input data - val map = mi.getWritableConstantValue - val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray - val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray + val keyValues = mi.getWritableConstantValue.asScala.toSeq + val keys = keyValues.map(kv => unwrap(kv._1, mi.getMapKeyObjectInspector)).toArray + val values = keyValues.map(kv => unwrap(kv._2, mi.getMapValueObjectInspector)).toArray ArrayBasedMapData(keys, values) case li: StandardConstantListObjectInspector => // take the value from the list inspector object, rather than the input data - val values = li.getWritableConstantValue + val values = li.getWritableConstantValue.asScala .map(unwrap(_, li.getListElementObjectInspector)) .toArray new GenericArrayData(values) @@ -342,7 +341,7 @@ private[hive] trait HiveInspectors { case li: ListObjectInspector => Option(li.getList(data)) .map { l => - val values = l.map(unwrap(_, li.getListElementObjectInspector)).toArray + val values = l.asScala.map(unwrap(_, li.getListElementObjectInspector)).toArray new GenericArrayData(values) } .orNull @@ -351,15 +350,16 @@ private[hive] trait HiveInspectors { if (map == null) { null } else { - val keys = map.keysIterator.map(unwrap(_, mi.getMapKeyObjectInspector)).toArray - val values = map.valuesIterator.map(unwrap(_, mi.getMapValueObjectInspector)).toArray + val keyValues = map.asScala.toSeq + val keys = keyValues.map(kv => unwrap(kv._1, mi.getMapKeyObjectInspector)).toArray + val values = keyValues.map(kv => unwrap(kv._2, mi.getMapValueObjectInspector)).toArray ArrayBasedMapData(keys, values) } // currently, hive doesn't provide the ConstantStructObjectInspector case si: StructObjectInspector => val allRefs = si.getAllStructFieldRefs - InternalRow.fromSeq( - allRefs.map(r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector))) + InternalRow.fromSeq(allRefs.asScala.map( + r => unwrap(si.getStructFieldData(data, r), r.getFieldObjectInspector))) } @@ -403,14 +403,14 @@ private[hive] trait HiveInspectors { case soi: StandardStructObjectInspector => val schema = dataType.asInstanceOf[StructType] - val wrappers = soi.getAllStructFieldRefs.zip(schema.fields).map { case (ref, field) => - wrapperFor(ref.getFieldObjectInspector, field.dataType) + val wrappers = soi.getAllStructFieldRefs.asScala.zip(schema.fields).map { + case (ref, field) => wrapperFor(ref.getFieldObjectInspector, field.dataType) } (o: Any) => { if (o != null) { val struct = soi.create() val row = o.asInstanceOf[InternalRow] - soi.getAllStructFieldRefs.zip(wrappers).zipWithIndex.foreach { + soi.getAllStructFieldRefs.asScala.zip(wrappers).zipWithIndex.foreach { case ((field, wrapper), i) => soi.setStructFieldData(struct, field, wrapper(row.get(i, schema(i).dataType))) } @@ -537,7 +537,7 @@ private[hive] trait HiveInspectors { // 1. create the pojo (most likely) object val result = x.create() var i = 0 - while (i < fieldRefs.length) { + while (i < fieldRefs.size) { // 2. set the property for the pojo val tpe = structType(i).dataType x.setStructFieldData( @@ -552,9 +552,9 @@ private[hive] trait HiveInspectors { val fieldRefs = x.getAllStructFieldRefs val structType = dataType.asInstanceOf[StructType] val row = a.asInstanceOf[InternalRow] - val result = new java.util.ArrayList[AnyRef](fieldRefs.length) + val result = new java.util.ArrayList[AnyRef](fieldRefs.size) var i = 0 - while (i < fieldRefs.length) { + while (i < fieldRefs.size) { val tpe = structType(i).dataType result.add(wrap(row.get(i, tpe), fieldRefs.get(i).getFieldObjectInspector, tpe)) i += 1 @@ -712,10 +712,10 @@ private[hive] trait HiveInspectors { def inspectorToDataType(inspector: ObjectInspector): DataType = inspector match { case s: StructObjectInspector => - StructType(s.getAllStructFieldRefs.map(f => { + StructType(s.getAllStructFieldRefs.asScala.map(f => types.StructField( f.getFieldName, inspectorToDataType(f.getFieldObjectInspector), nullable = true) - })) + )) case l: ListObjectInspector => ArrayType(inspectorToDataType(l.getListElementObjectInspector)) case m: MapObjectInspector => MapType( --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org