http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala index b089da8..7c170a7 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockRpcServer.scala @@ -19,7 +19,7 @@ package org.apache.spark.network.netty import java.nio.ByteBuffer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.Logging import org.apache.spark.network.BlockDataManager @@ -55,7 +55,7 @@ class NettyBlockRpcServer( case openBlocks: OpenBlocks => val blocks: Seq[ManagedBuffer] = openBlocks.blockIds.map(BlockId.apply).map(blockManager.getBlockData) - val streamId = streamManager.registerStream(blocks.iterator) + val streamId = streamManager.registerStream(blocks.iterator.asJava) logTrace(s"Registered streamId $streamId with ${blocks.size} buffers") responseContext.onSuccess(new StreamHandle(streamId, blocks.size).toByteArray)
http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala index d650d5f..ff8aae9 100644 --- a/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala +++ b/core/src/main/scala/org/apache/spark/network/netty/NettyBlockTransferService.scala @@ -17,7 +17,7 @@ package org.apache.spark.network.netty -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.{Future, Promise} import org.apache.spark.{SecurityManager, SparkConf} @@ -58,7 +58,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage securityManager.isSaslEncryptionEnabled())) } transportContext = new TransportContext(transportConf, rpcHandler) - clientFactory = transportContext.createClientFactory(clientBootstrap.toList) + clientFactory = transportContext.createClientFactory(clientBootstrap.toSeq.asJava) server = createServer(serverBootstrap.toList) appId = conf.getAppId logInfo("Server created on " + server.getPort) @@ -67,7 +67,7 @@ class NettyBlockTransferService(conf: SparkConf, securityManager: SecurityManage /** Creates and binds the TransportServer, possibly trying multiple ports. */ private def createServer(bootstraps: List[TransportServerBootstrap]): TransportServer = { def startService(port: Int): (TransportServer, Int) = { - val server = transportContext.createServer(port, bootstraps) + val server = transportContext.createServer(port, bootstraps.asJava) (server, server.getPort) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/network/nio/Connection.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala index 1499da0..8d9ebad 100644 --- a/core/src/main/scala/org/apache/spark/network/nio/Connection.scala +++ b/core/src/main/scala/org/apache/spark/network/nio/Connection.scala @@ -23,7 +23,7 @@ import java.nio.channels._ import java.util.concurrent.ConcurrentLinkedQueue import java.util.LinkedList -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{ArrayBuffer, HashMap} import scala.util.control.NonFatal @@ -145,7 +145,7 @@ abstract class Connection(val channel: SocketChannel, val selector: Selector, } def callOnExceptionCallbacks(e: Throwable) { - onExceptionCallbacks foreach { + onExceptionCallbacks.asScala.foreach { callback => try { callback(this, e) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 91b07ce..5afce75 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -19,7 +19,7 @@ package org.apache.spark.partial import java.util.{HashMap => JHashMap} -import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap import scala.reflect.ClassTag @@ -48,9 +48,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf if (outputsMerged == totalOutputs) { val result = new JHashMap[T, BoundedDouble](sums.size) sums.foreach { case (key, sum) => - result(key) = new BoundedDouble(sum, 1.0, sum, sum) + result.put(key, new BoundedDouble(sum, 1.0, sum, sum)) } - result + result.asScala } else if (outputsMerged == 0) { new HashMap[T, BoundedDouble] } else { @@ -64,9 +64,9 @@ private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, conf val stdev = math.sqrt(variance) val low = mean - confFactor * stdev val high = mean + confFactor * stdev - result(key) = new BoundedDouble(mean, confidence, low, high) + result.put(key, new BoundedDouble(mean, confidence, low, high)) } - result + result.asScala } } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala index af26c3d..a164040 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedMeanEvaluator.scala @@ -19,7 +19,7 @@ package org.apache.spark.partial import java.util.{HashMap => JHashMap} -import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap @@ -55,9 +55,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub while (iter.hasNext) { val entry = iter.next() val mean = entry.getValue.mean - result(entry.getKey) = new BoundedDouble(mean, 1.0, mean, mean) + result.put(entry.getKey, new BoundedDouble(mean, 1.0, mean, mean)) } - result + result.asScala } else if (outputsMerged == 0) { new HashMap[T, BoundedDouble] } else { @@ -72,9 +72,9 @@ private[spark] class GroupedMeanEvaluator[T](totalOutputs: Int, confidence: Doub val confFactor = studentTCacher.get(counter.count) val low = mean - confFactor * stdev val high = mean + confFactor * stdev - result(entry.getKey) = new BoundedDouble(mean, confidence, low, high) + result.put(entry.getKey, new BoundedDouble(mean, confidence, low, high)) } - result + result.asScala } } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala index 442fb86..54a1bea 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedSumEvaluator.scala @@ -19,7 +19,7 @@ package org.apache.spark.partial import java.util.{HashMap => JHashMap} -import scala.collection.JavaConversions.mapAsScalaMap +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.HashMap @@ -55,9 +55,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl while (iter.hasNext) { val entry = iter.next() val sum = entry.getValue.sum - result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum) + result.put(entry.getKey, new BoundedDouble(sum, 1.0, sum, sum)) } - result + result.asScala } else if (outputsMerged == 0) { new HashMap[T, BoundedDouble] } else { @@ -80,9 +80,9 @@ private[spark] class GroupedSumEvaluator[T](totalOutputs: Int, confidence: Doubl val confFactor = studentTCacher.get(counter.count) val low = sumEstimate - confFactor * sumStdev val high = sumEstimate + confFactor * sumStdev - result(entry.getKey) = new BoundedDouble(sumEstimate, confidence, low, high) + result.put(entry.getKey, new BoundedDouble(sumEstimate, confidence, low, high)) } - result + result.asScala } } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala index 326fafb..4e5f2e8 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala @@ -22,7 +22,7 @@ import java.text.SimpleDateFormat import java.util.{Date, HashMap => JHashMap} import scala.collection.{Map, mutable} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag import scala.util.DynamicVariable @@ -312,14 +312,14 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)]) } : Iterator[JHashMap[K, V]] val mergeMaps = (m1: JHashMap[K, V], m2: JHashMap[K, V]) => { - m2.foreach { pair => + m2.asScala.foreach { pair => val old = m1.get(pair._1) m1.put(pair._1, if (old == null) pair._2 else cleanedF(old, pair._2)) } m1 } : JHashMap[K, V] - self.mapPartitions(reducePartition).reduce(mergeMaps) + self.mapPartitions(reducePartition).reduce(mergeMaps).asScala } /** Alias for reduceByKeyLocally */ http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala index 3bb9998..afbe566 100644 --- a/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/PipedRDD.scala @@ -23,7 +23,7 @@ import java.io.IOException import java.io.PrintWriter import java.util.StringTokenizer -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source @@ -72,7 +72,7 @@ private[spark] class PipedRDD[T: ClassTag]( } override def compute(split: Partition, context: TaskContext): Iterator[String] = { - val pb = new ProcessBuilder(command) + val pb = new ProcessBuilder(command.asJava) // Add the environmental variables to the process. val currentEnvVars = pb.environment() envVars.foreach { case (variable, value) => currentEnvVars.put(variable, value) } @@ -81,7 +81,7 @@ private[spark] class PipedRDD[T: ClassTag]( // so the user code can access the input filename if (split.isInstanceOf[HadoopPartition]) { val hadoopSplit = split.asInstanceOf[HadoopPartition] - currentEnvVars.putAll(hadoopSplit.getPipeEnvVars()) + currentEnvVars.putAll(hadoopSplit.getPipeEnvVars().asJava) } // When spark.worker.separated.working.directory option is turned on, each http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala index f7cb179..9a4fa30 100644 --- a/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/SubtractedRDD.scala @@ -19,7 +19,7 @@ package org.apache.spark.rdd import java.util.{HashMap => JHashMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.reflect.ClassTag @@ -125,7 +125,7 @@ private[spark] class SubtractedRDD[K: ClassTag, V: ClassTag, W: ClassTag]( integrate(0, t => getSeq(t._1) += t._2) // the second dep is rdd2; remove all of its keys integrate(1, t => map.remove(t._1)) - map.iterator.map { t => t._2.iterator.map { (t._1, _) } }.flatten + map.asScala.iterator.map(t => t._2.iterator.map((t._1, _))).flatten } override def clearDependencies() { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala index bac37bf..0e438ab 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/InputFormatInfo.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.immutable.Set import scala.collection.mutable.{ArrayBuffer, HashMap, HashSet} @@ -107,7 +107,7 @@ class InputFormatInfo(val configuration: Configuration, val inputFormatClazz: Cl val retval = new ArrayBuffer[SplitInfo]() val list = instance.getSplits(job) - for (split <- list) { + for (split <- list.asScala) { retval ++= SplitInfo.toSplitInfo(inputFormatClazz, path, split) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/Pool.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala index 174b732..5821afe 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Pool.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Pool.scala @@ -19,7 +19,7 @@ package org.apache.spark.scheduler import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import org.apache.spark.Logging @@ -74,7 +74,7 @@ private[spark] class Pool( if (schedulableNameToSchedulable.containsKey(schedulableName)) { return schedulableNameToSchedulable.get(schedulableName) } - for (schedulable <- schedulableQueue) { + for (schedulable <- schedulableQueue.asScala) { val sched = schedulable.getSchedulableByName(schedulableName) if (sched != null) { return sched @@ -84,12 +84,12 @@ private[spark] class Pool( } override def executorLost(executorId: String, host: String) { - schedulableQueue.foreach(_.executorLost(executorId, host)) + schedulableQueue.asScala.foreach(_.executorLost(executorId, host)) } override def checkSpeculatableTasks(): Boolean = { var shouldRevive = false - for (schedulable <- schedulableQueue) { + for (schedulable <- schedulableQueue.asScala) { shouldRevive |= schedulable.checkSpeculatableTasks() } shouldRevive @@ -98,7 +98,7 @@ private[spark] class Pool( override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = - schedulableQueue.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) + schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala index d6e1e9e..452c32d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/CoarseMesosSchedulerBackend.scala @@ -21,7 +21,7 @@ import java.io.File import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} import com.google.common.collect.HashBiMap @@ -233,7 +233,7 @@ private[spark] class CoarseMesosSchedulerBackend( override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { stateLock.synchronized { val filters = Filters.newBuilder().setRefuseSeconds(5).build() - for (offer <- offers) { + for (offer <- offers.asScala) { val offerAttributes = toAttributeMap(offer.getAttributesList) val meetsConstraints = matchesAttributeRequirements(slaveOfferConstraints, offerAttributes) val slaveId = offer.getSlaveId.getValue @@ -251,21 +251,21 @@ private[spark] class CoarseMesosSchedulerBackend( val cpusToUse = math.min(cpus, maxCores - totalCoresAcquired) totalCoresAcquired += cpusToUse val taskId = newMesosTaskId() - taskIdToSlaveId(taskId) = slaveId + taskIdToSlaveId.put(taskId, slaveId) slaveIdsWithExecutors += slaveId coresByTaskId(taskId) = cpusToUse // Gather cpu resources from the available resources and use them in the task. val (remainingResources, cpuResourcesToUse) = partitionResources(offer.getResourcesList, "cpus", cpusToUse) val (_, memResourcesToUse) = - partitionResources(remainingResources, "mem", calculateTotalMemory(sc)) + partitionResources(remainingResources.asJava, "mem", calculateTotalMemory(sc)) val taskBuilder = MesosTaskInfo.newBuilder() .setTaskId(TaskID.newBuilder().setValue(taskId.toString).build()) .setSlaveId(offer.getSlaveId) .setCommand(createCommand(offer, cpusToUse + extraCoresPerSlave, taskId)) .setName("Task " + taskId) - .addAllResources(cpuResourcesToUse) - .addAllResources(memResourcesToUse) + .addAllResources(cpuResourcesToUse.asJava) + .addAllResources(memResourcesToUse.asJava) sc.conf.getOption("spark.mesos.executor.docker.image").foreach { image => MesosSchedulerBackendUtil @@ -314,9 +314,9 @@ private[spark] class CoarseMesosSchedulerBackend( } if (TaskState.isFinished(TaskState.fromMesos(state))) { - val slaveId = taskIdToSlaveId(taskId) + val slaveId = taskIdToSlaveId.get(taskId) slaveIdsWithExecutors -= slaveId - taskIdToSlaveId -= taskId + taskIdToSlaveId.remove(taskId) // Remove the cores we have remembered for this task, if it's in the hashmap for (cores <- coresByTaskId.get(taskId)) { totalCoresAcquired -= cores @@ -361,7 +361,7 @@ private[spark] class CoarseMesosSchedulerBackend( stateLock.synchronized { if (slaveIdsWithExecutors.contains(slaveId)) { val slaveIdToTaskId = taskIdToSlaveId.inverse() - if (slaveIdToTaskId.contains(slaveId)) { + if (slaveIdToTaskId.containsKey(slaveId)) { val taskId: Int = slaveIdToTaskId.get(slaveId) taskIdToSlaveId.remove(taskId) removeExecutor(sparkExecutorId(slaveId, taskId.toString), reason) @@ -411,7 +411,7 @@ private[spark] class CoarseMesosSchedulerBackend( val slaveIdToTaskId = taskIdToSlaveId.inverse() for (executorId <- executorIds) { val slaveId = executorId.split("/")(0) - if (slaveIdToTaskId.contains(slaveId)) { + if (slaveIdToTaskId.containsKey(slaveId)) { mesosDriver.killTask( TaskID.newBuilder().setValue(slaveIdToTaskId.get(slaveId).toString).build()) pendingRemovedSlaveIds += slaveId http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala index 3efc536..e0c547d 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterPersistenceEngine.scala @@ -17,7 +17,7 @@ package org.apache.spark.scheduler.cluster.mesos -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.curator.framework.CuratorFramework import org.apache.zookeeper.CreateMode @@ -129,6 +129,6 @@ private[spark] class ZookeeperMesosClusterPersistenceEngine( } override def fetchAll[T](): Iterable[T] = { - zk.getChildren.forPath(WORKING_DIR).map(fetch[T]).flatten + zk.getChildren.forPath(WORKING_DIR).asScala.flatMap(fetch[T]) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala index 1206f18..07da924 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosClusterScheduler.scala @@ -21,7 +21,7 @@ import java.io.File import java.util.concurrent.locks.ReentrantLock import java.util.{Collections, Date, List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -350,7 +350,7 @@ private[spark] class MesosClusterScheduler( } // TODO: Page the status updates to avoid trying to reconcile // a large amount of tasks at once. - driver.reconcileTasks(statuses) + driver.reconcileTasks(statuses.toSeq.asJava) } } } @@ -493,10 +493,10 @@ private[spark] class MesosClusterScheduler( } override def resourceOffers(driver: SchedulerDriver, offers: JList[Offer]): Unit = { - val currentOffers = offers.map { o => + val currentOffers = offers.asScala.map(o => new ResourceOffer( o, getResource(o.getResourcesList, "cpus"), getResource(o.getResourcesList, "mem")) - }.toList + ).toList logTrace(s"Received offers from Mesos: \n${currentOffers.mkString("\n")}") val tasks = new mutable.HashMap[OfferID, ArrayBuffer[TaskInfo]]() val currentTime = new Date() @@ -521,10 +521,10 @@ private[spark] class MesosClusterScheduler( currentOffers, tasks) } - tasks.foreach { case (offerId, tasks) => - driver.launchTasks(Collections.singleton(offerId), tasks) + tasks.foreach { case (offerId, taskInfos) => + driver.launchTasks(Collections.singleton(offerId), taskInfos.asJava) } - offers + offers.asScala .filter(o => !tasks.keySet.contains(o.getId)) .foreach(o => driver.declineOffer(o.getId)) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala index 5c20606..2e42405 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackend.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos import java.io.File import java.util.{ArrayList => JArrayList, Collections, List => JList} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.{HashMap, HashSet} import org.apache.mesos.{Scheduler => MScheduler, _} @@ -129,14 +129,12 @@ private[spark] class MesosSchedulerBackend( val (resourcesAfterCpu, usedCpuResources) = partitionResources(availableResources, "cpus", scheduler.CPUS_PER_TASK) val (resourcesAfterMem, usedMemResources) = - partitionResources(resourcesAfterCpu, "mem", calculateTotalMemory(sc)) + partitionResources(resourcesAfterCpu.asJava, "mem", calculateTotalMemory(sc)) - builder.addAllResources(usedCpuResources) - builder.addAllResources(usedMemResources) + builder.addAllResources(usedCpuResources.asJava) + builder.addAllResources(usedMemResources.asJava) - sc.conf.getOption("spark.mesos.uris").map { uris => - setupUris(uris, command) - } + sc.conf.getOption("spark.mesos.uris").foreach(setupUris(_, command)) val executorInfo = builder .setExecutorId(ExecutorID.newBuilder().setValue(execId).build()) @@ -148,7 +146,7 @@ private[spark] class MesosSchedulerBackend( .setupContainerBuilderDockerInfo(image, sc.conf, executorInfo.getContainerBuilder()) } - (executorInfo.build(), resourcesAfterMem) + (executorInfo.build(), resourcesAfterMem.asJava) } /** @@ -193,7 +191,7 @@ private[spark] class MesosSchedulerBackend( private def getTasksSummary(tasks: JArrayList[MesosTaskInfo]): String = { val builder = new StringBuilder - tasks.foreach { t => + tasks.asScala.foreach { t => builder.append("Task id: ").append(t.getTaskId.getValue).append("\n") .append("Slave id: ").append(t.getSlaveId.getValue).append("\n") .append("Task resources: ").append(t.getResourcesList).append("\n") @@ -211,7 +209,7 @@ private[spark] class MesosSchedulerBackend( override def resourceOffers(d: SchedulerDriver, offers: JList[Offer]) { inClassLoader() { // Fail-fast on offers we know will be rejected - val (usableOffers, unUsableOffers) = offers.partition { o => + val (usableOffers, unUsableOffers) = offers.asScala.partition { o => val mem = getResource(o.getResourcesList, "mem") val cpus = getResource(o.getResourcesList, "cpus") val slaveId = o.getSlaveId.getValue @@ -323,10 +321,10 @@ private[spark] class MesosSchedulerBackend( .setSlaveId(SlaveID.newBuilder().setValue(slaveId).build()) .setExecutor(executorInfo) .setName(task.name) - .addAllResources(cpuResources) + .addAllResources(cpuResources.asJava) .setData(MesosTaskLaunchData(task.serializedTask, task.attemptNumber).toByteString) .build() - (taskInfo, finalResources) + (taskInfo, finalResources.asJava) } override def statusUpdate(d: SchedulerDriver, status: TaskStatus) { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala index 5b854aa..860c8e0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerUtils.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler.cluster.mesos import java.util.{List => JList} import java.util.concurrent.CountDownLatch -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import scala.util.control.NonFatal @@ -137,7 +137,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { protected def getResource(res: JList[Resource], name: String): Double = { // A resource can have multiple values in the offer since it can either be from // a specific role or wildcard. - res.filter(_.getName == name).map(_.getScalar.getValue).sum + res.asScala.filter(_.getName == name).map(_.getScalar.getValue).sum } protected def markRegistered(): Unit = { @@ -169,7 +169,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { amountToUse: Double): (List[Resource], List[Resource]) = { var remain = amountToUse var requestedResources = new ArrayBuffer[Resource] - val remainingResources = resources.map { + val remainingResources = resources.asScala.map { case r => { if (remain > 0 && r.getType == Value.Type.SCALAR && @@ -214,7 +214,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { * @return */ protected def toAttributeMap(offerAttributes: JList[Attribute]): Map[String, GeneratedMessage] = { - offerAttributes.map(attr => { + offerAttributes.asScala.map(attr => { val attrValue = attr.getType match { case Value.Type.SCALAR => attr.getScalar case Value.Type.RANGES => attr.getRanges @@ -253,7 +253,7 @@ private[mesos] trait MesosSchedulerUtils extends Logging { requiredValues.map(_.toLong).exists(offerRange.contains(_)) case Some(offeredValue: Value.Set) => // check if the specified required values is a subset of offered set - requiredValues.subsetOf(offeredValue.getItemList.toSet) + requiredValues.subsetOf(offeredValue.getItemList.asScala.toSet) case Some(textValue: Value.Text) => // check if the specified value is equal, if multiple values are specified // we succeed if any of them match. @@ -299,14 +299,13 @@ private[mesos] trait MesosSchedulerUtils extends Logging { Map() } else { try { - Map() ++ mapAsScalaMap(splitter.split(constraintsVal)).map { - case (k, v) => - if (v == null || v.isEmpty) { - (k, Set[String]()) - } else { - (k, v.split(',').toSet) - } - } + splitter.split(constraintsVal).asScala.toMap.mapValues(v => + if (v == null || v.isEmpty) { + Set[String]() + } else { + v.split(',').toSet + } + ) } catch { case NonFatal(e) => throw new IllegalArgumentException(s"Bad constraint string: $constraintsVal", e) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala index 0ff7562..048a938 100644 --- a/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/KryoSerializer.scala @@ -21,6 +21,7 @@ import java.io.{EOFException, IOException, InputStream, OutputStream} import java.nio.ByteBuffer import javax.annotation.Nullable +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import com.esotericsoftware.kryo.{Kryo, KryoException} @@ -373,16 +374,15 @@ private class JavaIterableWrapperSerializer override def read(kryo: Kryo, in: KryoInput, clz: Class[java.lang.Iterable[_]]) : java.lang.Iterable[_] = { kryo.readClassAndObject(in) match { - case scalaIterable: Iterable[_] => - scala.collection.JavaConversions.asJavaIterable(scalaIterable) - case javaIterable: java.lang.Iterable[_] => - javaIterable + case scalaIterable: Iterable[_] => scalaIterable.asJava + case javaIterable: java.lang.Iterable[_] => javaIterable } } } private object JavaIterableWrapperSerializer extends Logging { - // The class returned by asJavaIterable (scala.collection.convert.Wrappers$IterableWrapper). + // The class returned by JavaConverters.asJava + // (scala.collection.convert.Wrappers$IterableWrapper). val wrapperClass = scala.collection.convert.WrapAsJava.asJavaIterable(Seq(1)).getClass http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala index f6a96d8..c057de9 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/FileShuffleBlockResolver.scala @@ -21,7 +21,7 @@ import java.io.File import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.atomic.AtomicInteger -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.{Logging, SparkConf, SparkEnv} import org.apache.spark.executor.ShuffleWriteMetrics @@ -210,11 +210,13 @@ private[spark] class FileShuffleBlockResolver(conf: SparkConf) shuffleStates.get(shuffleId) match { case Some(state) => if (consolidateShuffleFiles) { - for (fileGroup <- state.allFileGroups; file <- fileGroup.files) { + for (fileGroup <- state.allFileGroups.asScala; + file <- fileGroup.files) { file.delete() } } else { - for (mapId <- state.completedMapTasks; reduceId <- 0 until state.numBuckets) { + for (mapId <- state.completedMapTasks.asScala; + reduceId <- 0 until state.numBuckets) { val blockId = new ShuffleBlockId(shuffleId, mapId, reduceId) blockManager.diskBlockManager.getFile(blockId).delete() } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala index 6fec524..7db6035 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala @@ -21,7 +21,7 @@ import java.util.{HashMap => JHashMap} import scala.collection.immutable.HashSet import scala.collection.mutable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import org.apache.spark.rpc.{RpcEndpointRef, RpcEnv, RpcCallContext, ThreadSafeRpcEndpoint} @@ -133,7 +133,7 @@ class BlockManagerMasterEndpoint( // Find all blocks for the given RDD, remove the block from both blockLocations and // the blockManagerInfo that is tracking the blocks. - val blocks = blockLocations.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) + val blocks = blockLocations.asScala.keys.flatMap(_.asRDDId).filter(_.rddId == rddId) blocks.foreach { blockId => val bms: mutable.HashSet[BlockManagerId] = blockLocations.get(blockId) bms.foreach(bm => blockManagerInfo.get(bm).foreach(_.removeBlock(blockId))) @@ -242,7 +242,7 @@ class BlockManagerMasterEndpoint( private def storageStatus: Array[StorageStatus] = { blockManagerInfo.map { case (blockManagerId, info) => - new StorageStatus(blockManagerId, info.maxMem, info.blocks) + new StorageStatus(blockManagerId, info.maxMem, info.blocks.asScala) }.toArray } @@ -292,7 +292,7 @@ class BlockManagerMasterEndpoint( if (askSlaves) { info.slaveEndpoint.ask[Seq[BlockId]](getMatchingBlockIds) } else { - Future { info.blocks.keys.filter(filter).toSeq } + Future { info.blocks.asScala.keys.filter(filter).toSeq } } future } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala index 78e7ddc..1738258 100644 --- a/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala +++ b/core/src/main/scala/org/apache/spark/util/AkkaUtils.scala @@ -17,7 +17,7 @@ package org.apache.spark.util -import scala.collection.JavaConversions.mapAsJavaMap +import scala.collection.JavaConverters._ import akka.actor.{ActorRef, ActorSystem, ExtendedActorSystem} import akka.pattern.ask @@ -92,7 +92,7 @@ private[spark] object AkkaUtils extends Logging { val akkaSslConfig = securityManager.akkaSSLOptions.createAkkaConfig .getOrElse(ConfigFactory.empty()) - val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap[String, String]) + val akkaConf = ConfigFactory.parseMap(conf.getAkkaConf.toMap.asJava) .withFallback(akkaSslConfig).withFallback(ConfigFactory.parseString( s""" |akka.daemonic = on http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/ListenerBus.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala index a725767..13cb516 100644 --- a/core/src/main/scala/org/apache/spark/util/ListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/util/ListenerBus.scala @@ -19,12 +19,11 @@ package org.apache.spark.util import java.util.concurrent.CopyOnWriteArrayList -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import scala.util.control.NonFatal import org.apache.spark.Logging -import org.apache.spark.scheduler.SparkListener /** * An event bus which posts events to its listeners. @@ -46,7 +45,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { * `postToAll` in the same thread for all events. */ final def postToAll(event: E): Unit = { - // JavaConversions will create a JIterableWrapper if we use some Scala collection functions. + // JavaConverters can create a JIterableWrapper if we use asScala. // However, this method will be called frequently. To avoid the wrapper cost, here ewe use // Java Iterator directly. val iter = listeners.iterator @@ -69,7 +68,7 @@ private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging { private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = { val c = implicitly[ClassTag[T]].runtimeClass - listeners.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq + listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala index 169489d..a1c3321 100644 --- a/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala +++ b/core/src/main/scala/org/apache/spark/util/MutableURLClassLoader.scala @@ -21,8 +21,6 @@ import java.net.{URLClassLoader, URL} import java.util.Enumeration import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions._ - /** * URL class loader that exposes the `addURL` and `getURLs` methods in URLClassLoader. */ http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala index 8de75ba..d7e5143 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashMap.scala @@ -21,7 +21,8 @@ import java.util.Set import java.util.Map.Entry import java.util.concurrent.ConcurrentHashMap -import scala.collection.{JavaConversions, mutable} +import scala.collection.JavaConverters._ +import scala.collection.mutable import org.apache.spark.Logging @@ -50,8 +51,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa } def iterator: Iterator[(A, B)] = { - val jIterator = getEntrySet.iterator - JavaConversions.asScalaIterator(jIterator).map(kv => (kv.getKey, kv.getValue.value)) + getEntrySet.iterator.asScala.map(kv => (kv.getKey, kv.getValue.value)) } def getEntrySet: Set[Entry[A, TimeStampedValue[B]]] = internalMap.entrySet @@ -90,9 +90,7 @@ private[spark] class TimeStampedHashMap[A, B](updateTimeStampOnGet: Boolean = fa } override def filter(p: ((A, B)) => Boolean): mutable.Map[A, B] = { - JavaConversions.mapAsScalaConcurrentMap(internalMap) - .map { case (k, TimeStampedValue(v, t)) => (k, v) } - .filter(p) + internalMap.asScala.map { case (k, TimeStampedValue(v, t)) => (k, v) }.filter(p) } override def empty: mutable.Map[A, B] = new TimeStampedHashMap[A, B]() http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala index 7cd8f28..65efeb1 100644 --- a/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/TimeStampedHashSet.scala @@ -19,7 +19,7 @@ package org.apache.spark.util import java.util.concurrent.ConcurrentHashMap -import scala.collection.JavaConversions +import scala.collection.JavaConverters._ import scala.collection.mutable.Set private[spark] class TimeStampedHashSet[A] extends Set[A] { @@ -31,7 +31,7 @@ private[spark] class TimeStampedHashSet[A] extends Set[A] { def iterator: Iterator[A] = { val jIterator = internalMap.entrySet().iterator() - JavaConversions.asScalaIterator(jIterator).map(_.getKey) + jIterator.asScala.map(_.getKey) } override def + (elem: A): Set[A] = { http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/Utils.scala b/core/src/main/scala/org/apache/spark/util/Utils.scala index 8313312..2bab4af 100644 --- a/core/src/main/scala/org/apache/spark/util/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/Utils.scala @@ -25,7 +25,7 @@ import java.util.{Properties, Locale, Random, UUID} import java.util.concurrent._ import javax.net.ssl.HttpsURLConnection -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.Map import scala.collection.mutable.ArrayBuffer import scala.io.Source @@ -748,12 +748,12 @@ private[spark] object Utils extends Logging { // getNetworkInterfaces returns ifs in reverse order compared to ifconfig output order // on unix-like system. On windows, it returns in index order. // It's more proper to pick ip address following system output order. - val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.toList + val activeNetworkIFs = NetworkInterface.getNetworkInterfaces.asScala.toSeq val reOrderedNetworkIFs = if (isWindows) activeNetworkIFs else activeNetworkIFs.reverse for (ni <- reOrderedNetworkIFs) { - val addresses = ni.getInetAddresses.toList - .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress) + val addresses = ni.getInetAddresses.asScala + .filterNot(addr => addr.isLinkLocalAddress || addr.isLoopbackAddress).toSeq if (addresses.nonEmpty) { val addr = addresses.find(_.isInstanceOf[Inet4Address]).getOrElse(addresses.head) // because of Inet6Address.toHostName may add interface at the end if it knows about it @@ -1498,10 +1498,8 @@ private[spark] object Utils extends Logging { * properties which have been set explicitly, as well as those for which only a default value * has been defined. */ def getSystemProperties: Map[String, String] = { - val sysProps = for (key <- System.getProperties.stringPropertyNames()) yield - (key, System.getProperty(key)) - - sysProps.toMap + System.getProperties.stringPropertyNames().asScala + .map(key => (key, System.getProperty(key))).toMap } /** @@ -1812,7 +1810,8 @@ private[spark] object Utils extends Logging { try { val properties = new Properties() properties.load(inReader) - properties.stringPropertyNames().map(k => (k, properties(k).trim)).toMap + properties.stringPropertyNames().asScala.map( + k => (k, properties.getProperty(k).trim)).toMap } catch { case e: IOException => throw new SparkException(s"Failed when loading Spark properties from $filename", e) @@ -1941,7 +1940,8 @@ private[spark] object Utils extends Logging { return true } isBindCollision(e.getCause) - case e: MultiException => e.getThrowables.exists(isBindCollision) + case e: MultiException => + e.getThrowables.asScala.exists(isBindCollision) case e: Exception => isBindCollision(e.getCause) case _ => false } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/main/scala/org/apache/spark/util/collection/Utils.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala index bdbca00..4939b60 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/Utils.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/Utils.scala @@ -17,7 +17,7 @@ package org.apache.spark.util.collection -import scala.collection.JavaConversions.{collectionAsScalaIterable, asJavaIterator} +import scala.collection.JavaConverters._ import com.google.common.collect.{Ordering => GuavaOrdering} @@ -34,6 +34,6 @@ private[spark] object Utils { val ordering = new GuavaOrdering[T] { override def compare(l: T, r: T): Int = ord.compare(l, r) } - collectionAsScalaIterable(ordering.leastOf(asJavaIterator(input), num)).iterator + ordering.leastOf(input.asJava, num).iterator.asScala } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/java/org/apache/spark/JavaAPISuite.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java index ffe4b4b..ebd3d61 100644 --- a/core/src/test/java/org/apache/spark/JavaAPISuite.java +++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java @@ -24,10 +24,10 @@ import java.net.URI; import java.util.*; import java.util.concurrent.*; -import scala.collection.JavaConversions; import scala.Tuple2; import scala.Tuple3; import scala.Tuple4; +import scala.collection.JavaConverters; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; @@ -1473,7 +1473,9 @@ public class JavaAPISuite implements Serializable { Assert.assertEquals(expected, results); Partitioner defaultPartitioner = Partitioner.defaultPartitioner( - combinedRDD.rdd(), JavaConversions.asScalaBuffer(Lists.<RDD<?>>newArrayList())); + combinedRDD.rdd(), + JavaConverters.collectionAsScalaIterableConverter( + Collections.<RDD<?>>emptyList()).asScala().toSeq()); combinedRDD = originalRDD.keyBy(keyFunction) .combineByKey( createCombinerFunction, http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/SparkConfSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala index 90cb7da..ff9a92c 100644 --- a/core/src/test/scala/org/apache/spark/SparkConfSuite.scala +++ b/core/src/test/scala/org/apache/spark/SparkConfSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark import java.util.concurrent.{TimeUnit, Executors} +import scala.collection.JavaConverters._ import scala.concurrent.duration._ import scala.language.postfixOps import scala.util.{Try, Random} @@ -148,7 +149,6 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } test("Thread safeness - SPARK-5425") { - import scala.collection.JavaConversions._ val executor = Executors.newSingleThreadScheduledExecutor() val sf = executor.scheduleAtFixedRate(new Runnable { override def run(): Unit = @@ -163,8 +163,9 @@ class SparkConfSuite extends SparkFunSuite with LocalSparkContext with ResetSyst } } finally { executor.shutdownNow() - for (key <- System.getProperties.stringPropertyNames() if key.startsWith("spark.5425.")) - System.getProperties.remove(key) + val sysProps = System.getProperties + for (key <- sysProps.stringPropertyNames().asScala if key.startsWith("spark.5425.")) + sysProps.remove(key) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala index cbd2aee..86eb41d 100644 --- a/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/LogUrlsStandaloneSuite.scala @@ -19,7 +19,6 @@ package org.apache.spark.deploy import java.net.URL -import scala.collection.JavaConversions._ import scala.collection.mutable import scala.io.Source http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala index 47a6408..1ed4bae 100644 --- a/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala +++ b/core/src/test/scala/org/apache/spark/deploy/RPackageUtilsSuite.scala @@ -21,14 +21,14 @@ import java.io.{PrintStream, OutputStream, File} import java.net.URI import java.util.jar.Attributes.Name import java.util.jar.{JarFile, Manifest} -import java.util.zip.{ZipEntry, ZipFile} +import java.util.zip.ZipFile -import org.scalatest.BeforeAndAfterEach -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer import com.google.common.io.Files import org.apache.commons.io.FileUtils +import org.scalatest.BeforeAndAfterEach import org.apache.spark.SparkFunSuite import org.apache.spark.api.r.RUtils @@ -142,7 +142,7 @@ class RPackageUtilsSuite extends SparkFunSuite with BeforeAndAfterEach { IvyTestUtils.writeFile(fakePackageDir, "DESCRIPTION", "abc") val finalZip = RPackageUtils.zipRLibraries(tempDir, "sparkr.zip") assert(finalZip.exists()) - val entries = new ZipFile(finalZip).entries().toSeq.map(_.getName) + val entries = new ZipFile(finalZip).entries().asScala.map(_.getName).toSeq assert(entries.contains("/test.R")) assert(entries.contains("/SparkR/abc.R")) assert(entries.contains("/SparkR/DESCRIPTION")) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala index bed6f3e..98664dc 100644 --- a/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala +++ b/core/src/test/scala/org/apache/spark/deploy/worker/ExecutorRunnerTest.scala @@ -19,8 +19,6 @@ package org.apache.spark.deploy.worker import java.io.File -import scala.collection.JavaConversions._ - import org.apache.spark.deploy.{ApplicationDescription, Command, ExecutorState} import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite} @@ -36,6 +34,7 @@ class ExecutorRunnerTest extends SparkFunSuite { ExecutorState.RUNNING) val builder = CommandUtils.buildProcessBuilder( appDesc.command, new SecurityManager(conf), 512, sparkHome, er.substituteVariables) - assert(builder.command().last === appId) + val builderCommand = builder.command() + assert(builderCommand.get(builderCommand.size() - 1) === appId) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala index 730535e..a9652d7 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/SparkListenerSuite.scala @@ -20,7 +20,7 @@ package org.apache.spark.scheduler import java.util.concurrent.Semaphore import scala.collection.mutable -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.scalatest.Matchers @@ -365,10 +365,9 @@ class SparkListenerSuite extends SparkFunSuite with LocalSparkContext with Match .set("spark.extraListeners", classOf[ListenerThatAcceptsSparkConf].getName + "," + classOf[BasicJobCounter].getName) sc = new SparkContext(conf) - sc.listenerBus.listeners.collect { case x: BasicJobCounter => x}.size should be (1) - sc.listenerBus.listeners.collect { - case x: ListenerThatAcceptsSparkConf => x - }.size should be (1) + sc.listenerBus.listeners.asScala.count(_.isInstanceOf[BasicJobCounter]) should be (1) + sc.listenerBus.listeners.asScala + .count(_.isInstanceOf[ListenerThatAcceptsSparkConf]) should be (1) } /** http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala index 5ed30f6..319b317 100644 --- a/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala +++ b/core/src/test/scala/org/apache/spark/scheduler/cluster/mesos/MesosSchedulerBackendSuite.scala @@ -18,10 +18,11 @@ package org.apache.spark.scheduler.cluster.mesos import java.nio.ByteBuffer -import java.util +import java.util.Arrays +import java.util.Collection import java.util.Collections -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -61,7 +62,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val mesosSchedulerBackend = new MesosSchedulerBackend(taskScheduler, sc, "master") - val resources = List( + val resources = Arrays.asList( mesosSchedulerBackend.createResource("cpus", 4), mesosSchedulerBackend.createResource("mem", 1024)) // uri is null. @@ -98,7 +99,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi val backend = new MesosSchedulerBackend(taskScheduler, sc, "master") val (execInfo, _) = backend.createExecutorInfo( - List(backend.createResource("cpus", 4)), "mockExecutor") + Arrays.asList(backend.createResource("cpus", 4)), "mockExecutor") assert(execInfo.getContainer.getDocker.getImage.equals("spark/mock")) val portmaps = execInfo.getContainer.getDocker.getPortMappingsList assert(portmaps.get(0).getHostPort.equals(80)) @@ -179,7 +180,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) when(taskScheduler.CPUS_PER_TASK).thenReturn(2) - val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) when( driver.launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), @@ -279,7 +280,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi when(taskScheduler.resourceOffers(expectedWorkerOffers)).thenReturn(Seq(Seq(taskDesc))) when(taskScheduler.CPUS_PER_TASK).thenReturn(1) - val capture = ArgumentCaptor.forClass(classOf[util.Collection[TaskInfo]]) + val capture = ArgumentCaptor.forClass(classOf[Collection[TaskInfo]]) when( driver.launchTasks( Matchers.eq(Collections.singleton(mesosOffers.get(0).getId)), @@ -304,7 +305,7 @@ class MesosSchedulerBackendSuite extends SparkFunSuite with LocalSparkContext wi assert(cpusDev.getName.equals("cpus")) assert(cpusDev.getScalar.getValue.equals(1.0)) assert(cpusDev.getRole.equals("dev")) - val executorResources = taskInfo.getExecutor.getResourcesList + val executorResources = taskInfo.getExecutor.getResourcesList.asScala assert(executorResources.exists { r => r.getName.equals("mem") && r.getScalar.getValue.equals(484.0) && r.getRole.equals("prod") }) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala index 23a1fdb..8d1c9d1 100644 --- a/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala +++ b/core/src/test/scala/org/apache/spark/serializer/KryoSerializerSuite.scala @@ -19,6 +19,7 @@ package org.apache.spark.serializer import java.io.{ByteArrayInputStream, ByteArrayOutputStream} +import scala.collection.JavaConverters._ import scala.collection.mutable import scala.reflect.ClassTag @@ -173,7 +174,7 @@ class KryoSerializerSuite extends SparkFunSuite with SharedSparkContext { test("asJavaIterable") { // Serialize a collection wrapped by asJavaIterable val ser = new KryoSerializer(conf).newInstance() - val a = ser.serialize(scala.collection.convert.WrapAsJava.asJavaIterable(Seq(12345))) + val a = ser.serialize(Seq(12345).asJava) val b = ser.deserialize[java.lang.Iterable[Int]](a) assert(b.iterator().next() === 12345) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala index 69888b2..22e30ec 100644 --- a/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala +++ b/core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala @@ -21,7 +21,6 @@ import java.net.{HttpURLConnection, URL} import javax.servlet.http.{HttpServletResponse, HttpServletRequest} import scala.io.Source -import scala.collection.JavaConversions._ import scala.xml.Node import com.gargoylesoftware.htmlunit.DefaultCssErrorHandler @@ -341,15 +340,15 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B // The completed jobs table should have two rows. The first row will be the most recent job: val firstRow = find(cssSelector("tbody tr")).get.underlying val firstRowColumns = firstRow.findElements(By.tagName("td")) - firstRowColumns(0).getText should be ("1") - firstRowColumns(4).getText should be ("1/1 (2 skipped)") - firstRowColumns(5).getText should be ("8/8 (16 skipped)") + firstRowColumns.get(0).getText should be ("1") + firstRowColumns.get(4).getText should be ("1/1 (2 skipped)") + firstRowColumns.get(5).getText should be ("8/8 (16 skipped)") // The second row is the first run of the job, where nothing was skipped: val secondRow = findAll(cssSelector("tbody tr")).toSeq(1).underlying val secondRowColumns = secondRow.findElements(By.tagName("td")) - secondRowColumns(0).getText should be ("0") - secondRowColumns(4).getText should be ("3/3") - secondRowColumns(5).getText should be ("24/24") + secondRowColumns.get(0).getText should be ("0") + secondRowColumns.get(4).getText should be ("3/3") + secondRowColumns.get(5).getText should be ("24/24") } } } @@ -502,8 +501,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B for { (row, idx) <- rows.zipWithIndex columns = row.findElements(By.tagName("td")) - id = columns(0).getText() - description = columns(1).getText() + id = columns.get(0).getText() + description = columns.get(1).getText() } { id should be (expJobInfo(idx)._1) description should include (expJobInfo(idx)._2) @@ -547,8 +546,8 @@ class UISeleniumSuite extends SparkFunSuite with WebBrowser with Matchers with B for { (row, idx) <- rows.zipWithIndex columns = row.findElements(By.tagName("td")) - id = columns(0).getText() - description = columns(1).getText() + id = columns.get(0).getText() + description = columns.get(1).getText() } { id should be (expStageInfo(idx)._1) description should include (expStageInfo(idx)._2) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala index 36832f5..fa07c1e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraCQLTest.scala @@ -19,10 +19,7 @@ package org.apache.spark.examples import java.nio.ByteBuffer - -import scala.collection.JavaConversions._ -import scala.collection.mutable.ListBuffer -import scala.collection.immutable.Map +import java.util.Collections import org.apache.cassandra.hadoop.ConfigHelper import org.apache.cassandra.hadoop.cql3.CqlPagingInputFormat @@ -32,7 +29,6 @@ import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ /* @@ -121,12 +117,9 @@ object CassandraCQLTest { val casoutputCF = aggregatedRDD.map { case (productId, saleCount) => { - val outColFamKey = Map("prod_id" -> ByteBufferUtil.bytes(productId)) - val outKey: java.util.Map[String, ByteBuffer] = outColFamKey - var outColFamVal = new ListBuffer[ByteBuffer] - outColFamVal += ByteBufferUtil.bytes(saleCount) - val outVal: java.util.List[ByteBuffer] = outColFamVal - (outKey, outVal) + val outKey = Collections.singletonMap("prod_id", ByteBufferUtil.bytes(productId)) + val outVal = Collections.singletonList(ByteBufferUtil.bytes(saleCount)) + (outKey, outVal) } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala index 96ef3e1..2e56d24 100644 --- a/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/CassandraTest.scala @@ -19,10 +19,9 @@ package org.apache.spark.examples import java.nio.ByteBuffer +import java.util.Arrays import java.util.SortedMap -import scala.collection.JavaConversions._ - import org.apache.cassandra.db.IColumn import org.apache.cassandra.hadoop.ColumnFamilyOutputFormat import org.apache.cassandra.hadoop.ConfigHelper @@ -32,7 +31,6 @@ import org.apache.cassandra.utils.ByteBufferUtil import org.apache.hadoop.mapreduce.Job import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.SparkContext._ /* * This example demonstrates using Spark with Cassandra with the New Hadoop API and Cassandra @@ -118,7 +116,7 @@ object CassandraTest { val outputkey = ByteBufferUtil.bytes(word + "-COUNT-" + System.currentTimeMillis) - val mutations: java.util.List[Mutation] = new Mutation() :: new Mutation() :: Nil + val mutations = Arrays.asList(new Mutation(), new Mutation()) mutations.get(0).setColumn_or_supercolumn(new ColumnOrSuperColumn()) mutations.get(0).column_or_supercolumn.setColumn(colWord) mutations.get(1).setColumn_or_supercolumn(new ColumnOrSuperColumn()) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala index c42df2b..bec61f3 100644 --- a/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala +++ b/examples/src/main/scala/org/apache/spark/examples/DriverSubmissionTest.scala @@ -18,7 +18,7 @@ // scalastyle:off println package org.apache.spark.examples -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.util.Utils @@ -36,10 +36,10 @@ object DriverSubmissionTest { val properties = Utils.getSystemProperties println("Environment variables containing SPARK_TEST:") - env.filter{case (k, v) => k.contains("SPARK_TEST")}.foreach(println) + env.asScala.filter { case (k, _) => k.contains("SPARK_TEST")}.foreach(println) println("System properties containing spark.test:") - properties.filter{case (k, v) => k.toString.contains("spark.test")}.foreach(println) + properties.filter { case (k, _) => k.toString.contains("spark.test") }.foreach(println) for (i <- 1 until numSecondsToSleep) { println(s"Alive for $i out of $numSecondsToSleep seconds") http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala index 3ebb112..805184e 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/AvroConverters.scala @@ -19,7 +19,7 @@ package org.apache.spark.examples.pythonconverters import java.util.{Collection => JCollection, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.avro.generic.{GenericFixed, IndexedRecord} import org.apache.avro.mapred.AvroWrapper @@ -58,7 +58,7 @@ object AvroConversionUtil extends Serializable { val map = new java.util.HashMap[String, Any] obj match { case record: IndexedRecord => - record.getSchema.getFields.zipWithIndex.foreach { case (f, i) => + record.getSchema.getFields.asScala.zipWithIndex.foreach { case (f, i) => map.put(f.name, fromAvro(record.get(i), f.schema)) } case other => throw new SparkException( @@ -68,9 +68,9 @@ object AvroConversionUtil extends Serializable { } def unpackMap(obj: Any, schema: Schema): JMap[String, Any] = { - obj.asInstanceOf[JMap[_, _]].map { case (key, value) => + obj.asInstanceOf[JMap[_, _]].asScala.map { case (key, value) => (key.toString, fromAvro(value, schema.getValueType)) - } + }.asJava } def unpackFixed(obj: Any, schema: Schema): Array[Byte] = { @@ -91,17 +91,17 @@ object AvroConversionUtil extends Serializable { def unpackArray(obj: Any, schema: Schema): JCollection[Any] = obj match { case c: JCollection[_] => - c.map(fromAvro(_, schema.getElementType)) + c.asScala.map(fromAvro(_, schema.getElementType)).toSeq.asJava case arr: Array[_] if arr.getClass.getComponentType.isPrimitive => - arr.toSeq + arr.toSeq.asJava.asInstanceOf[JCollection[Any]] case arr: Array[_] => - arr.map(fromAvro(_, schema.getElementType)).toSeq + arr.map(fromAvro(_, schema.getElementType)).toSeq.asJava case other => throw new SparkException( s"Unknown ARRAY type ${other.getClass.getName}") } def unpackUnion(obj: Any, schema: Schema): Any = { - schema.getTypes.toList match { + schema.getTypes.asScala.toList match { case List(s) => fromAvro(obj, s) case List(n, s) if n.getType == NULL => fromAvro(obj, s) case List(s, n) if n.getType == NULL => fromAvro(obj, s) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala index 83feb57..00ce47a 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/CassandraConverters.scala @@ -17,11 +17,13 @@ package org.apache.spark.examples.pythonconverters -import org.apache.spark.api.python.Converter import java.nio.ByteBuffer + +import scala.collection.JavaConverters._ + import org.apache.cassandra.utils.ByteBufferUtil -import collection.JavaConversions._ +import org.apache.spark.api.python.Converter /** * Implementation of [[org.apache.spark.api.python.Converter]] that converts Cassandra @@ -30,7 +32,7 @@ import collection.JavaConversions._ class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int]] { override def convert(obj: Any): java.util.Map[String, Int] = { val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.toInt(bb))) + result.asScala.mapValues(ByteBufferUtil.toInt).asJava } } @@ -41,7 +43,7 @@ class CassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, Int] class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, String]] { override def convert(obj: Any): java.util.Map[String, String] = { val result = obj.asInstanceOf[java.util.Map[String, ByteBuffer]] - mapAsJavaMap(result.mapValues(bb => ByteBufferUtil.string(bb))) + result.asScala.mapValues(ByteBufferUtil.string).asJava } } @@ -52,7 +54,7 @@ class CassandraCQLValueConverter extends Converter[Any, java.util.Map[String, St class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, ByteBuffer]] { override def convert(obj: Any): java.util.Map[String, ByteBuffer] = { val input = obj.asInstanceOf[java.util.Map[String, Int]] - mapAsJavaMap(input.mapValues(i => ByteBufferUtil.bytes(i))) + input.asScala.mapValues(ByteBufferUtil.bytes).asJava } } @@ -63,6 +65,6 @@ class ToCassandraCQLKeyConverter extends Converter[Any, java.util.Map[String, By class ToCassandraCQLValueConverter extends Converter[Any, java.util.List[ByteBuffer]] { override def convert(obj: Any): java.util.List[ByteBuffer] = { val input = obj.asInstanceOf[java.util.List[String]] - seqAsJavaList(input.map(s => ByteBufferUtil.bytes(s))) + input.asScala.map(ByteBufferUtil.bytes).asJava } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala ---------------------------------------------------------------------- diff --git a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala index 90d48a6..0a25ee7 100644 --- a/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala +++ b/examples/src/main/scala/org/apache/spark/examples/pythonconverters/HBaseConverters.scala @@ -17,7 +17,7 @@ package org.apache.spark.examples.pythonconverters -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.util.parsing.json.JSONObject import org.apache.spark.api.python.Converter @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.CellUtil */ class HBaseResultToStringConverter extends Converter[Any, String] { override def convert(obj: Any): String = { - import collection.JavaConverters._ val result = obj.asInstanceOf[Result] val output = result.listCells.asScala.map(cell => Map( @@ -77,7 +76,7 @@ class StringToImmutableBytesWritableConverter extends Converter[Any, ImmutableBy */ class StringListToPutConverter extends Converter[Any, Put] { override def convert(obj: Any): Put = { - val output = obj.asInstanceOf[java.util.ArrayList[String]].map(Bytes.toBytes(_)).toArray + val output = obj.asInstanceOf[java.util.ArrayList[String]].asScala.map(Bytes.toBytes).toArray val put = new Put(output(0)) put.add(output(1), output(2), output(3)) } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala ---------------------------------------------------------------------- diff --git a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala index fa43629..d265470 100644 --- a/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala +++ b/external/flume-sink/src/test/scala/org/apache/spark/streaming/flume/sink/SparkSinkSuite.scala @@ -20,7 +20,7 @@ import java.net.InetSocketAddress import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.{TimeUnit, CountDownLatch, Executors} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} import scala.util.{Failure, Success} @@ -166,7 +166,7 @@ class SparkSinkSuite extends FunSuite { channelContext.put("capacity", channelCapacity.toString) channelContext.put("transactionCapacity", 1000.toString) channelContext.put("keep-alive", 0.toString) - channelContext.putAll(overrides) + channelContext.putAll(overrides.asJava) channel.setName(scala.util.Random.nextString(10)) channel.configure(channelContext) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala index 65c49c1..48df27b 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/EventTransformer.scala @@ -19,7 +19,7 @@ package org.apache.spark.streaming.flume import java.io.{ObjectOutput, ObjectInput} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.util.Utils import org.apache.spark.Logging @@ -60,7 +60,7 @@ private[streaming] object EventTransformer extends Logging { out.write(body) val numHeaders = headers.size() out.writeInt(numHeaders) - for ((k, v) <- headers) { + for ((k, v) <- headers.asScala) { val keyBuff = Utils.serialize(k.toString) out.writeInt(keyBuff.length) out.write(keyBuff) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala index 88cc2aa..b9d4e76 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeBatchFetcher.scala @@ -16,7 +16,6 @@ */ package org.apache.spark.streaming.flume -import scala.collection.JavaConversions._ import scala.collection.mutable.ArrayBuffer import com.google.common.base.Throwables @@ -155,7 +154,7 @@ private[flume] class FlumeBatchFetcher(receiver: FlumePollingReceiver) extends R val buffer = new ArrayBuffer[SparkFlumeEvent](events.size()) var j = 0 while (j < events.size()) { - val event = events(j) + val event = events.get(j) val sparkFlumeEvent = new SparkFlumeEvent() sparkFlumeEvent.event.setBody(event.getBody) sparkFlumeEvent.event.setHeaders(event.getHeaders) http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala index 1e32a36..2bf99cb 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeInputDStream.scala @@ -22,7 +22,7 @@ import java.io.{ObjectInput, ObjectOutput, Externalizable} import java.nio.ByteBuffer import java.util.concurrent.Executors -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import org.apache.flume.source.avro.AvroSourceProtocol @@ -99,7 +99,7 @@ class SparkFlumeEvent() extends Externalizable { val numHeaders = event.getHeaders.size() out.writeInt(numHeaders) - for ((k, v) <- event.getHeaders) { + for ((k, v) <- event.getHeaders.asScala) { val keyBuff = Utils.serialize(k.toString) out.writeInt(keyBuff.length) out.write(keyBuff) @@ -127,8 +127,7 @@ class FlumeEventServer(receiver : FlumeReceiver) extends AvroSourceProtocol { } override def appendBatch(events : java.util.List[AvroFlumeEvent]) : Status = { - events.foreach (event => - receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))) + events.asScala.foreach(event => receiver.store(SparkFlumeEvent.fromAvroFlumeEvent(event))) Status.OK } } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala index 583e7dc..0bc4620 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumePollingInputDStream.scala @@ -20,7 +20,7 @@ package org.apache.spark.streaming.flume import java.net.InetSocketAddress import java.util.concurrent.{LinkedBlockingQueue, Executors} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import scala.reflect.ClassTag import com.google.common.util.concurrent.ThreadFactoryBuilder @@ -94,9 +94,7 @@ private[streaming] class FlumePollingReceiver( override def onStop(): Unit = { logInfo("Shutting down Flume Polling Receiver") receiverExecutor.shutdownNow() - connections.foreach(connection => { - connection.transceiver.close() - }) + connections.asScala.foreach(_.transceiver.close()) channelFactory.releaseExternalResources() } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala index 9d9c3b1..70018c8 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeTestUtils.scala @@ -19,9 +19,9 @@ package org.apache.spark.streaming.flume import java.net.{InetSocketAddress, ServerSocket} import java.nio.ByteBuffer -import java.util.{List => JList} +import java.util.Collections -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import com.google.common.base.Charsets.UTF_8 import org.apache.avro.ipc.NettyTransceiver @@ -59,13 +59,13 @@ private[flume] class FlumeTestUtils { } /** Send data to the flume receiver */ - def writeInput(input: JList[String], enableCompression: Boolean): Unit = { + def writeInput(input: Seq[String], enableCompression: Boolean): Unit = { val testAddress = new InetSocketAddress("localhost", testPort) val inputEvents = input.map { item => val event = new AvroFlumeEvent event.setBody(ByteBuffer.wrap(item.getBytes(UTF_8))) - event.setHeaders(Map[CharSequence, CharSequence]("test" -> "header")) + event.setHeaders(Collections.singletonMap("test", "header")) event } @@ -88,7 +88,7 @@ private[flume] class FlumeTestUtils { } // Send data - val status = client.appendBatch(inputEvents.toList) + val status = client.appendBatch(inputEvents.asJava) if (status != avro.Status.OK) { throw new AssertionError("Sent events unsuccessfully") } http://git-wip-us.apache.org/repos/asf/spark/blob/69c9c177/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala ---------------------------------------------------------------------- diff --git a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala index a65a9b9..c719b80 100644 --- a/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala +++ b/external/flume/src/main/scala/org/apache/spark/streaming/flume/FlumeUtils.scala @@ -21,7 +21,7 @@ import java.net.InetSocketAddress import java.io.{DataOutputStream, ByteArrayOutputStream} import java.util.{List => JList, Map => JMap} -import scala.collection.JavaConversions._ +import scala.collection.JavaConverters._ import org.apache.spark.api.java.function.PairFunction import org.apache.spark.api.python.PythonRDD @@ -268,8 +268,8 @@ private[flume] class FlumeUtilsPythonHelper { maxBatchSize: Int, parallelism: Int ): JavaPairDStream[Array[Byte], Array[Byte]] = { - assert(hosts.length == ports.length) - val addresses = hosts.zip(ports).map { + assert(hosts.size() == ports.size()) + val addresses = hosts.asScala.zip(ports.asScala).map { case (host, port) => new InetSocketAddress(host, port) } val dstream = FlumeUtils.createPollingStream( @@ -286,7 +286,7 @@ private object FlumeUtilsPythonHelper { val output = new DataOutputStream(byteStream) try { output.writeInt(map.size) - map.foreach { kv => + map.asScala.foreach { kv => PythonRDD.writeUTF(kv._1.toString, output) PythonRDD.writeUTF(kv._2.toString, output) } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org