[SPARK-9613] [CORE] Ban use of JavaConversions and migrate all existing uses to JavaConverters
Replace `JavaConversions` implicits with `JavaConverters` Most occurrences I've seen so far are necessary conversions; a few have been avoidable. None are in critical code as far as I see, yet. Author: Sean Owen <so...@cloudera.com> Closes #8033 from srowen/SPARK-9613. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/69c9c177 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/69c9c177 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/69c9c177 Branch: refs/heads/master Commit: 69c9c177160e32a2fbc9b36ecc52156077fca6fc Parents: 7f1e507 Author: Sean Owen <so...@cloudera.com> Authored: Tue Aug 25 12:33:13 2015 +0100 Committer: Sean Owen <so...@cloudera.com> Committed: Tue Aug 25 12:33:13 2015 +0100 ---------------------------------------------------------------------- .../shuffle/unsafe/UnsafeShuffleWriter.java | 4 +- .../org/apache/spark/MapOutputTracker.scala | 4 +- .../scala/org/apache/spark/SSLOptions.scala | 11 +- .../scala/org/apache/spark/SparkContext.scala | 4 +- .../main/scala/org/apache/spark/TestUtils.scala | 9 +- .../apache/spark/api/java/JavaHadoopRDD.scala | 4 +- .../spark/api/java/JavaNewHadoopRDD.scala | 4 +- .../org/apache/spark/api/java/JavaPairRDD.scala | 19 ++-- .../org/apache/spark/api/java/JavaRDDLike.scala | 75 +++++-------- .../spark/api/java/JavaSparkContext.scala | 20 ++-- .../spark/api/python/PythonHadoopUtil.scala | 28 ++--- .../org/apache/spark/api/python/PythonRDD.scala | 26 ++--- .../apache/spark/api/python/PythonUtils.scala | 15 ++- .../spark/api/python/PythonWorkerFactory.scala | 11 +- .../org/apache/spark/api/python/SerDeUtil.scala | 3 +- .../WriteInputFormatTestDataGenerator.scala | 8 +- .../scala/org/apache/spark/api/r/RRDD.scala | 13 ++- .../scala/org/apache/spark/api/r/RUtils.scala | 5 +- .../scala/org/apache/spark/api/r/SerDe.scala | 4 +- .../spark/broadcast/TorrentBroadcast.scala | 4 +- .../spark/deploy/ExternalShuffleService.scala | 8 +- .../org/apache/spark/deploy/PythonRunner.scala | 4 +- .../org/apache/spark/deploy/RPackageUtils.scala | 4 +- .../scala/org/apache/spark/deploy/RRunner.scala | 4 +- .../apache/spark/deploy/SparkCuratorUtil.scala | 4 +- .../apache/spark/deploy/SparkHadoopUtil.scala | 19 ++-- .../spark/deploy/SparkSubmitArguments.scala | 6 +- .../master/ZooKeeperPersistenceEngine.scala | 6 +- .../spark/deploy/worker/CommandUtils.scala | 5 +- .../spark/deploy/worker/DriverRunner.scala | 8 +- .../spark/deploy/worker/ExecutorRunner.scala | 7 +- .../org/apache/spark/deploy/worker/Worker.scala | 1 - .../org/apache/spark/executor/Executor.scala | 6 +- .../apache/spark/executor/ExecutorSource.scala | 4 +- .../spark/executor/MesosExecutorBackend.scala | 6 +- .../apache/spark/input/PortableDataStream.scala | 11 +- .../spark/input/WholeTextFileInputFormat.scala | 8 +- .../spark/launcher/WorkerCommandBuilder.scala | 4 +- .../apache/spark/metrics/MetricsConfig.scala | 22 ++-- .../network/netty/NettyBlockRpcServer.scala | 4 +- .../netty/NettyBlockTransferService.scala | 6 +- .../apache/spark/network/nio/Connection.scala | 4 +- .../spark/partial/GroupedCountEvaluator.scala | 10 +- .../spark/partial/GroupedMeanEvaluator.scala | 10 +- .../spark/partial/GroupedSumEvaluator.scala | 10 +- .../org/apache/spark/rdd/PairRDDFunctions.scala | 6 +- .../scala/org/apache/spark/rdd/PipedRDD.scala | 6 +- .../org/apache/spark/rdd/SubtractedRDD.scala | 4 +- .../spark/scheduler/InputFormatInfo.scala | 4 +- .../scala/org/apache/spark/scheduler/Pool.scala | 10 +- .../mesos/CoarseMesosSchedulerBackend.scala | 20 ++-- .../mesos/MesosClusterPersistenceEngine.scala | 4 +- .../cluster/mesos/MesosClusterScheduler.scala | 14 +-- .../cluster/mesos/MesosSchedulerBackend.scala | 22 ++-- .../cluster/mesos/MesosSchedulerUtils.scala | 25 ++--- .../spark/serializer/KryoSerializer.scala | 10 +- .../shuffle/FileShuffleBlockResolver.scala | 8 +- .../storage/BlockManagerMasterEndpoint.scala | 8 +- .../scala/org/apache/spark/util/AkkaUtils.scala | 4 +- .../org/apache/spark/util/ListenerBus.scala | 7 +- .../spark/util/MutableURLClassLoader.scala | 2 - .../apache/spark/util/TimeStampedHashMap.scala | 10 +- .../apache/spark/util/TimeStampedHashSet.scala | 4 +- .../scala/org/apache/spark/util/Utils.scala | 20 ++-- .../apache/spark/util/collection/Utils.scala | 4 +- .../java/org/apache/spark/JavaAPISuite.java | 6 +- .../scala/org/apache/spark/SparkConfSuite.scala | 7 +- .../spark/deploy/LogUrlsStandaloneSuite.scala | 1 - .../spark/deploy/RPackageUtilsSuite.scala | 8 +- .../deploy/worker/ExecutorRunnerTest.scala | 5 +- .../spark/scheduler/SparkListenerSuite.scala | 9 +- .../mesos/MesosSchedulerBackendSuite.scala | 15 +-- .../spark/serializer/KryoSerializerSuite.scala | 3 +- .../org/apache/spark/ui/UISeleniumSuite.scala | 21 ++-- .../spark/examples/CassandraCQLTest.scala | 15 +-- .../apache/spark/examples/CassandraTest.scala | 6 +- .../spark/examples/DriverSubmissionTest.scala | 6 +- .../pythonconverters/AvroConverters.scala | 16 +-- .../pythonconverters/CassandraConverters.scala | 14 ++- .../pythonconverters/HBaseConverters.scala | 5 +- .../streaming/flume/sink/SparkSinkSuite.scala | 4 +- .../streaming/flume/EventTransformer.scala | 4 +- .../streaming/flume/FlumeBatchFetcher.scala | 3 +- .../streaming/flume/FlumeInputDStream.scala | 7 +- .../flume/FlumePollingInputDStream.scala | 6 +- .../spark/streaming/flume/FlumeTestUtils.scala | 10 +- .../spark/streaming/flume/FlumeUtils.scala | 8 +- .../streaming/flume/PollingFlumeTestUtils.scala | 16 ++- .../flume/FlumePollingStreamSuite.scala | 8 +- .../streaming/flume/FlumeStreamSuite.scala | 2 +- .../spark/streaming/kafka/KafkaTestUtils.scala | 4 +- .../spark/streaming/kafka/KafkaUtils.scala | 35 +++--- .../spark/streaming/zeromq/ZeroMQUtils.scala | 15 ++- .../kinesis/KinesisBackedBlockRDD.scala | 4 +- .../streaming/kinesis/KinesisReceiver.scala | 4 +- .../streaming/kinesis/KinesisTestUtils.scala | 3 +- .../kinesis/KinesisReceiverSuite.scala | 12 +- .../spark/mllib/util/LinearDataGenerator.scala | 4 +- .../ml/classification/JavaOneVsRestSuite.java | 7 +- .../LogisticRegressionSuite.scala | 4 +- .../spark/mllib/classification/SVMSuite.scala | 4 +- .../optimization/GradientDescentSuite.scala | 4 +- .../spark/mllib/recommendation/ALSSuite.scala | 4 +- project/SparkBuild.scala | 8 +- python/pyspark/sql/column.py | 12 ++ python/pyspark/sql/dataframe.py | 4 +- scalastyle-config.xml | 7 ++ .../main/scala/org/apache/spark/sql/Row.scala | 12 +- .../spark/sql/catalyst/analysis/Catalog.scala | 4 +- .../apache/spark/sql/DataFrameNaFunctions.scala | 8 +- .../org/apache/spark/sql/DataFrameReader.scala | 4 +- .../org/apache/spark/sql/DataFrameWriter.scala | 4 +- .../org/apache/spark/sql/GroupedData.scala | 4 +- .../scala/org/apache/spark/sql/SQLConf.scala | 13 ++- .../scala/org/apache/spark/sql/SQLContext.scala | 8 +- .../datasources/ResolvedDataSource.scala | 4 +- .../parquet/CatalystReadSupport.scala | 8 +- .../parquet/CatalystRowConverter.scala | 4 +- .../parquet/CatalystSchemaConverter.scala | 4 +- .../datasources/parquet/ParquetRelation.scala | 13 ++- .../parquet/ParquetTypesConverter.scala | 4 +- .../execution/joins/ShuffledHashOuterJoin.scala | 6 +- .../apache/spark/sql/execution/pythonUDFs.scala | 11 +- .../apache/spark/sql/JavaDataFrameSuite.java | 8 +- .../spark/sql/DataFrameNaFunctionsSuite.scala | 6 +- .../scala/org/apache/spark/sql/QueryTest.scala | 4 +- .../parquet/ParquetAvroCompatibilitySuite.scala | 3 +- .../parquet/ParquetCompatibilityTest.scala | 7 +- .../datasources/parquet/ParquetIOSuite.scala | 25 +++-- .../SparkExecuteStatementOperation.scala | 10 +- .../hive/thriftserver/SparkSQLCLIDriver.scala | 16 +-- .../hive/thriftserver/SparkSQLCLIService.scala | 6 +- .../sql/hive/thriftserver/SparkSQLDriver.scala | 14 +-- .../sql/hive/thriftserver/SparkSQLEnv.scala | 4 +- .../org/apache/spark/sql/hive/HiveContext.scala | 4 +- .../apache/spark/sql/hive/HiveInspectors.scala | 40 +++---- .../spark/sql/hive/HiveMetastoreCatalog.scala | 12 +- .../org/apache/spark/sql/hive/HiveQl.scala | 110 +++++++++++-------- .../org/apache/spark/sql/hive/HiveShim.scala | 5 +- .../spark/sql/hive/client/ClientWrapper.scala | 27 ++--- .../apache/spark/sql/hive/client/HiveShim.scala | 14 +-- .../execution/DescribeHiveTableCommand.scala | 8 +- .../sql/hive/execution/HiveTableScan.scala | 9 +- .../hive/execution/InsertIntoHiveTable.scala | 12 +- .../hive/execution/ScriptTransformation.scala | 12 +- .../org/apache/spark/sql/hive/hiveUDFs.scala | 9 +- .../apache/spark/sql/hive/orc/OrcRelation.scala | 8 +- .../apache/spark/sql/hive/test/TestHive.scala | 11 +- .../spark/sql/hive/client/FiltersSuite.scala | 4 +- .../spark/sql/hive/execution/HiveUDFSuite.scala | 29 +++-- .../spark/sql/hive/execution/PruningSuite.scala | 7 +- .../sql/hive/execution/SQLQuerySuite.scala | 4 +- .../sql/sources/hadoopFsRelationSuites.scala | 8 +- .../streaming/api/java/JavaDStreamLike.scala | 12 +- .../streaming/api/java/JavaPairDStream.scala | 28 ++--- .../api/java/JavaStreamingContext.scala | 32 +++--- .../streaming/api/python/PythonDStream.scala | 5 +- .../spark/streaming/receiver/Receiver.scala | 6 +- .../streaming/scheduler/JobScheduler.scala | 4 +- .../scheduler/ReceivedBlockTracker.scala | 4 +- .../streaming/util/FileBasedWriteAheadLog.scala | 4 +- .../apache/spark/streaming/JavaTestUtils.scala | 24 ++-- .../streaming/util/WriteAheadLogSuite.scala | 4 +- .../apache/spark/tools/GenerateMIMAIgnore.scala | 6 +- .../org/apache/spark/deploy/yarn/Client.scala | 13 ++- .../spark/deploy/yarn/ExecutorRunnable.scala | 24 ++-- .../spark/deploy/yarn/YarnAllocator.scala | 19 ++-- .../apache/spark/deploy/yarn/YarnRMClient.scala | 8 +- .../deploy/yarn/BaseYarnClusterSuite.scala | 6 +- .../apache/spark/deploy/yarn/ClientSuite.scala | 8 +- .../spark/deploy/yarn/YarnClusterSuite.scala | 5 +- 171 files changed, 863 insertions(+), 880 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java index 2389c28..fdb309e 100644 --- a/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java +++ b/core/src/main/java/org/apache/spark/shuffle/unsafe/UnsafeShuffleWriter.java @@ -24,7 +24,7 @@ import java.util.Iterator; import scala.Option; import scala.Product2; -import scala.collection.JavaConversions; +import scala.collection.JavaConverters; import scala.collection.immutable.Map; import scala.reflect.ClassTag; import scala.reflect.ClassTag$; @@ -160,7 +160,7 @@ public class UnsafeShuffleWriter<K, V> extends ShuffleWriter<K, V> { */ @VisibleForTesting public void write(Iterator<Product2<K, V>> records) throws IOException { - write(JavaConversions.asScalaIterator(records)); + write(JavaConverters.asScalaIteratorConverter(records).asScala()); } @Override http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/MapOutputTracker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index 9221883..a387592 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -21,8 +21,8 @@ import java.io._ import java.util.concurrent.ConcurrentHashMap import java.util.zip.{GZIPInputStream, GZIPOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet, Map} -import scala.collection.JavaConversions._ import scala.reflect.ClassTag import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, RpcEndpoint} @@ -398,7 +398,7 @@ private[spark] class MapOutputTrackerMaster(conf: SparkConf) */ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTracker(conf) { protected val mapStatuses: Map[Int, Array[MapStatus]] = - new ConcurrentHashMap[Int, Array[MapStatus]] + new ConcurrentHashMap[Int, Array[MapStatus]]().asScala } private[spark] object MapOutputTracker extends Logging { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/SSLOptions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SSLOptions.scala b/core/src/main/scala/org/apache/spark/SSLOptions.scala index 32df42d..3b9c885 100644 --- a/core/src/main/scala/org/apache/spark/SSLOptions.scala +++ b/core/src/main/scala/org/apache/spark/SSLOptions.scala @@ -17,9 +17,11 @@ package org.apache.spark -import java.io.{File, FileInputStream} -import java.security.{KeyStore, NoSuchAlgorithmException} -import javax.net.ssl.{KeyManager, KeyManagerFactory, SSLContext, TrustManager, TrustManagerFactory} +import java.io.File +import java.security.NoSuchAlgorithmException +import javax.net.ssl.SSLContext + +import scala.collection.JavaConverters._ import com.typesafe.config.{Config, ConfigFactory, ConfigValueFactory} import org.eclipse.jetty.util.ssl.SslContextFactory @@ -79,7 +81,6 @@ private[spark] case class SSLOptions( * object. It can be used then to compose the ultimate Akka configuration. */ def createAkkaConfig: Option[Config] = { - import scala.collection.JavaConversions._ if (enabled) { Some(ConfigFactory.empty() .withValue("akka.remote.netty.tcp.security.key-store", @@ -97,7 +98,7 @@ private[spark] case class SSLOptions( .withValue("akka.remote.netty.tcp.security.protocol", ConfigValueFactory.fromAnyRef(protocol.getOrElse(""))) .withValue("akka.remote.netty.tcp.security.enabled-algorithms", - ConfigValueFactory.fromIterable(supportedAlgorithms.toSeq)) + ConfigValueFactory.fromIterable(supportedAlgorithms.asJava)) .withValue("akka.remote.netty.tcp.enable-ssl", ConfigValueFactory.fromAnyRef(true))) } else { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/SparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 9849aff..f3da04a 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -26,8 +26,8 @@ import java.util.{Arrays, Properties, UUID} import java.util.concurrent.atomic.{AtomicReference, AtomicBoolean, AtomicInteger} import java.util.UUID.randomUUID +import scala.collection.JavaConverters._ import scala.collection.{Map, Set} -import scala.collection.JavaConversions._ import scala.collection.generic.Growable import scala.collection.mutable.HashMap import scala.reflect.{ClassTag, classTag} @@ -1546,7 +1546,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationCli def getAllPools: Seq[Schedulable] = { assertNotStopped() // TODO(xiajunluan): We should take nested pools into account - taskScheduler.rootPool.schedulableQueue.toSeq + taskScheduler.rootPool.schedulableQueue.asScala.toSeq } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/TestUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/TestUtils.scala b/core/src/main/scala/org/apache/spark/TestUtils.scala index a1ebbec..888763a 100644 --- a/core/src/main/scala/org/apache/spark/TestUtils.scala +++ b/core/src/main/scala/org/apache/spark/TestUtils.scala @@ -19,11 +19,12 @@ package org.apache.spark import java.io.{ByteArrayInputStream, File, FileInputStream, FileOutputStream} import java.net.{URI, URL} +import java.nio.charset.StandardCharsets +import java.util.Arrays import java.util.jar.{JarEntry, JarOutputStream} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ -import com.google.common.base.Charsets.UTF_8 import com.google.common.io.{ByteStreams, Files} import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider} @@ -71,7 +72,7 @@ private[spark] object TestUtils { files.foreach { case (k, v) => val entry = new JarEntry(k) jarStream.putNextEntry(entry) - ByteStreams.copy(new ByteArrayInputStream(v.getBytes(UTF_8)), jarStream) + ByteStreams.copy(new ByteArrayInputStream(v.getBytes(StandardCharsets.UTF_8)), jarStream) } jarStream.close() jarFile.toURI.toURL @@ -125,7 +126,7 @@ private[spark] object TestUtils { } else { Seq() } - compiler.getTask(null, null, null, options, null, Seq(sourceFile)).call() + compiler.getTask(null, null, null, options.asJava, null, Arrays.asList(sourceFile)).call() val fileName = className + ".class" val result = new File(fileName) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala index 0ae0b4e..891bcdd 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaHadoopRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.mapred.InputSplit @@ -37,7 +37,7 @@ class JavaHadoopRDD[K, V](rdd: HadoopRDD[K, V]) def mapPartitionsWithInputSplit[R]( f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = { - new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)), + new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala, preservesPartitioning)(fakeClassTag))(fakeClassTag) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala index ec4f396..0f49279 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaNewHadoopRDD.scala @@ -17,7 +17,7 @@ package org.apache.spark.api.java -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.hadoop.mapreduce.InputSplit @@ -37,7 +37,7 @@ class JavaNewHadoopRDD[K, V](rdd: NewHadoopRDD[K, V]) def mapPartitionsWithInputSplit[R]( f: JFunction2[InputSplit, java.util.Iterator[(K, V)], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = { - new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, asJavaIterator(b)), + new JavaRDD(rdd.mapPartitionsWithInputSplit((a, b) => f.call(a, b.asJava).asScala, preservesPartitioning)(fakeClassTag))(fakeClassTag) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala index 8441bb3..fb78797 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala @@ -20,7 +20,7 @@ package org.apache.spark.api.java import java.util.{Comparator, List => JList, Map => JMap} import java.lang.{Iterable => JIterable} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.ClassTag @@ -142,7 +142,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def sampleByKey(withReplacement: Boolean, fractions: JMap[K, Double], seed: Long): JavaPairRDD[K, V] = - new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed)) + new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions.asScala, seed)) /** * Return a subset of this RDD sampled by key (via stratified sampling). @@ -173,7 +173,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) def sampleByKeyExact(withReplacement: Boolean, fractions: JMap[K, Double], seed: Long): JavaPairRDD[K, V] = - new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed)) + new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions.asScala, seed)) /** * ::Experimental:: @@ -768,7 +768,7 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) * Return the list of values in the RDD for key `key`. This operation is done efficiently if the * RDD has a known partitioner by only searching the partition that the key maps to. */ - def lookup(key: K): JList[V] = seqAsJavaList(rdd.lookup(key)) + def lookup(key: K): JList[V] = rdd.lookup(key).asJava /** Output the RDD to any Hadoop-supported file system. */ def saveAsHadoopFile[F <: OutputFormat[_, _]]( @@ -987,30 +987,27 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)]) object JavaPairRDD { private[spark] def groupByResultToJava[K: ClassTag, T](rdd: RDD[(K, Iterable[T])]): RDD[(K, JIterable[T])] = { - rddToPairRDDFunctions(rdd).mapValues(asJavaIterable) + rddToPairRDDFunctions(rdd).mapValues(_.asJava) } private[spark] def cogroupResultToJava[K: ClassTag, V, W]( rdd: RDD[(K, (Iterable[V], Iterable[W]))]): RDD[(K, (JIterable[V], JIterable[W]))] = { - rddToPairRDDFunctions(rdd).mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2))) + rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava)) } private[spark] def cogroupResult2ToJava[K: ClassTag, V, W1, W2]( rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2]))]) : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2]))] = { - rddToPairRDDFunctions(rdd) - .mapValues(x => (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3))) + rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava)) } private[spark] def cogroupResult3ToJava[K: ClassTag, V, W1, W2, W3]( rdd: RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]) : RDD[(K, (JIterable[V], JIterable[W1], JIterable[W2], JIterable[W3]))] = { - rddToPairRDDFunctions(rdd) - .mapValues(x => - (asJavaIterable(x._1), asJavaIterable(x._2), asJavaIterable(x._3), asJavaIterable(x._4))) + rddToPairRDDFunctions(rdd).mapValues(x => (x._1.asJava, x._2.asJava, x._3.asJava, x._4.asJava)) } def fromRDD[K: ClassTag, V: ClassTag](rdd: RDD[(K, V)]): JavaPairRDD[K, V] = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala index c582488..fc817cd 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaRDDLike.scala @@ -21,7 +21,6 @@ import java.{lang => jl} import java.lang.{Iterable => JIterable, Long => JLong} import java.util.{Comparator, List => JList, Iterator => JIterator} -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.reflect.ClassTag @@ -59,10 +58,10 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def rdd: RDD[T] @deprecated("Use partitions() instead.", "1.1.0") - def splits: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq) + def splits: JList[Partition] = rdd.partitions.toSeq.asJava /** Set of partitions in this RDD. */ - def partitions: JList[Partition] = new java.util.ArrayList(rdd.partitions.toSeq) + def partitions: JList[Partition] = rdd.partitions.toSeq.asJava /** The partitioner of this RDD. */ def partitioner: Optional[Partitioner] = JavaUtils.optionToOptional(rdd.partitioner) @@ -82,7 +81,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * subclasses of RDD. */ def iterator(split: Partition, taskContext: TaskContext): java.util.Iterator[T] = - asJavaIterator(rdd.iterator(split, taskContext)) + rdd.iterator(split, taskContext).asJava // Transformations (return a new RDD) @@ -99,7 +98,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsWithIndex[R]( f: JFunction2[jl.Integer, java.util.Iterator[T], java.util.Iterator[R]], preservesPartitioning: Boolean = false): JavaRDD[R] = - new JavaRDD(rdd.mapPartitionsWithIndex(((a, b) => f(a, asJavaIterator(b))), + new JavaRDD(rdd.mapPartitionsWithIndex((a, b) => f.call(a, b.asJava).asScala, preservesPartitioning)(fakeClassTag))(fakeClassTag) /** @@ -153,7 +152,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U]): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } JavaRDD.fromRDD(rdd.mapPartitions(fn)(fakeClassTag[U]))(fakeClassTag[U]) } @@ -164,7 +163,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitions[U](f: FlatMapFunction[java.util.Iterator[T], U], preservesPartitioning: Boolean): JavaRDD[U] = { def fn: (Iterator[T]) => Iterator[U] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } JavaRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning)(fakeClassTag[U]))(fakeClassTag[U]) @@ -175,7 +174,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { */ def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]]): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } new JavaDoubleRDD(rdd.mapPartitions(fn).map((x: jl.Double) => x.doubleValue())) } @@ -186,7 +185,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2]): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } JavaPairRDD.fromRDD(rdd.mapPartitions(fn))(fakeClassTag[K2], fakeClassTag[V2]) } @@ -197,7 +196,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToDouble(f: DoubleFlatMapFunction[java.util.Iterator[T]], preservesPartitioning: Boolean): JavaDoubleRDD = { def fn: (Iterator[T]) => Iterator[jl.Double] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } new JavaDoubleRDD(rdd.mapPartitions(fn, preservesPartitioning) .map(x => x.doubleValue())) @@ -209,7 +208,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def mapPartitionsToPair[K2, V2](f: PairFlatMapFunction[java.util.Iterator[T], K2, V2], preservesPartitioning: Boolean): JavaPairRDD[K2, V2] = { def fn: (Iterator[T]) => Iterator[(K2, V2)] = { - (x: Iterator[T]) => asScalaIterator(f.call(asJavaIterator(x)).iterator()) + (x: Iterator[T]) => f.call(x.asJava).iterator().asScala } JavaPairRDD.fromRDD( rdd.mapPartitions(fn, preservesPartitioning))(fakeClassTag[K2], fakeClassTag[V2]) @@ -219,14 +218,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Applies a function f to each partition of this RDD. */ def foreachPartition(f: VoidFunction[java.util.Iterator[T]]) { - rdd.foreachPartition((x => f.call(asJavaIterator(x)))) + rdd.foreachPartition((x => f.call(x.asJava))) } /** * Return an RDD created by coalescing all elements within each partition into an array. */ def glom(): JavaRDD[JList[T]] = - new JavaRDD(rdd.glom().map(x => new java.util.ArrayList[T](x.toSeq))) + new JavaRDD(rdd.glom().map(_.toSeq.asJava)) /** * Return the Cartesian product of this RDD and another one, that is, the RDD of all pairs of @@ -266,13 +265,13 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * Return an RDD created by piping elements to a forked external process. */ def pipe(command: JList[String]): JavaRDD[String] = - rdd.pipe(asScalaBuffer(command)) + rdd.pipe(command.asScala) /** * Return an RDD created by piping elements to a forked external process. */ def pipe(command: JList[String], env: java.util.Map[String, String]): JavaRDD[String] = - rdd.pipe(asScalaBuffer(command), mapAsScalaMap(env)) + rdd.pipe(command.asScala, env.asScala) /** * Zips this RDD with another one, returning key-value pairs with the first element in each RDD, @@ -294,8 +293,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { other: JavaRDDLike[U, _], f: FlatMapFunction2[java.util.Iterator[T], java.util.Iterator[U], V]): JavaRDD[V] = { def fn: (Iterator[T], Iterator[U]) => Iterator[V] = { - (x: Iterator[T], y: Iterator[U]) => asScalaIterator( - f.call(asJavaIterator(x), asJavaIterator(y)).iterator()) + (x: Iterator[T], y: Iterator[U]) => f.call(x.asJava, y.asJava).iterator().asScala } JavaRDD.fromRDD( rdd.zipPartitions(other.rdd)(fn)(other.classTag, fakeClassTag[V]))(fakeClassTag[V]) @@ -333,22 +331,16 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { /** * Return an array that contains all of the elements in this RDD. */ - def collect(): JList[T] = { - import scala.collection.JavaConversions._ - val arr: java.util.Collection[T] = rdd.collect().toSeq - new java.util.ArrayList(arr) - } + def collect(): JList[T] = + rdd.collect().toSeq.asJava /** * Return an iterator that contains all of the elements in this RDD. * * The iterator will consume as much memory as the largest partition in this RDD. */ - def toLocalIterator(): JIterator[T] = { - import scala.collection.JavaConversions._ - rdd.toLocalIterator - } - + def toLocalIterator(): JIterator[T] = + asJavaIteratorConverter(rdd.toLocalIterator).asJava /** * Return an array that contains all of the elements in this RDD. @@ -363,9 +355,8 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { def collectPartitions(partitionIds: Array[Int]): Array[JList[T]] = { // This is useful for implementing `take` from other language frontends // like Python where the data is serialized. - import scala.collection.JavaConversions._ val res = context.runJob(rdd, (it: Iterator[T]) => it.toArray, partitionIds) - res.map(x => new java.util.ArrayList(x.toSeq)).toArray + res.map(_.toSeq.asJava) } /** @@ -489,20 +480,14 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * it will be slow if a lot of partitions are required. In that case, use collect() to get the * whole RDD instead. */ - def take(num: Int): JList[T] = { - import scala.collection.JavaConversions._ - val arr: java.util.Collection[T] = rdd.take(num).toSeq - new java.util.ArrayList(arr) - } + def take(num: Int): JList[T] = + rdd.take(num).toSeq.asJava def takeSample(withReplacement: Boolean, num: Int): JList[T] = takeSample(withReplacement, num, Utils.random.nextLong) - def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = { - import scala.collection.JavaConversions._ - val arr: java.util.Collection[T] = rdd.takeSample(withReplacement, num, seed).toSeq - new java.util.ArrayList(arr) - } + def takeSample(withReplacement: Boolean, num: Int, seed: Long): JList[T] = + rdd.takeSample(withReplacement, num, seed).toSeq.asJava /** * Return the first element in this RDD. @@ -582,10 +567,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * @return an array of top elements */ def top(num: Int, comp: Comparator[T]): JList[T] = { - import scala.collection.JavaConversions._ - val topElems = rdd.top(num)(Ordering.comparatorToOrdering(comp)) - val arr: java.util.Collection[T] = topElems.toSeq - new java.util.ArrayList(arr) + rdd.top(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava } /** @@ -607,10 +589,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * @return an array of top elements */ def takeOrdered(num: Int, comp: Comparator[T]): JList[T] = { - import scala.collection.JavaConversions._ - val topElems = rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)) - val arr: java.util.Collection[T] = topElems.toSeq - new java.util.ArrayList(arr) + rdd.takeOrdered(num)(Ordering.comparatorToOrdering(comp)).toSeq.asJava } /** @@ -696,7 +675,7 @@ trait JavaRDDLike[T, This <: JavaRDDLike[T, This]] extends Serializable { * applies a function f to each partition of this RDD. */ def foreachPartitionAsync(f: VoidFunction[java.util.Iterator[T]]): JavaFutureAction[Void] = { - new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x)), + new JavaFutureActionWrapper[Unit, Void](rdd.foreachPartitionAsync(x => f.call(x.asJava)), { x => null.asInstanceOf[Void] }) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala index 02e49a8..609496c 100644 --- a/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala +++ b/core/src/main/scala/org/apache/spark/api/java/JavaSparkContext.scala @@ -21,8 +21,7 @@ import java.io.Closeable import java.util import java.util.{Map => JMap} -import scala.collection.JavaConversions -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.language.implicitConversions import scala.reflect.ClassTag @@ -104,7 +103,7 @@ class JavaSparkContext(val sc: SparkContext) */ def this(master: String, appName: String, sparkHome: String, jars: Array[String], environment: JMap[String, String]) = - this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment, Map())) + this(new SparkContext(master, appName, sparkHome, jars.toSeq, environment.asScala, Map())) private[spark] val env = sc.env @@ -118,7 +117,7 @@ class JavaSparkContext(val sc: SparkContext) def appName: String = sc.appName - def jars: util.List[String] = sc.jars + def jars: util.List[String] = sc.jars.asJava def startTime: java.lang.Long = sc.startTime @@ -142,7 +141,7 @@ class JavaSparkContext(val sc: SparkContext) /** Distribute a local Scala collection to form an RDD. */ def parallelize[T](list: java.util.List[T], numSlices: Int): JavaRDD[T] = { implicit val ctag: ClassTag[T] = fakeClassTag - sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices) + sc.parallelize(list.asScala, numSlices) } /** Get an RDD that has no partitions or elements. */ @@ -161,7 +160,7 @@ class JavaSparkContext(val sc: SparkContext) : JavaPairRDD[K, V] = { implicit val ctagK: ClassTag[K] = fakeClassTag implicit val ctagV: ClassTag[V] = fakeClassTag - JavaPairRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list), numSlices)) + JavaPairRDD.fromRDD(sc.parallelize(list.asScala, numSlices)) } /** Distribute a local Scala collection to form an RDD. */ @@ -170,8 +169,7 @@ class JavaSparkContext(val sc: SparkContext) /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double], numSlices: Int): JavaDoubleRDD = - JavaDoubleRDD.fromRDD(sc.parallelize(JavaConversions.asScalaBuffer(list).map(_.doubleValue()), - numSlices)) + JavaDoubleRDD.fromRDD(sc.parallelize(list.asScala.map(_.doubleValue()), numSlices)) /** Distribute a local Scala collection to form an RDD. */ def parallelizeDoubles(list: java.util.List[java.lang.Double]): JavaDoubleRDD = @@ -519,7 +517,7 @@ class JavaSparkContext(val sc: SparkContext) /** Build the union of two or more RDDs. */ override def union[T](first: JavaRDD[T], rest: java.util.List[JavaRDD[T]]): JavaRDD[T] = { - val rdds: Seq[RDD[T]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) + val rdds: Seq[RDD[T]] = (Seq(first) ++ rest.asScala).map(_.rdd) implicit val ctag: ClassTag[T] = first.classTag sc.union(rdds) } @@ -527,7 +525,7 @@ class JavaSparkContext(val sc: SparkContext) /** Build the union of two or more RDDs. */ override def union[K, V](first: JavaPairRDD[K, V], rest: java.util.List[JavaPairRDD[K, V]]) : JavaPairRDD[K, V] = { - val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.rdd) + val rdds: Seq[RDD[(K, V)]] = (Seq(first) ++ rest.asScala).map(_.rdd) implicit val ctag: ClassTag[(K, V)] = first.classTag implicit val ctagK: ClassTag[K] = first.kClassTag implicit val ctagV: ClassTag[V] = first.vClassTag @@ -536,7 +534,7 @@ class JavaSparkContext(val sc: SparkContext) /** Build the union of two or more RDDs. */ override def union(first: JavaDoubleRDD, rest: java.util.List[JavaDoubleRDD]): JavaDoubleRDD = { - val rdds: Seq[RDD[Double]] = (Seq(first) ++ asScalaBuffer(rest)).map(_.srdd) + val rdds: Seq[RDD[Double]] = (Seq(first) ++ rest.asScala).map(_.srdd) new JavaDoubleRDD(sc.union(rdds)) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala index b959b68..a7dfa1d 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonHadoopUtil.scala @@ -17,15 +17,17 @@ package org.apache.spark.api.python -import org.apache.spark.broadcast.Broadcast -import org.apache.spark.rdd.RDD -import org.apache.spark.util.{SerializableConfiguration, Utils} -import org.apache.spark.{Logging, SparkException} +import scala.collection.JavaConverters._ +import scala.util.{Failure, Success, Try} + import org.apache.hadoop.conf.Configuration import org.apache.hadoop.io._ -import scala.util.{Failure, Success, Try} -import org.apache.spark.annotation.Experimental +import org.apache.spark.{Logging, SparkException} +import org.apache.spark.annotation.Experimental +import org.apache.spark.broadcast.Broadcast +import org.apache.spark.rdd.RDD +import org.apache.spark.util.{SerializableConfiguration, Utils} /** * :: Experimental :: @@ -68,7 +70,6 @@ private[python] class WritableToJavaConverter( * object representation */ private def convertWritable(writable: Writable): Any = { - import collection.JavaConversions._ writable match { case iw: IntWritable => iw.get() case dw: DoubleWritable => dw.get() @@ -89,9 +90,7 @@ private[python] class WritableToJavaConverter( aw.get().map(convertWritable(_)) case mw: MapWritable => val map = new java.util.HashMap[Any, Any]() - mw.foreach { case (k, v) => - map.put(convertWritable(k), convertWritable(v)) - } + mw.asScala.foreach { case (k, v) => map.put(convertWritable(k), convertWritable(v)) } map case w: Writable => WritableUtils.clone(w, conf.value.value) case other => other @@ -122,7 +121,6 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] { * supported out-of-the-box. */ private def convertToWritable(obj: Any): Writable = { - import collection.JavaConversions._ obj match { case i: java.lang.Integer => new IntWritable(i) case d: java.lang.Double => new DoubleWritable(d) @@ -134,7 +132,7 @@ private[python] class JavaToWritableConverter extends Converter[Any, Writable] { case null => NullWritable.get() case map: java.util.Map[_, _] => val mapWritable = new MapWritable() - map.foreach { case (k, v) => + map.asScala.foreach { case (k, v) => mapWritable.put(convertToWritable(k), convertToWritable(v)) } mapWritable @@ -161,9 +159,8 @@ private[python] object PythonHadoopUtil { * Convert a [[java.util.Map]] of properties to a [[org.apache.hadoop.conf.Configuration]] */ def mapToConf(map: java.util.Map[String, String]): Configuration = { - import collection.JavaConversions._ val conf = new Configuration() - map.foreach{ case (k, v) => conf.set(k, v) } + map.asScala.foreach { case (k, v) => conf.set(k, v) } conf } @@ -172,9 +169,8 @@ private[python] object PythonHadoopUtil { * any matching keys in left */ def mergeConfs(left: Configuration, right: Configuration): Configuration = { - import collection.JavaConversions._ val copy = new Configuration(left) - right.iterator().foreach(entry => copy.set(entry.getKey, entry.getValue)) + right.asScala.foreach(entry => copy.set(entry.getKey, entry.getValue)) copy } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala index 2a56bf2..b4d152b 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala @@ -21,7 +21,7 @@ import java.io._ import java.net._ import java.util.{Collections, ArrayList => JArrayList, List => JList, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.language.existentials @@ -66,11 +66,11 @@ private[spark] class PythonRDD( val env = SparkEnv.get val localdir = env.blockManager.diskBlockManager.localDirs.map( f => f.getPath()).mkString(",") - envVars += ("SPARK_LOCAL_DIRS" -> localdir) // it's also used in monitor thread + envVars.put("SPARK_LOCAL_DIRS", localdir) // it's also used in monitor thread if (reuse_worker) { - envVars += ("SPARK_REUSE_WORKER" -> "1") + envVars.put("SPARK_REUSE_WORKER", "1") } - val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap) + val worker: Socket = env.createPythonWorker(pythonExec, envVars.asScala.toMap) // Whether is the worker released into idle pool @volatile var released = false @@ -150,7 +150,7 @@ private[spark] class PythonRDD( // Check whether the worker is ready to be re-used. if (stream.readInt() == SpecialLengths.END_OF_STREAM) { if (reuse_worker) { - env.releasePythonWorker(pythonExec, envVars.toMap, worker) + env.releasePythonWorker(pythonExec, envVars.asScala.toMap, worker) released = true } } @@ -217,13 +217,13 @@ private[spark] class PythonRDD( // sparkFilesDir PythonRDD.writeUTF(SparkFiles.getRootDirectory, dataOut) // Python includes (*.zip and *.egg files) - dataOut.writeInt(pythonIncludes.length) - for (include <- pythonIncludes) { + dataOut.writeInt(pythonIncludes.size()) + for (include <- pythonIncludes.asScala) { PythonRDD.writeUTF(include, dataOut) } // Broadcast variables val oldBids = PythonRDD.getWorkerBroadcasts(worker) - val newBids = broadcastVars.map(_.id).toSet + val newBids = broadcastVars.asScala.map(_.id).toSet // number of different broadcasts val toRemove = oldBids.diff(newBids) val cnt = toRemove.size + newBids.diff(oldBids).size @@ -233,7 +233,7 @@ private[spark] class PythonRDD( dataOut.writeLong(- bid - 1) // bid >= 0 oldBids.remove(bid) } - for (broadcast <- broadcastVars) { + for (broadcast <- broadcastVars.asScala) { if (!oldBids.contains(broadcast.id)) { // send new broadcast dataOut.writeLong(broadcast.id) @@ -287,7 +287,7 @@ private[spark] class PythonRDD( if (!context.isCompleted) { try { logWarning("Incomplete task interrupted: Attempting to kill Python Worker") - env.destroyPythonWorker(pythonExec, envVars.toMap, worker) + env.destroyPythonWorker(pythonExec, envVars.asScala.toMap, worker) } catch { case e: Exception => logError("Exception when trying to kill worker", e) @@ -358,10 +358,10 @@ private[spark] object PythonRDD extends Logging { type ByteArray = Array[Byte] type UnrolledPartition = Array[ByteArray] val allPartitions: Array[UnrolledPartition] = - sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions) + sc.runJob(rdd, (x: Iterator[ByteArray]) => x.toArray, partitions.asScala) val flattenedPartition: UnrolledPartition = Array.concat(allPartitions: _*) serveIterator(flattenedPartition.iterator, - s"serve RDD ${rdd.id} with partitions ${partitions.mkString(",")}") + s"serve RDD ${rdd.id} with partitions ${partitions.asScala.mkString(",")}") } /** @@ -819,7 +819,7 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort: val in = socket.getInputStream val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize)) out.writeInt(val2.size) - for (array <- val2) { + for (array <- val2.asScala) { out.writeInt(array.length) out.write(array) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala index 90dacae..31e534f 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala @@ -17,10 +17,10 @@ package org.apache.spark.api.python -import java.io.{File} +import java.io.File import java.util.{List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.SparkContext @@ -51,7 +51,14 @@ private[spark] object PythonUtils { * Convert list of T into seq of T (for calling API with varargs) */ def toSeq[T](vs: JList[T]): Seq[T] = { - vs.toList.toSeq + vs.asScala + } + + /** + * Convert list of T into a (Scala) List of T + */ + def toList[T](vs: JList[T]): List[T] = { + vs.asScala.toList } /** @@ -65,6 +72,6 @@ private[spark] object PythonUtils { * Convert java map of K, V into Map of K, V (for calling API with varargs) */ def toScalaMap[K, V](jm: java.util.Map[K, V]): Map[K, V] = { - jm.toMap + jm.asScala.toMap } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala index e314408..7039b73 100644 --- a/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala +++ b/core/src/main/scala/org/apache/spark/api/python/PythonWorkerFactory.scala @@ -19,9 +19,10 @@ package org.apache.spark.api.python import java.io.{DataOutputStream, DataInputStream, InputStream, OutputStreamWriter} import java.net.{InetAddress, ServerSocket, Socket, SocketException} +import java.util.Arrays import scala.collection.mutable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark._ import org.apache.spark.util.{RedirectThread, Utils} @@ -108,9 +109,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String serverSocket = new ServerSocket(0, 1, InetAddress.getByAddress(Array(127, 0, 0, 1))) // Create and start the worker - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.worker")) + val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.worker")) val workerEnv = pb.environment() - workerEnv.putAll(envVars) + workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: workerEnv.put("PYTHONUNBUFFERED", "YES") @@ -151,9 +152,9 @@ private[spark] class PythonWorkerFactory(pythonExec: String, envVars: Map[String try { // Create and start the daemon - val pb = new ProcessBuilder(Seq(pythonExec, "-m", "pyspark.daemon")) + val pb = new ProcessBuilder(Arrays.asList(pythonExec, "-m", "pyspark.daemon")) val workerEnv = pb.environment() - workerEnv.putAll(envVars) + workerEnv.putAll(envVars.asJava) workerEnv.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: workerEnv.put("PYTHONUNBUFFERED", "YES") http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala index 1f1debc..fd27276 100644 --- a/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala +++ b/core/src/main/scala/org/apache/spark/api/python/SerDeUtil.scala @@ -22,7 +22,6 @@ import java.util.{ArrayList => JArrayList} import org.apache.spark.api.java.JavaRDD -import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.Failure @@ -214,7 +213,7 @@ private[spark] object SerDeUtil extends Logging { new AutoBatchedPickler(cleaned) } else { val pickle = new Pickler - cleaned.grouped(batchSize).map(batched => pickle.dumps(seqAsJavaList(batched))) + cleaned.grouped(batchSize).map(batched => pickle.dumps(batched.asJava)) } } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala index 8f30ff9..ee1fb05 100644 --- a/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala +++ b/core/src/main/scala/org/apache/spark/api/python/WriteInputFormatTestDataGenerator.scala @@ -20,6 +20,8 @@ package org.apache.spark.api.python import java.io.{DataOutput, DataInput} import java.{util => ju} +import scala.collection.JavaConverters._ + import com.google.common.base.Charsets.UTF_8 import org.apache.hadoop.io._ @@ -62,10 +64,9 @@ private[python] class TestInputKeyConverter extends Converter[Any, Any] { } private[python] class TestInputValueConverter extends Converter[Any, Any] { - import collection.JavaConversions._ override def convert(obj: Any): ju.List[Double] = { val m = obj.asInstanceOf[MapWritable] - seqAsJavaList(m.keySet.map(w => w.asInstanceOf[DoubleWritable].get()).toSeq) + m.keySet.asScala.map(_.asInstanceOf[DoubleWritable].get()).toSeq.asJava } } @@ -76,9 +77,8 @@ private[python] class TestOutputKeyConverter extends Converter[Any, Any] { } private[python] class TestOutputValueConverter extends Converter[Any, Any] { - import collection.JavaConversions._ override def convert(obj: Any): DoubleWritable = { - new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().head) + new DoubleWritable(obj.asInstanceOf[java.util.Map[Double, _]].keySet().iterator().next()) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/RRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala index 1cf2824f..9d5bbb5 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RRDD.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RRDD.scala @@ -19,9 +19,10 @@ package org.apache.spark.api.r import java.io._ import java.net.{InetAddress, ServerSocket} +import java.util.Arrays import java.util.{Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.io.Source import scala.reflect.ClassTag import scala.util.Try @@ -365,11 +366,11 @@ private[r] object RRDD { sparkConf.setIfMissing("spark.master", "local") } - for ((name, value) <- sparkEnvirMap) { - sparkConf.set(name.asInstanceOf[String], value.asInstanceOf[String]) + for ((name, value) <- sparkEnvirMap.asScala) { + sparkConf.set(name.toString, value.toString) } - for ((name, value) <- sparkExecutorEnvMap) { - sparkConf.setExecutorEnv(name.asInstanceOf[String], value.asInstanceOf[String]) + for ((name, value) <- sparkExecutorEnvMap.asScala) { + sparkConf.setExecutorEnv(name.toString, value.toString) } val jsc = new JavaSparkContext(sparkConf) @@ -395,7 +396,7 @@ private[r] object RRDD { val rOptions = "--vanilla" val rLibDir = RUtils.sparkRPackagePath(isDriver = false) val rExecScript = rLibDir + "/SparkR/worker/" + script - val pb = new ProcessBuilder(List(rCommand, rOptions, rExecScript)) + val pb = new ProcessBuilder(Arrays.asList(rCommand, rOptions, rExecScript)) // Unset the R_TESTS environment variable for workers. // This is set by R CMD check as startup.Rs // (http://svn.r-project.org/R/trunk/src/library/tools/R/testing.R) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/RUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala index 427b2bc..9e807cc 100644 --- a/core/src/main/scala/org/apache/spark/api/r/RUtils.scala +++ b/core/src/main/scala/org/apache/spark/api/r/RUtils.scala @@ -18,8 +18,7 @@ package org.apache.spark.api.r import java.io.File - -import scala.collection.JavaConversions._ +import java.util.Arrays import org.apache.spark.{SparkEnv, SparkException} @@ -68,7 +67,7 @@ private[spark] object RUtils { /** Check if R is installed before running tests that use R commands. */ def isRInstalled: Boolean = { try { - val builder = new ProcessBuilder(Seq("R", "--version")) + val builder = new ProcessBuilder(Arrays.asList("R", "--version")) builder.start().waitFor() == 0 } catch { case e: Exception => false http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/api/r/SerDe.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala index 3c89f24..dbbbcf4 100644 --- a/core/src/main/scala/org/apache/spark/api/r/SerDe.scala +++ b/core/src/main/scala/org/apache/spark/api/r/SerDe.scala @@ -20,7 +20,7 @@ package org.apache.spark.api.r import java.io.{DataInputStream, DataOutputStream} import java.sql.{Timestamp, Date, Time} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ /** * Utility functions to serialize, deserialize objects to / from R @@ -165,7 +165,7 @@ private[spark] object SerDe { val valueType = readObjectType(in) readTypedObject(in, valueType) }) - mapAsJavaMap(keys.zip(values).toMap) + keys.zip(values).toMap.asJava } else { new java.util.HashMap[Object, Object]() } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala index a0c9b5e..7e3764d 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala @@ -20,7 +20,7 @@ package org.apache.spark.broadcast import java.io._ import java.nio.ByteBuffer -import scala.collection.JavaConversions.asJavaEnumeration +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.Random @@ -210,7 +210,7 @@ private object TorrentBroadcast extends Logging { compressionCodec: Option[CompressionCodec]): T = { require(blocks.nonEmpty, "Cannot unblockify an empty array of blocks") val is = new SequenceInputStream( - asJavaEnumeration(blocks.iterator.map(block => new ByteBufferInputStream(block)))) + blocks.iterator.map(new ByteBufferInputStream(_)).asJavaEnumeration) val in: InputStream = compressionCodec.map(c => c.compressedInputStream(is)).getOrElse(is) val ser = serializer.newInstance() val serIn = ser.deserializeStream(in) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala index 22ef701..6840a3a 100644 --- a/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala +++ b/core/src/main/scala/org/apache/spark/deploy/ExternalShuffleService.scala @@ -19,13 +19,13 @@ package org.apache.spark.deploy import java.util.concurrent.CountDownLatch -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.{Logging, SparkConf, SecurityManager} import org.apache.spark.network.TransportContext import org.apache.spark.network.netty.SparkTransportConf import org.apache.spark.network.sasl.SaslServerBootstrap -import org.apache.spark.network.server.TransportServer +import org.apache.spark.network.server.{TransportServerBootstrap, TransportServer} import org.apache.spark.network.shuffle.ExternalShuffleBlockHandler import org.apache.spark.network.util.TransportConf import org.apache.spark.util.Utils @@ -67,13 +67,13 @@ class ExternalShuffleService(sparkConf: SparkConf, securityManager: SecurityMana def start() { require(server == null, "Shuffle server already started") logInfo(s"Starting shuffle service on port $port with useSasl = $useSasl") - val bootstraps = + val bootstraps: Seq[TransportServerBootstrap] = if (useSasl) { Seq(new SaslServerBootstrap(transportConf, securityManager)) } else { Nil } - server = transportContext.createServer(port, bootstraps) + server = transportContext.createServer(port, bootstraps.asJava) } /** Clean up all shuffle files associated with an application that has exited. */ http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala index 23d01e9..d853276 100644 --- a/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/PythonRunner.scala @@ -21,7 +21,7 @@ import java.net.URI import java.io.File import scala.collection.mutable.ArrayBuffer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.Try import org.apache.spark.SparkUserAppException @@ -71,7 +71,7 @@ object PythonRunner { val pythonPath = PythonUtils.mergePythonPaths(pathElements: _*) // Launch Python process - val builder = new ProcessBuilder(Seq(pythonExec, formattedPythonFile) ++ otherArgs) + val builder = new ProcessBuilder((Seq(pythonExec, formattedPythonFile) ++ otherArgs).asJava) val env = builder.environment() env.put("PYTHONPATH", pythonPath) // This is equivalent to setting the -u flag; we use it because ipython doesn't support -u: http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala index ed1e972..4b28866 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RPackageUtils.scala @@ -22,7 +22,7 @@ import java.util.jar.JarFile import java.util.logging.Level import java.util.zip.{ZipEntry, ZipOutputStream} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.io.{ByteStreams, Files} @@ -110,7 +110,7 @@ private[deploy] object RPackageUtils extends Logging { print(s"Building R package with the command: $installCmd", printStream) } try { - val builder = new ProcessBuilder(installCmd) + val builder = new ProcessBuilder(installCmd.asJava) builder.redirectErrorStream(true) val env = builder.environment() env.clear() http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/RRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala index c0cab22..05b954c 100644 --- a/core/src/main/scala/org/apache/spark/deploy/RRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/RRunner.scala @@ -20,7 +20,7 @@ package org.apache.spark.deploy import java.io._ import java.util.concurrent.{Semaphore, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path @@ -68,7 +68,7 @@ object RRunner { if (initialized.tryAcquire(backendTimeout, TimeUnit.SECONDS)) { // Launch R val returnCode = try { - val builder = new ProcessBuilder(Seq(rCommand, rFileNormalized) ++ otherArgs) + val builder = new ProcessBuilder((Seq(rCommand, rFileNormalized) ++ otherArgs).asJava) val env = builder.environment() env.put("EXISTING_SPARKR_BACKEND_PORT", sparkRBackendPort.toString) val rPackageDir = RUtils.sparkRPackagePath(isDriver = true) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala index b8d3993..8d5e716 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkCuratorUtil.scala @@ -17,7 +17,7 @@ package org.apache.spark.deploy -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.curator.framework.{CuratorFramework, CuratorFrameworkFactory} import org.apache.curator.retry.ExponentialBackoffRetry @@ -57,7 +57,7 @@ private[spark] object SparkCuratorUtil extends Logging { def deleteRecursive(zk: CuratorFramework, path: String) { if (zk.checkExists().forPath(path) != null) { - for (child <- zk.getChildren.forPath(path)) { + for (child <- zk.getChildren.forPath(path).asScala) { zk.delete().forPath(path + "/" + child) } zk.delete().forPath(path) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala index dda4216..f7723ef 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala @@ -22,7 +22,7 @@ import java.lang.reflect.Method import java.security.PrivilegedExceptionAction import java.util.{Arrays, Comparator} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.control.NonFatal @@ -71,7 +71,7 @@ class SparkHadoopUtil extends Logging { } def transferCredentials(source: UserGroupInformation, dest: UserGroupInformation) { - for (token <- source.getTokens()) { + for (token <- source.getTokens.asScala) { dest.addToken(token) } } @@ -175,8 +175,8 @@ class SparkHadoopUtil extends Logging { } private def getFileSystemThreadStatistics(): Seq[AnyRef] = { - val stats = FileSystem.getAllStatistics() - stats.map(Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) + FileSystem.getAllStatistics.asScala.map( + Utils.invoke(classOf[Statistics], _, "getThreadStatistics")) } private def getFileSystemThreadStatisticsMethod(methodName: String): Method = { @@ -306,12 +306,13 @@ class SparkHadoopUtil extends Logging { val renewalInterval = sparkConf.getLong("spark.yarn.token.renewal.interval", (24 hours).toMillis) - credentials.getAllTokens.filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) + credentials.getAllTokens.asScala + .filter(_.getKind == DelegationTokenIdentifier.HDFS_DELEGATION_KIND) .map { t => - val identifier = new DelegationTokenIdentifier() - identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) - (identifier.getIssueDate + fraction * renewalInterval).toLong - now - }.foldLeft(0L)(math.max) + val identifier = new DelegationTokenIdentifier() + identifier.readFields(new DataInputStream(new ByteArrayInputStream(t.getIdentifier))) + (identifier.getIssueDate + fraction * renewalInterval).toLong - now + }.foldLeft(0L)(math.max) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala index 3f3c662..18a1c52 100644 --- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala +++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala @@ -23,7 +23,7 @@ import java.net.URI import java.util.{List => JList} import java.util.jar.JarFile -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.io.Source @@ -94,7 +94,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S // Set parameters from command line arguments try { - parse(args.toList) + parse(args.asJava) } catch { case e: IllegalArgumentException => SparkSubmit.printErrorAndExit(e.getMessage()) @@ -458,7 +458,7 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S } override protected def handleExtraArgs(extra: JList[String]): Unit = { - childArgs ++= extra + childArgs ++= extra.asScala } private def printUsageAndExit(exitCode: Int, unknownParam: Any = null): Unit = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala index 563831c..540e802 100644 --- a/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/deploy/master/ZooKeeperPersistenceEngine.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.master import java.nio.ByteBuffer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.curator.framework.CuratorFramework @@ -49,8 +49,8 @@ private[master] class ZooKeeperPersistenceEngine(conf: SparkConf, val serializer } override def read[T: ClassTag](prefix: String): Seq[T] = { - val file = zk.getChildren.forPath(WORKING_DIR).filter(_.startsWith(prefix)) - file.map(deserializeFromFile[T]).flatten + zk.getChildren.forPath(WORKING_DIR).asScala + .filter(_.startsWith(prefix)).map(deserializeFromFile[T]).flatten } override def close() { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala index 45a3f43..ce02ee2 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/CommandUtils.scala @@ -18,9 +18,8 @@ package org.apache.spark.deploy.worker import java.io.{File, FileOutputStream, InputStream, IOException} -import java.lang.System._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map import org.apache.spark.Logging @@ -62,7 +61,7 @@ object CommandUtils extends Logging { // SPARK-698: do not call the run.cmd script, as process.destroy() // fails to kill a process tree on Windows val cmd = new WorkerCommandBuilder(sparkHome, memory, command).buildCommand() - cmd.toSeq ++ Seq(command.mainClass) ++ command.arguments + cmd.asScala ++ Seq(command.mainClass) ++ command.arguments } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala index ec51c3d..89159ff 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverRunner.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker import java.io._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files @@ -172,8 +172,8 @@ private[deploy] class DriverRunner( CommandUtils.redirectStream(process.getInputStream, stdout) val stderr = new File(baseDir, "stderr") - val header = "Launch Command: %s\n%s\n\n".format( - builder.command.mkString("\"", "\" \"", "\""), "=" * 40) + val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"") + val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40) Files.append(header, stderr, UTF_8) CommandUtils.redirectStream(process.getErrorStream, stderr) } @@ -229,6 +229,6 @@ private[deploy] trait ProcessBuilderLike { private[deploy] object ProcessBuilderLike { def apply(processBuilder: ProcessBuilder): ProcessBuilderLike = new ProcessBuilderLike { override def start(): Process = processBuilder.start() - override def command: Seq[String] = processBuilder.command() + override def command: Seq[String] = processBuilder.command().asScala } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala index ab3fea4..3aef051 100644 --- a/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/ExecutorRunner.scala @@ -19,7 +19,7 @@ package org.apache.spark.deploy.worker import java.io._ -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import com.google.common.io.Files @@ -129,7 +129,8 @@ private[deploy] class ExecutorRunner( val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf), memory, sparkHome.getAbsolutePath, substituteVariables) val command = builder.command() - logInfo("Launch command: " + command.mkString("\"", "\" \"", "\"")) + val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"") + logInfo(s"Launch command: $formattedCommand") builder.directory(executorDir) builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator)) @@ -145,7 +146,7 @@ private[deploy] class ExecutorRunner( process = builder.start() val header = "Spark Executor Command: %s\n%s\n\n".format( - command.mkString("\"", "\" \"", "\""), "=" * 40) + formattedCommand, "=" * 40) // Redirect its stdout and stderr to files val stdout = new File(executorDir, "stdout") http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala index 79b1536..770927c 100755 --- a/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala +++ b/core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala @@ -24,7 +24,6 @@ import java.util.{UUID, Date} import java.util.concurrent._ import java.util.concurrent.{Future => JFuture, ScheduledFuture => JScheduledFuture} -import scala.collection.JavaConversions._ import scala.collection.mutable.{HashMap, HashSet, LinkedHashMap} import scala.concurrent.ExecutionContext import scala.util.Random http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/Executor.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/Executor.scala b/core/src/main/scala/org/apache/spark/executor/Executor.scala index 42a85e4..c3491bb 100644 --- a/core/src/main/scala/org/apache/spark/executor/Executor.scala +++ b/core/src/main/scala/org/apache/spark/executor/Executor.scala @@ -23,7 +23,7 @@ import java.net.URL import java.nio.ByteBuffer import java.util.concurrent.{ConcurrentHashMap, TimeUnit} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal @@ -147,7 +147,7 @@ private[spark] class Executor( /** Returns the total amount of time this JVM process has spent in garbage collection. */ private def computeTotalGcTime(): Long = { - ManagementFactory.getGarbageCollectorMXBeans.map(_.getCollectionTime).sum + ManagementFactory.getGarbageCollectorMXBeans.asScala.map(_.getCollectionTime).sum } class TaskRunner( @@ -425,7 +425,7 @@ private[spark] class Executor( val tasksMetrics = new ArrayBuffer[(Long, TaskMetrics)]() val curGCTime = computeTotalGcTime() - for (taskRunner <- runningTasks.values()) { + for (taskRunner <- runningTasks.values().asScala) { if (taskRunner.task != null) { taskRunner.task.metrics.foreach { metrics => metrics.updateShuffleReadMetrics() http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala index 293c512..d16f4a1 100644 --- a/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala +++ b/core/src/main/scala/org/apache/spark/executor/ExecutorSource.scala @@ -19,7 +19,7 @@ package org.apache.spark.executor import java.util.concurrent.ThreadPoolExecutor -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.codahale.metrics.{Gauge, MetricRegistry} import org.apache.hadoop.fs.FileSystem @@ -30,7 +30,7 @@ private[spark] class ExecutorSource(threadPool: ThreadPoolExecutor, executorId: String) extends Source { private def fileStats(scheme: String) : Option[FileSystem.Statistics] = - FileSystem.getAllStatistics().find(s => s.getScheme.equals(scheme)) + FileSystem.getAllStatistics.asScala.find(s => s.getScheme.equals(scheme)) private def registerFileSystemStat[T]( scheme: String, name: String, f: FileSystem.Statistics => T, defaultValue: T) = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala index cfd672e..0474fd2 100644 --- a/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala +++ b/core/src/main/scala/org/apache/spark/executor/MesosExecutorBackend.scala @@ -19,7 +19,7 @@ package org.apache.spark.executor import java.nio.ByteBuffer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.mesos.protobuf.ByteString import org.apache.mesos.{Executor => MesosExecutor, ExecutorDriver, MesosExecutorDriver} @@ -28,7 +28,7 @@ import org.apache.mesos.Protos.{TaskStatus => MesosTaskStatus, _} import org.apache.spark.{Logging, TaskState, SparkConf, SparkEnv} import org.apache.spark.TaskState.TaskState import org.apache.spark.deploy.SparkHadoopUtil -import org.apache.spark.scheduler.cluster.mesos.{MesosTaskLaunchData} +import org.apache.spark.scheduler.cluster.mesos.MesosTaskLaunchData import org.apache.spark.util.{SignalLogger, Utils} private[spark] class MesosExecutorBackend @@ -55,7 +55,7 @@ private[spark] class MesosExecutorBackend slaveInfo: SlaveInfo) { // Get num cores for this task from ExecutorInfo, created in MesosSchedulerBackend. - val cpusPerTask = executorInfo.getResourcesList + val cpusPerTask = executorInfo.getResourcesList.asScala .find(_.getName == "cpus") .map(_.getScalar.getValue.toInt) .getOrElse(0) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala index 6cda777..a5ad472 100644 --- a/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala +++ b/core/src/main/scala/org/apache/spark/input/PortableDataStream.scala @@ -19,7 +19,7 @@ package org.apache.spark.input import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.io.ByteStreams import org.apache.hadoop.conf.Configuration @@ -44,12 +44,9 @@ private[spark] abstract class StreamFileInputFormat[T] * which is set through setMaxSplitSize */ def setMinPartitions(context: JobContext, minPartitions: Int) { - val files = listStatus(context) - val totalLen = files.map { file => - if (file.isDir) 0L else file.getLen - }.sum - - val maxSplitSize = Math.ceil(totalLen * 1.0 / files.length).toLong + val files = listStatus(context).asScala + val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum + val maxSplitSize = Math.ceil(totalLen * 1.0 / files.size).toLong super.setMaxSplitSize(maxSplitSize) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala index aaef7c7..1ba34a1 100644 --- a/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala +++ b/core/src/main/scala/org/apache/spark/input/WholeTextFileInputFormat.scala @@ -17,7 +17,7 @@ package org.apache.spark.input -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.hadoop.fs.Path import org.apache.hadoop.mapreduce.InputSplit @@ -52,10 +52,8 @@ private[spark] class WholeTextFileInputFormat * which is set through setMaxSplitSize */ def setMinPartitions(context: JobContext, minPartitions: Int) { - val files = listStatus(context) - val totalLen = files.map { file => - if (file.isDir) 0L else file.getLen - }.sum + val files = listStatus(context).asScala + val totalLen = files.map(file => if (file.isDir) 0L else file.getLen).sum val maxSplitSize = Math.ceil(totalLen * 1.0 / (if (minPartitions == 0) 1 else minPartitions)).toLong super.setMaxSplitSize(maxSplitSize) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala index 9be9872..0c09665 100644 --- a/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala +++ b/core/src/main/scala/org/apache/spark/launcher/WorkerCommandBuilder.scala @@ -20,7 +20,7 @@ package org.apache.spark.launcher import java.io.File import java.util.{HashMap => JHashMap, List => JList, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.deploy.Command @@ -32,7 +32,7 @@ import org.apache.spark.deploy.Command private[spark] class WorkerCommandBuilder(sparkHome: String, memoryMb: Int, command: Command) extends AbstractCommandBuilder { - childEnv.putAll(command.environment) + childEnv.putAll(command.environment.asJava) childEnv.put(CommandBuilderUtils.ENV_SPARK_HOME, sparkHome) override def buildCommand(env: JMap[String, String]): JList[String] = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala index d749555..dd2d325 100644 --- a/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala +++ b/core/src/main/scala/org/apache/spark/metrics/MetricsConfig.scala @@ -20,6 +20,7 @@ package org.apache.spark.metrics import java.io.{FileInputStream, InputStream} import java.util.Properties +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.util.matching.Regex @@ -58,25 +59,20 @@ private[spark] class MetricsConfig(conf: SparkConf) extends Logging { propertyCategories = subProperties(properties, INSTANCE_REGEX) if (propertyCategories.contains(DEFAULT_PREFIX)) { - import scala.collection.JavaConversions._ - - val defaultProperty = propertyCategories(DEFAULT_PREFIX) - for { (inst, prop) <- propertyCategories - if (inst != DEFAULT_PREFIX) - (k, v) <- defaultProperty - if (prop.getProperty(k) == null) } { - prop.setProperty(k, v) + val defaultProperty = propertyCategories(DEFAULT_PREFIX).asScala + for((inst, prop) <- propertyCategories if (inst != DEFAULT_PREFIX); + (k, v) <- defaultProperty if (prop.get(k) == null)) { + prop.put(k, v) } } } def subProperties(prop: Properties, regex: Regex): mutable.HashMap[String, Properties] = { val subProperties = new mutable.HashMap[String, Properties] - import scala.collection.JavaConversions._ - prop.foreach { kv => - if (regex.findPrefixOf(kv._1).isDefined) { - val regex(prefix, suffix) = kv._1 - subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2) + prop.asScala.foreach { kv => + if (regex.findPrefixOf(kv._1.toString).isDefined) { + val regex(prefix, suffix) = kv._1.toString + subProperties.getOrElseUpdate(prefix, new Properties).setProperty(suffix, kv._2.toString) } } subProperties --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org