Repository: samza
Updated Branches:
  refs/heads/master d7fc811d6 -> 9db47b861


http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
index 7e9f18a..ae6330f 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/KafkaConfig.scala
@@ -25,7 +25,7 @@ import java.util.regex.Pattern
 import org.apache.samza.util.Util
 import org.apache.samza.util.Logging
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import kafka.consumer.ConsumerConfig
 import java.util.{Properties, UUID}
 
@@ -102,6 +102,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getFetchMessageMaxBytesTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
+      .asScala
       .filterKeys(k => k.endsWith(".consumer.fetch.message.max.bytes"))
       .map {
         case (fetchMessageMaxBytes, fetchSizeValue) =>
@@ -116,6 +117,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
   def getAutoOffsetResetTopics(systemName: String) = {
     val subConf = config.subset("systems.%s.streams." format systemName, true)
     subConf
+      .asScala
       .filterKeys(k => k.endsWith(".consumer.auto.offset.reset"))
       .map {
         case (topicAutoOffsetReset, resetValue) =>
@@ -162,7 +164,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     kafkaChangeLogProperties.setProperty("cleanup.policy", "compact")
     kafkaChangeLogProperties.setProperty("segment.bytes", 
KafkaConfig.CHANGELOG_DEFAULT_SEGMENT_SIZE)
     kafkaChangeLogProperties.setProperty("delete.retention.ms", 
String.valueOf(new StorageConfig(config).getChangeLogDeleteRetentionInMs(name)))
-    filteredConfigs.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
+    filteredConfigs.asScala.foreach { kv => kafkaChangeLogProperties.setProperty(kv._1, kv._2) }
     kafkaChangeLogProperties
   }
 
@@ -177,7 +179,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     consumerProps.putAll(subConf)
     consumerProps.put("group.id", groupId)
     consumerProps.put("client.id", clientId)
-    consumerProps.putAll(injectedProps)
+    consumerProps.putAll(injectedProps.asJava)
     new ConsumerConfig(consumerProps)
   }
 
@@ -189,7 +191,7 @@ class KafkaConfig(config: Config) extends ScalaMapConfig(config) {
     val producerProps = new util.HashMap[String, Object]()
     producerProps.putAll(subConf)
     producerProps.put("client.id", clientId)
-    producerProps.putAll(injectedProps)
+    producerProps.putAll(injectedProps.asJava)
     new KafkaProducerConfig(systemName, clientId, producerProps)
   }
 }
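
The hunks above show this change's core pattern: the implicit scala.collection.JavaConversions are replaced with the explicit JavaConverters decorators, so every Java/Scala collection boundary becomes a visible .asScala or .asJava call. A minimal, self-contained sketch of the pattern (the key names below are made up for illustration, not taken from KafkaConfig):

    import scala.collection.JavaConverters._

    val javaMap = new java.util.HashMap[String, String]()
    javaMap.put("systems.kafka.streams.topic1.consumer.fetch.message.max.bytes", "65536")

    // asScala wraps the Java map in a mutable Scala view; nothing is copied.
    val fetchSizes = javaMap.asScala.filterKeys(_.endsWith(".consumer.fetch.message.max.bytes"))

    // asJava converts back when a Java API such as Properties.putAll needs a java.util.Map.
    val props = new java.util.Properties()
    props.putAll(fetchSizes.toMap.asJava)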

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
index 4e3b247..6dc2f82 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/config/RegExTopicGenerator.scala
@@ -24,12 +24,11 @@ import kafka.utils.ZkUtils
 import org.apache.samza.config.KafkaConfig.{ Config2Kafka, REGEX_RESOLVED_STREAMS }
 import org.apache.samza.SamzaException
 import org.apache.samza.util.Util
-import collection.JavaConversions._
+import collection.JavaConverters._
 import org.apache.samza.util.Logging
 import scala.collection._
 import org.apache.samza.config.TaskConfig.Config2Task
 import org.apache.samza.system.SystemStream
-import scala.util.Sorting
 
 /**
  * Dynamically determine the Kafka topics to use as input streams to the task via a regular expression.
@@ -80,6 +79,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
       // For each topic that matched, generate all the specified configs
       config
         .getRegexResolvedInheritedConfig(rewriterName)
+        .asScala
         .foreach(kv => keysAndValsToAdd.put("systems." + m.getSystem + ".streams." + m.getStream + "." + kv._1, kv._2))
     }
     // Build new inputs
@@ -92,7 +92,7 @@ class RegExTopicGenerator extends ConfigRewriter with Logging {
       .sortWith(_ < _)
       .mkString(",")
 
-    new MapConfig((keysAndValsToAdd ++ config) += inputStreams)
+    new MapConfig(((keysAndValsToAdd ++ config.asScala) += inputStreams).asJava)
   }
 
   def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = {
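
In the final hunk above, the rewritten configuration is assembled by merging the mutable keysAndValsToAdd map with a Scala view of the existing config, appending the rebuilt task.inputs entry, and converting the result back to a Java map for MapConfig's Java-facing constructor. A sketch of that shape with hypothetical contents:

    import scala.collection.JavaConverters._
    import scala.collection.mutable

    val keysAndValsToAdd = mutable.Map("systems.kafka.streams.matched.gzip" -> "true")

    val existingConfig = new java.util.HashMap[String, String]()
    existingConfig.put("job.name", "regex-job")

    val inputStreams = "task.inputs" -> "kafka.matched"

    // ++ builds a fresh mutable map, += appends one entry, asJava hands it to Java.
    val merged: java.util.Map[String, String] =
      ((keysAndValsToAdd ++ existingConfig.asScala) += inputStreams).asJava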

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
index 539a439..5338886 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/BrokerProxy.scala
@@ -23,7 +23,6 @@ package org.apache.samza.system.kafka
 
 import java.lang.Thread.UncaughtExceptionHandler
 import java.nio.channels.ClosedByInterruptException
-import java.util.Map.Entry
 import java.util.concurrent.{ConcurrentHashMap, CountDownLatch}
 
 import kafka.api._
@@ -35,9 +34,8 @@ import org.apache.samza.util.ExponentialSleepStrategy
 import org.apache.samza.util.Logging
 import org.apache.samza.util.ThreadNamePrefix.SAMZA_THREAD_NAME_PREFIX
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.concurrent
-import scala.collection.mutable
 import org.apache.samza.util.KafkaUtil
 
 /**
@@ -71,7 +69,7 @@ class BrokerProxy(
   val sleepMSWhileNoTopicPartitions = 100
 
   /** What's the next offset for a particular partition? **/
-  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]()
+  val nextOffsets:concurrent.Map[TopicAndPartition, Long] = new ConcurrentHashMap[TopicAndPartition, Long]().asScala
 
   /** Block on the first call to get message if the fetcher has not yet returned its initial results **/
   // TODO: It should be sufficient to just use the count down latch and await on it for each of the calls, but
@@ -95,7 +93,7 @@ class BrokerProxy(
   def addTopicPartition(tp: TopicAndPartition, nextOffset: Option[String]) = {
     debug("Adding new topic and partition %s to queue for %s" format (tp, 
host))
 
-    if (nextOffsets.containsKey(tp)) {
+    if (nextOffsets.asJava.containsKey(tp)) {
       toss("Already consuming TopicPartition %s" format tp)
     }
 
@@ -113,13 +111,13 @@ class BrokerProxy(
 
     nextOffsets += tp -> offset
 
-    metrics.topicPartitions(host, port).set(nextOffsets.size)
+    metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
   }
 
   def removeTopicPartition(tp: TopicAndPartition) = {
-    if (nextOffsets.containsKey(tp)) {
+    if (nextOffsets.asJava.containsKey(tp)) {
       val offset = nextOffsets.remove(tp)
-      metrics.topicPartitions(host, port).set(nextOffsets.size)
+      metrics.topicPartitions.get((host, port)).set(nextOffsets.size)
       debug("Removed %s" format tp)
       offset
     } else {
@@ -136,7 +134,7 @@ class BrokerProxy(
         (new ExponentialSleepStrategy).run(
           loop => {
             if (reconnect) {
-              metrics.reconnects(host, port).inc
+              metrics.reconnects.get((host, port)).inc
               simpleConsumer.close()
               simpleConsumer = createSimpleConsumer()
             }
@@ -178,23 +176,23 @@ class BrokerProxy(
     val topicAndPartitionsToFetch = nextOffsets.filterKeys(messageSink.needsMoreMessages(_)).toList
 
     if (topicAndPartitionsToFetch.size > 0) {
-      metrics.brokerReads(host, port).inc
+      metrics.brokerReads.get((host, port)).inc
       val response: FetchResponse = simpleConsumer.defaultFetch(topicAndPartitionsToFetch: _*)
       firstCall = false
       firstCallBarrier.countDown()
 
       // Split response into errors and non errors, processing the errors first
-      val (nonErrorResponses, errorResponses) = response.data.entrySet().partition(_.getValue.error == ErrorMapping.NoError)
+      val (nonErrorResponses, errorResponses) = response.data.toSet.partition(_._2.error == ErrorMapping.NoError)
 
       handleErrors(errorResponses, response)
 
-      nonErrorResponses.foreach { nonError => moveMessagesToTheirQueue(nonError.getKey, nonError.getValue) }
+      nonErrorResponses.foreach { case (tp, data) => moveMessagesToTheirQueue(tp, data) }
     } else {
       refreshLatencyMetrics
 
       debug("No topic/partitions need to be fetched for %s:%s right now. 
Sleeping %sms." format (host, port, sleepMSWhileNoTopicPartitions))
 
-      metrics.brokerSkippedFetchRequests(host, port).inc
+      metrics.brokerSkippedFetchRequests.get((host, port)).inc
 
       Thread.sleep(sleepMSWhileNoTopicPartitions)
     }
@@ -221,7 +219,7 @@ class BrokerProxy(
     immutableNextOffsetsCopy.keySet.foreach(abdicate(_))
   }
 
-  def handleErrors(errorResponses: mutable.Set[Entry[TopicAndPartition, FetchResponsePartitionData]], response:FetchResponse) = {
+  def handleErrors(errorResponses: Set[(TopicAndPartition, FetchResponsePartitionData)], response:FetchResponse) = {
     // FetchResponse should really return Option and a list of the errors so we don't have to find them ourselves
     case class Error(tp: TopicAndPartition, code: Short, exception: Throwable)
 
@@ -229,10 +227,10 @@ class BrokerProxy(
 
     // Convert FetchResponse into easier-to-work-with Errors
     val errors = for (
-      error <- errorResponses;
-      errorCode <- Option(response.errorCode(error.getKey.topic, error.getKey.partition)); // Scala's being cranky about referring to error.getKey values...
+      (topicAndPartition, responseData) <- errorResponses;
+      errorCode <- Option(response.errorCode(topicAndPartition.topic, topicAndPartition.partition)); // Scala's being cranky about referring to error.getKey values...
       exception <- Option(ErrorMapping.exceptionFor(errorCode))
-    ) yield new Error(error.getKey, errorCode, exception)
+    ) yield new Error(topicAndPartition, errorCode, exception)
 
     val (notLeaderOrUnknownTopic, otherErrors) = errors.partition { case (e) => e.code == ErrorMapping.NotLeaderForPartitionCode || e.code == ErrorMapping.UnknownTopicOrPartitionCode }
     val (offsetOutOfRangeErrors, remainingErrors) = otherErrors.partition(_.code == ErrorMapping.OffsetOutOfRangeCode)
@@ -274,10 +272,10 @@ class BrokerProxy(
       nextOffset = message.nextOffset
 
       val bytesSize = message.message.payloadSize + message.message.keySize
-      metrics.reads(tp).inc
-      metrics.bytesRead(tp).inc(bytesSize)
-      metrics.brokerBytesRead(host, port).inc(bytesSize)
-      metrics.offsets(tp).set(nextOffset)
+      metrics.reads.get(tp).inc
+      metrics.bytesRead.get(tp).inc(bytesSize)
+      metrics.brokerBytesRead.get((host, port)).inc(bytesSize)
+      metrics.offsets.get(tp).set(nextOffset)
     }
 
     nextOffsets.replace(tp, nextOffset) // use replace rather than put in case this tp was removed while we were fetching.
@@ -285,8 +283,8 @@ class BrokerProxy(
     // Update high water mark
     val hw = data.hw
     if (hw >= 0) {
-      metrics.highWatermark(tp).set(hw)
-      metrics.lag(tp).set(hw - nextOffset)
+      metrics.highWatermark.get(tp).set(hw)
+      metrics.lag.get(tp).set(hw - nextOffset)
     } else {
       debug("Got a high water mark less than 0 (%d) for %s, so skipping." 
format (hw, tp))
     }
@@ -327,10 +325,10 @@ class BrokerProxy(
         if (latestOffset >= 0) {
           // only update the registered topicAndpartitions
           if(metrics.highWatermark.containsKey(topicAndPartition)) {
-            metrics.highWatermark(topicAndPartition).set(latestOffset)
+            metrics.highWatermark.get(topicAndPartition).set(latestOffset)
           }
           if(metrics.lag.containsKey(topicAndPartition)) {
-            metrics.lag(topicAndPartition).set(latestOffset - offset)
+            metrics.lag.get(topicAndPartition).set(latestOffset - offset)
           }
         }
       }
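
Two patterns recur in the BrokerProxy hunks. First, wrapping a ConcurrentHashMap with asScala yields a scala.collection.concurrent.Map, so atomic operations such as replace stay available through the Scala API. Second, with the implicit conversions gone, lookups on Java-backed maps (here the metrics maps) use an explicit .get instead of Scala's apply syntax. A minimal sketch with illustrative types, not the actual BrokerProxyMetrics fields:

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong
    import scala.collection.JavaConverters._
    import scala.collection.concurrent

    val nextOffsets: concurrent.Map[String, Long] =
      new ConcurrentHashMap[String, Long]().asScala

    nextOffsets += "topic-0" -> 42L
    // Atomic: a no-op if the key was removed by another thread in the meantime.
    nextOffsets.replace("topic-0", 43L)

    // Java-map lookup: get returns the value (or null) where the old implicit
    // conversion had allowed gauges(key) syntax.
    val gauges = new ConcurrentHashMap[String, AtomicLong]()
    gauges.put("topic-0", new AtomicLong())
    gauges.get("topic-0").set(nextOffsets.size)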

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
index 309b653..8c90c6c 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemAdmin.scala
@@ -21,20 +21,17 @@ package org.apache.samza.system.kafka
 
 import java.util
 import java.util.{Properties, UUID}
-
 import kafka.admin.AdminUtils
 import kafka.api._
-import kafka.common.{TopicAndPartition, TopicExistsException}
+import kafka.common.TopicAndPartition
 import kafka.consumer.{ConsumerConfig, SimpleConsumer}
 import kafka.utils.ZkUtils
-import org.apache.samza.config.KafkaConfig
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
 import org.apache.samza.system._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStrategy, KafkaUtil, Logging}
 import org.apache.samza.{Partition, SamzaException}
-
-import scala.collection.JavaConversions
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 
 object KafkaSystemAdmin extends Logging {
@@ -59,7 +56,7 @@ object KafkaSystemAdmin extends Logging {
               (systemStreamPartition.getPartition, partitionMetadata)
             })
             .toMap
-          val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata)
+          val streamMetadata = new SystemStreamMetadata(streamName, streamPartitionMetadata.asJava)
           (streamName, streamMetadata)
       }
       .toMap
@@ -151,7 +148,7 @@ class KafkaSystemAdmin(
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
-          streams.toSet,
+          streams.asScala.toSet,
           systemName,
           getTopicMetadata,
           metadataTTL)
@@ -162,11 +159,11 @@ class KafkaSystemAdmin(
               pm =>
                 new Partition(pm.partitionId) -> new SystemStreamPartitionMetadata("", "", "")
             }.toMap[Partition, SystemStreamPartitionMetadata]
-            (topic -> new SystemStreamMetadata(topic, partitionsMap))
+            (topic -> new SystemStreamMetadata(topic, partitionsMap.asJava))
           }
         }
         loop.done
-        JavaConversions.mapAsJavaMap(result)
+        result.asJava
       },
 
       (exception, loop) => {
@@ -188,11 +185,11 @@ class KafkaSystemAdmin(
     // This is safe to do with Kafka, even if a topic is key-deduped. If the
     // offset doesn't exist on a compacted topic, Kafka will return the first
     // message AFTER the offset that was specified in the fetch request.
-    offsets.mapValues(offset => (offset.toLong + 1).toString)
+    offsets.asScala.mapValues(offset => (offset.toLong + 1).toString).asJava
   }
 
   override def getSystemStreamMetadata(streams: java.util.Set[String]) =
-    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500))
+    getSystemStreamMetadata(streams, new ExponentialSleepStrategy(initialDelayMs = 500)).asJava
 
   /**
    * Given a set of stream names (topics), fetch metadata from Kafka for each
@@ -207,7 +204,7 @@ class KafkaSystemAdmin(
     retryBackoff.run(
       loop => {
         val metadata = TopicMetadataCache.getTopicMetadata(
-          streams.toSet,
+          streams.asScala.toSet,
           systemName,
           getTopicMetadata,
           metadataTTL)
@@ -241,12 +238,7 @@ class KafkaSystemAdmin(
                   debug("Stripping newest offsets for %s because the topic 
appears empty." format topicAndPartition)
                   newestOffsets -= topicAndPartition
                   debug("Setting oldest offset to 0 to consume from beginning")
-                  oldestOffsets.get(topicAndPartition) match {
-                    case Some(s) =>
-                      oldestOffsets.updated(topicAndPartition, "0")
-                    case None =>
-                      oldestOffsets.put(topicAndPartition, "0")
-                  }
+                  oldestOffsets += (topicAndPartition -> "0")
                 }
             }
           } finally {
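
One subtlety in the getOffsetsAfter hunk above: offsets.asScala.mapValues(f).asJava returns a lazily evaluated view in Scala 2.11/2.12, so the function is re-applied on every lookup. That is harmless for a cheap, pure increment like this one, but a strict copy is the safer default when the function is expensive or side-effecting. A sketch of both forms (contents hypothetical):

    import scala.collection.JavaConverters._

    val offsets = new java.util.HashMap[String, String]()
    offsets.put("partition-0", "41")

    // Lazy view: the increment runs again on every get().
    val lazyView: java.util.Map[String, String] =
      offsets.asScala.mapValues(o => (o.toLong + 1).toString).asJava

    // Strict copy: computed once, then stable.
    val strictCopy: java.util.Map[String, String] =
      offsets.asScala.map { case (k, o) => k -> (o.toLong + 1).toString }.toMap.asJava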

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
index fa685ee..f25bb68 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemConsumer.scala
@@ -26,7 +26,6 @@ import kafka.message.MessageAndOffset
 import org.apache.samza.Partition
 import org.apache.kafka.common.utils.Utils
 import org.apache.samza.util.Clock
-import java.util.UUID
 import kafka.serializer.DefaultDecoder
 import kafka.serializer.Decoder
 import org.apache.samza.util.BlockingEnvelopeMap
@@ -37,7 +36,7 @@ import org.apache.samza.util.TopicMetadataStore
 import kafka.api.TopicMetadata
 import org.apache.samza.util.ExponentialSleepStrategy
 import java.util.concurrent.ConcurrentHashMap
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.system.SystemAdmin
 
 object KafkaSystemConsumer {
@@ -133,7 +132,7 @@ private[kafka] class KafkaSystemConsumer(
 
   type HostPort = (String, Int)
   val brokerProxies = scala.collection.mutable.Map[HostPort, BrokerProxy]()
-  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]()
+  val topicPartitionsAndOffsets: scala.collection.concurrent.Map[TopicAndPartition, String] = new ConcurrentHashMap[TopicAndPartition, String]().asScala
   var perPartitionFetchThreshold = fetchThreshold
   var perPartitionFetchThresholdBytes = 0L
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
index 6efd2dc..b680ed4 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/system/kafka/KafkaSystemProducer.scala
@@ -36,7 +36,7 @@ import org.apache.samza.util.KafkaUtil
 import org.apache.samza.util.Logging
 import org.apache.samza.util.TimerUtils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class KafkaSystemProducer(systemName: String,
                           retryBackoff: ExponentialSleepStrategy = new ExponentialSleepStrategy,
@@ -85,7 +85,7 @@ class KafkaSystemProducer(systemName: String,
         }
         currentProducer.close
 
-        sources.foreach {p =>
+        sources.asScala.foreach {p =>
           if (p._2.exceptionInCallback.get() == null) {
             flush(p._1)
           }

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
index 0f0bc22..41d380b 100644
--- a/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
+++ b/samza-kafka/src/main/scala/org/apache/samza/util/KafkaUtil.scala
@@ -23,13 +23,13 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicLong
 import kafka.admin.AdminUtils
 import kafka.utils.ZkUtils
-import org.apache.kafka.clients.producer.{Producer, ProducerRecord}
 import org.apache.kafka.common.PartitionInfo
 import org.apache.samza.config.Config
 import org.apache.samza.config.ConfigException
 import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.system.OutgoingMessageEnvelope
-import kafka.common.{TopicExistsException, ErrorMapping, ReplicaNotAvailableException}
+import kafka.common.{ErrorMapping, ReplicaNotAvailableException}
+import org.apache.kafka.common.errors.TopicExistsException
 import org.apache.samza.system.kafka.TopicMetadataCache
 
 object KafkaUtil extends Logging {

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
index 1f2f62f..a14812e 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/checkpoint/kafka/TestKafkaCheckpointManager.scala
@@ -39,7 +39,7 @@ import org.apache.samza.{Partition, SamzaException}
 import org.junit.Assert._
 import org.junit._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection._
 
 class TestKafkaCheckpointManager extends KafkaServerTestHarness {
@@ -59,8 +59,8 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
 
   val partition = new Partition(0)
   val partition2 = new Partition(1)
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "123").asJava)
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", partition) -> "12345").asJava)
 
   var producerConfig: KafkaProducerConfig = null
 
@@ -82,7 +82,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     config.put("acks", "all")
     config.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "1")
     config.put(ProducerConfig.RETRIES_CONFIG, (new Integer(java.lang.Integer.MAX_VALUE-1)).toString)
-    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES)
+    config.putAll(KafkaCheckpointManagerFactory.INJECTED_PRODUCER_PROPERTIES.asJava)
     producerConfig = new KafkaProducerConfig("kafka", "i001", config)
 
     metadataStore = new ClientUtilTopicMetadataStore(brokers, "some-job-name")
@@ -234,7 +234,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     connectZk = () => ZkUtils(zkConnect, 6000, 6000, zkSecure),
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
 
   // CheckpointManager with a specific checkpoint topic
   private def getKafkaCheckpointManager = getKafkaCheckpointManagerWithParam(checkpointTopic)
@@ -254,7 +254,7 @@ class TestKafkaCheckpointManager extends KafkaServerTestHarness {
     systemStreamPartitionGrouperFactoryString = systemStreamPartitionGrouperFactoryString,
     failOnCheckpointValidation = failOnTopicValidation,
     serde = new InvalideSerde(exception),
-    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]())))
+    checkpointTopicProperties = KafkaCheckpointManagerFactory.getCheckpointTopicProperties(new MapConfig(Map[String, String]().asJava)))
 
   class InvalideSerde(exception: String) extends CheckpointSerde {
     override def fromBytes(bytes: Array[Byte]): Checkpoint = {

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
index d626f1c..555ab9f 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaConfig.scala
@@ -19,18 +19,14 @@
 
 package org.apache.samza.config
 
-import java.net.URI
-import java.io.File
 import java.util.Properties
-import kafka.consumer.ConsumerConfig
 import org.apache.samza.config.factories.PropertiesConfigFactory
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.kafka.common.serialization.ByteArraySerializer
 import org.apache.kafka.clients.producer.ProducerConfig
 import org.junit.Before
-import org.junit.BeforeClass
 
 class TestKafkaConfig {
   
@@ -52,7 +48,7 @@ class TestKafkaConfig {
     val factory = new PropertiesConfigFactory()
     props.setProperty("systems." + SYSTEM_NAME + ".samza.factory", 
"org.apache.samza.system.kafka.KafkaSystemFactory")
 
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
 
     val consumerConfig1 = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, "TestClientId1")
@@ -85,33 +81,33 @@ class TestKafkaConfig {
 
   @Test
   def testStreamLevelFetchSizeOverride() {
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val consumerConfig = kafkaConfig.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // default fetch size
     assertEquals(1024*1024, consumerConfig.fetchMessageMaxBytes)
 
     props.setProperty("systems." + SYSTEM_NAME + 
".consumer.fetch.message.max.bytes", "262144")
-    val mapConfig1 = new MapConfig(props.toMap[String, String])
+    val mapConfig1 = new MapConfig(props.asScala.asJava)
     val kafkaConfig1 = new KafkaConfig(mapConfig1)
     val consumerConfig1 = kafkaConfig1.getKafkaSystemConsumerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     // shared fetch size
     assertEquals(512*512, consumerConfig1.fetchMessageMaxBytes)
     
     props.setProperty("systems." + SYSTEM_NAME + 
".streams.topic1.consumer.fetch.message.max.bytes", "65536")
-    val mapConfig2 = new MapConfig(props.toMap[String, String])
+    val mapConfig2 = new MapConfig(props.asScala.asJava)
     val kafkaConfig2 = new KafkaConfig(mapConfig2)
     val consumerConfig2 = kafkaConfig2.getFetchMessageMaxBytesTopics(SYSTEM_NAME)
     // topic fetch size
     assertEquals(256*256, consumerConfig2 getOrElse ("topic1", 1024*1024))
 
     // default samza.fetch.threshold.bytes
-    val mapConfig3 = new MapConfig(props.toMap[String, String])
+    val mapConfig3 = new MapConfig(props.asScala.asJava)
     val kafkaConfig3 = new KafkaConfig(mapConfig3)
     assertTrue(kafkaConfig3.getConsumerFetchThresholdBytes("kafka").isEmpty)
 
     props.setProperty("systems.kafka.samza.fetch.threshold.bytes", "65536")
-    val mapConfig4 = new MapConfig(props.toMap[String, String])
+    val mapConfig4 = new MapConfig(props.asScala.asJava)
     val kafkaConfig4 = new KafkaConfig(mapConfig4)
     assertEquals("65536", 
kafkaConfig4.getConsumerFetchThresholdBytes("kafka").get)
   }
@@ -125,7 +121,7 @@ class TestKafkaConfig {
     props.setProperty("stores.test3.changelog", "otherstream")
     props.setProperty("stores.test1.changelog.kafka.cleanup.policy", "delete")
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     assertEquals(kafkaConfig.getChangelogKafkaProperties("test1").getProperty("cleanup.policy"), "delete")
     assertEquals(kafkaConfig.getChangelogKafkaProperties("test2").getProperty("cleanup.policy"), "compact")
@@ -138,7 +134,7 @@ class TestKafkaConfig {
   
   @Test
   def testDefaultValuesForProducerProperties() {
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -155,7 +151,7 @@ class TestKafkaConfig {
     
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, expectedValue);
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -169,7 +165,7 @@ class TestKafkaConfig {
     
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, expectedValue);
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     val producerProperties = kafkaProducerConfig.getProducerProperties
@@ -181,7 +177,7 @@ class TestKafkaConfig {
   def testMaxInFlightRequestsPerConnectionWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, "Samza");
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties
@@ -191,7 +187,7 @@ class TestKafkaConfig {
   def testRetriesWrongNumberFormat() {
     props.setProperty(KAFKA_PRODUCER_PROPERTY_PREFIX + ProducerConfig.RETRIES_CONFIG, "Samza");
     
-    val mapConfig = new MapConfig(props.toMap[String, String])
+    val mapConfig = new MapConfig(props.asScala.asJava)
     val kafkaConfig = new KafkaConfig(mapConfig)
     val kafkaProducerConfig = kafkaConfig.getKafkaSystemProducerConfig(SYSTEM_NAME, TEST_CLIENT_ID)
     kafkaProducerConfig.getProducerProperties

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
index d6899b8..3871560 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestKafkaSerdeConfig.scala
@@ -23,7 +23,7 @@ import org.apache.samza.config.KafkaSerdeConfig.Config2KafkaSerde
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestKafkaSerdeConfig {
   val MAGIC_VAL = "1000"
@@ -31,7 +31,7 @@ class TestKafkaSerdeConfig {
   val paramsToTest = List(
     "serializers.registry.test.encoder", "serializers.registry.test.decoder")
 
-  val config = new MapConfig(mapAsJavaMap(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap))
+  val config = new MapConfig(paramsToTest.map { m => (m, MAGIC_VAL) }.toMap.asJava)
 
   @Test
   def testKafkaConfigurationIsBackwardsCompatible {

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
index 89ced34..69d7da6 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/config/TestRegExTopicGenerator.scala
@@ -19,9 +19,8 @@
 
 package org.apache.samza.config
 
-import collection.JavaConversions._
+import collection.JavaConverters._
 
-import org.apache.samza.SamzaException
 import org.junit.Assert._
 import org.junit.Test
 
@@ -45,7 +44,7 @@ class TestRegExTopicGenerator {
       getRegexConfigInherited + ".b.triumph" -> "spitfire",
       unrelated)
 
-    val config = new MapConfig(map)
+    val config = new MapConfig(map.asJava)
 
     // Don't actually talk to ZooKeeper
     val rewriter = new RegExTopicGenerator() {
@@ -83,7 +82,7 @@ class TestRegExTopicGenerator {
       override def getTopicsFromZK(rewriterName: String, config: Config): Seq[String] = List("yoyoyo")
     }
 
-    val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map))
+    val config = rewriter.rewrite(REWRITER_NAME, new MapConfig(map.asJava))
     assertEquals("test.yoyoyo", config.get(TaskConfig.INPUT_STREAMS))
   }
 }

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
index cc7077c..f0bdafd 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestBrokerProxy.scala
@@ -37,7 +37,7 @@ import org.mockito.invocation.InvocationOnMock
 import org.mockito.stubbing.Answer
 import org.mockito.{Matchers, Mockito}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestBrokerProxy extends Logging {
   val tp2 = new TopicAndPartition("Redbird", 2013)
@@ -52,8 +52,8 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(2, sink.receivedMessages.size)
-    assertEquals(42, sink.receivedMessages.get(0)._2.offset)
-    assertEquals(84, sink.receivedMessages.get(1)._2.offset)
+    assertEquals(42, sink.receivedMessages(0)._2.offset)
+    assertEquals(84, sink.receivedMessages(1)._2.offset)
   }
 
   @Test def brokerProxySkipsFetchForEmptyRequests() = {
@@ -64,8 +64,8 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp2, Option("0"))
     Thread.sleep(1000)
     assertEquals(0, sink.receivedMessages.size)
-    assertTrue(bp.metrics.brokerSkippedFetchRequests(bp.host, bp.port).getCount > 0)
-    assertEquals(0, bp.metrics.brokerReads(bp.host, bp.port).getCount)
+    assertTrue(bp.metrics.brokerSkippedFetchRequests.get((bp.host, bp.port)).getCount > 0)
+    assertEquals(0, bp.metrics.brokerReads.get((bp.host, bp.port)).getCount)
   }
 
   @Test def brokerProxyThrowsExceptionOnDuplicateTopicPartitions() = {
@@ -91,7 +91,7 @@ class TestBrokerProxy extends Logging {
       def refreshDropped() {}
 
       def addMessage(tp: TopicAndPartition, msg: MessageAndOffset, highWatermark: Long) {
-        receivedMessages.add((tp, msg, msg.offset.equals(highWatermark)))
+        receivedMessages += ((tp, msg, msg.offset.equals(highWatermark)))
       }
 
       def setIsAtHighWatermark(tp: TopicAndPartition, isAtHighWatermark: Boolean) {
@@ -109,7 +109,7 @@ class TestBrokerProxy extends Logging {
 
     metrics.registerBrokerProxy(host, port)
     metrics.registerTopicAndPartition(tp)
-    metrics.topicPartitions(host, port).set(1)
+    metrics.topicPartitions.get((host, port)).set(1)
 
     val bp = new BrokerProxy(
       host,
@@ -168,7 +168,7 @@ class TestBrokerProxy extends Logging {
             val fetchResponsePartitionData = FetchResponsePartitionData(0, 500, messageSet)
             val map = scala.Predef.Map[TopicAndPartition, FetchResponsePartitionData](tp -> fetchResponsePartitionData)
 
-            when(fetchResponse.data).thenReturn(map)
+            when(fetchResponse.data).thenReturn(map.toSeq)
             when(fetchResponse.messageSet(any(classOf[String]), any(classOf[Int]))).thenReturn(messageSet)
             fetchResponse
           }
@@ -210,14 +210,14 @@ class TestBrokerProxy extends Logging {
     bp.addTopicPartition(tp, Option("0"))
     Thread.sleep(1000)
     // update when fetching messages
-    assertEquals(500, bp.metrics.highWatermark(tp).getValue)
-    assertEquals(415, bp.metrics.lag(tp).getValue)
+    assertEquals(500, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(415, bp.metrics.lag.get(tp).getValue)
 
     fetchTp1 = false
     Thread.sleep(1000)
     // update when not fetching messages
-    assertEquals(100, bp.metrics.highWatermark(tp).getValue)
-    assertEquals(15, bp.metrics.lag(tp).getValue)
+    assertEquals(100, bp.metrics.highWatermark.get(tp).getValue)
+    assertEquals(15, bp.metrics.lag.get(tp).getValue)
 
     fetchTp1 = true
   }
@@ -264,7 +264,7 @@ class TestBrokerProxy extends Logging {
           val response = mock(classOf[FetchResponsePartitionData])
           when(response.error).thenReturn(ErrorMapping.OffsetOutOfRangeCode)
           val responseMap = Map(tp -> response)
-          when(mfr.data).thenReturn(responseMap)
+          when(mfr.data).thenReturn(responseMap.toSeq)
           invocationCount += 1
           mfr
         } else {

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
index be7db97..19f3903 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemAdmin.scala
@@ -39,7 +39,7 @@ import org.apache.samza.util.{ClientUtilTopicMetadataStore, ExponentialSleepStra
 import org.junit.Assert._
 import org.junit._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * README: New tests should be added to the Java tests. See TestKafkaSystemAdminJava
@@ -203,11 +203,11 @@ class TestKafkaSystemAdmin {
     validateTopic(TOPIC, 50)
 
     // Verify the empty topic behaves as expected.
-    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    var metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
-    assertNotNull(metadata(TOPIC))
+    assertNotNull(metadata.get(TOPIC))
     // Verify partition count.
-    var sspMetadata = metadata(TOPIC).getSystemStreamPartitionMetadata
+    var sspMetadata = metadata.get(TOPIC).getSystemStreamPartitionMetadata
     assertEquals(50, sspMetadata.size)
     // Empty topics should have null for latest offset and 0 for earliest offset
     assertEquals("0", sspMetadata.get(new Partition(0)).getOldestOffset)
@@ -218,11 +218,11 @@ class TestKafkaSystemAdmin {
     // Add a new message to one of the partitions, and verify that it works as
     // expected.
     producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, 
"val1".getBytes)).get()
-    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
-    val streamName = metadata.keySet.head
+    val streamName = metadata.keySet.asScala.head
     assertEquals(TOPIC, streamName)
-    sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
     // key1 gets hash-mod'd to partition 48.
     assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
     assertEquals("0", sspMetadata.get(new Partition(48)).getNewestOffset)
@@ -234,10 +234,10 @@ class TestKafkaSystemAdmin {
 
     // Add a second message to one of the same partition.
     producer.send(new ProducerRecord(TOPIC, 48, "key1".getBytes, 
"val2".getBytes)).get()
-    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC))
+    metadata = systemAdmin.getSystemStreamMetadata(Set(TOPIC).asJava)
     assertEquals(1, metadata.size)
     assertEquals(TOPIC, streamName)
-    sspMetadata = metadata(streamName).getSystemStreamPartitionMetadata
+    sspMetadata = metadata.get(streamName).getSystemStreamPartitionMetadata
     // key1 gets hash-mod'd to partition 48.
     assertEquals("0", sspMetadata.get(new Partition(48)).getOldestOffset)
     assertEquals("1", sspMetadata.get(new Partition(48)).getNewestOffset)
@@ -245,7 +245,7 @@ class TestKafkaSystemAdmin {
 
     // Validate that a fetch will return the message.
     val connector = getConsumerConnector
-    var stream = connector.createMessageStreams(Map(TOPIC -> 1)).get(TOPIC).get.get(0).iterator
+    var stream = connector.createMessageStreams(Map(TOPIC -> 1))(TOPIC).head.iterator
     var message = stream.next
     var text = new String(message.message, "UTF-8")
     connector.shutdown
@@ -261,10 +261,10 @@ class TestKafkaSystemAdmin {
 
   @Test
   def testNonExistentTopic {
-    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic"))
-    val metadata = initialOffsets.getOrElse("non-existent-topic", fail("missing metadata"))
+    val initialOffsets = systemAdmin.getSystemStreamMetadata(Set("non-existent-topic").asJava)
+    val metadata = initialOffsets.asScala.getOrElse("non-existent-topic", fail("missing metadata"))
     assertEquals(metadata, new SystemStreamMetadata("non-existent-topic", Map(
-      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, "0"))))
+      new Partition(0) -> new SystemStreamPartitionMetadata("0", null, 
"0")).asJava))
   }
 
   @Test
@@ -273,9 +273,9 @@ class TestKafkaSystemAdmin {
     val ssp2 = new SystemStreamPartition("test-system", "test-stream", new 
Partition(1))
     val offsetsAfter = systemAdmin.getOffsetsAfter(Map(
       ssp1 -> "1",
-      ssp2 -> "2"))
-    assertEquals("2", offsetsAfter(ssp1))
-    assertEquals("3", offsetsAfter(ssp2))
+      ssp2 -> "2").asJava)
+    assertEquals("2", offsetsAfter.get(ssp1))
+    assertEquals("3", offsetsAfter.get(ssp2))
   }
 
   @Test
@@ -310,7 +310,7 @@ class TestKafkaSystemAdmin {
     val systemAdmin = new KafkaSystemAdminWithTopicMetadataError
     val retryBackoff = new ExponentialSleepStrategy.Mock(maxCalls = 3)
     try {
-      systemAdmin.getSystemStreamMetadata(Set("quux"), retryBackoff)
+      systemAdmin.getSystemStreamMetadata(Set("quux").asJava, retryBackoff)
       fail("expected CallLimitReached to be thrown")
     } catch {
       case e: ExponentialSleepStrategy.CallLimitReached => ()

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
----------------------------------------------------------------------
diff --git a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
index ce84b6d..c333935 100644
--- a/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
+++ b/samza-kafka/src/test/scala/org/apache/samza/system/kafka/TestKafkaSystemFactory.scala
@@ -23,11 +23,10 @@ import org.apache.samza.SamzaException
 import org.apache.samza.config.MapConfig
 import org.apache.samza.config.StorageConfig
 import org.apache.samza.metrics.MetricsRegistryMap
-import org.apache.samza.system.SystemStream
 import org.junit.Assert._
 import org.junit.Test
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class TestKafkaSystemFactory {
   @Test
@@ -36,7 +35,7 @@ class TestKafkaSystemFactory {
     try {
       producerFactory.getProducer(
         "test",
-        new MapConfig(Map[String, String]()),
+        new MapConfig(Map[String, String]().asJava),
         new MetricsRegistryMap)
       fail("Expected to get a Samza exception.")
     } catch {
@@ -49,7 +48,7 @@ class TestKafkaSystemFactory {
   def testFailWhenSerdeIsInvalid {
     val producerFactory = new KafkaSystemFactory
     val config = new MapConfig(Map[String, String](
-      "streams.test.serde" -> "failme"))
+      "streams.test.serde" -> "failme").asJava)
     try {
       producerFactory.getProducer(
         "test",
@@ -70,7 +69,7 @@ class TestKafkaSystemFactory {
       "systems.test.producer.bootstrap.servers" -> "",
       "systems.test.samza.key.serde" -> "json",
       "systems.test.samza.msg.serde" -> "json",
-      "serializers.registry.json.class" -> 
"samza.serializers.JsonSerdeFactory"))
+      "serializers.registry.json.class" -> 
"samza.serializers.JsonSerdeFactory").asJava)
     var producer = producerFactory.getProducer(
       "test",
       config,
@@ -91,7 +90,7 @@ class TestKafkaSystemFactory {
       StorageConfig.FACTORY.format("system1") -> "some.factory.Class",
       StorageConfig.CHANGELOG_STREAM.format("system1") -> "system1.stream1",
       StorageConfig.FACTORY.format("system2") -> "some.factory.Class")
-    val config = new MapConfig(configMap)
+    val config = new MapConfig(configMap.asJava)
     assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system3", config))
     assertEquals(Map[String, String](), KafkaSystemFactory.getInjectedProducerProperties("system2", config))
     assertEquals(Map[String, String]("compression.type" -> "none"), KafkaSystemFactory.getInjectedProducerProperties("system1", config))

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
index 5112ac6..c771788 100644
--- a/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
+++ b/samza-kv-rocksdb/src/main/scala/org/apache/samza/storage/kv/RocksDbKeyValueStore.scala
@@ -205,14 +205,15 @@ class RocksDbKeyValueStore(
   class RocksDbIterator(iter: RocksIterator) extends KeyValueIterator[Array[Byte], Array[Byte]] {
     private var open = true
     private var firstValueAccessed = false
-    def close() = {
+
+    override def close() = {
       open = false
       iter.close()
     }
 
-    def remove() = throw new UnsupportedOperationException("RocksDB iterator 
doesn't support remove")
+    override def remove() = throw new UnsupportedOperationException("RocksDB 
iterator doesn't support remove")
 
-    def hasNext() = iter.isValid
+    override def hasNext() = iter.isValid
 
     // The iterator is already pointing to the next element
     protected def peekKey() = {
@@ -231,7 +232,7 @@ class RocksDbKeyValueStore(
     // current element we are pointing to and advance the iterator to the next 
     // location (The new location may or may not be valid - this will surface
     // when the next next() call is made, the isValid will fail)
-    def next() = {
+    override def next() = {
       if (!hasNext()) {
         throw new NoSuchElementException
       }

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
index 4141cbf..2aac6aa 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/KeyValueStorageEngine.scala
@@ -24,7 +24,7 @@ import org.apache.samza.storage.{StoreProperties, StorageEngine}
 import org.apache.samza.system.IncomingMessageEnvelope
 import org.apache.samza.util.TimerUtils
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 /**
  * A key value store.
@@ -101,7 +101,7 @@ class KeyValueStorageEngine[K, V](
   def restore(envelopes: java.util.Iterator[IncomingMessageEnvelope]) {
     val batch = new java.util.ArrayList[Entry[Array[Byte], Array[Byte]]](batchSize)
 
-    for (envelope <- envelopes) {
+    for (envelope <- envelopes.asScala) {
       val keyBytes = envelope.getKey.asInstanceOf[Array[Byte]]
       val valBytes = envelope.getMessage.asInstanceOf[Array[Byte]]
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
index 3de257c..9e67fc8 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/NullSafeKeyValueStore.scala
@@ -21,7 +21,7 @@ package org.apache.samza.storage.kv
 
 import org.apache.samza.util.Util.notNull
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object NullSafeKeyValueStore {
   val NullKeyErrorMessage = "Null is not a valid key."
@@ -39,7 +39,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
 
   def getAll(keys: java.util.List[K]): java.util.Map[K, V] = {
     notNull(keys, NullKeysErrorMessage)
-    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage))
     store.getAll(keys)
   }
 
@@ -50,7 +50,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
   }
 
   def putAll(entries: java.util.List[Entry[K, V]]) {
-    entries.foreach(entry => {
+    entries.asScala.foreach(entry => {
       notNull(entry.getKey, NullKeyErrorMessage)
       notNull(entry.getValue, NullValueErrorMessage)
     })
@@ -64,7 +64,7 @@ class NullSafeKeyValueStore[K, V](store: KeyValueStore[K, V]) extends KeyValueSt
 
   def deleteAll(keys: java.util.List[K]) = {
     notNull(keys, NullKeysErrorMessage)
-    keys.foreach(key => notNull(key, NullKeyErrorMessage))
+    keys.asScala.foreach(key => notNull(key, NullKeyErrorMessage))
     store.deleteAll(keys)
   }
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
index d77d476..c8939b7 100644
--- a/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
+++ b/samza-kv/src/main/scala/org/apache/samza/storage/kv/SerializedKeyValueStore.scala
@@ -98,10 +98,10 @@ class SerializedKeyValueStore[K, V](
   }
 
   private class DeserializingIterator(iter: KeyValueIterator[Array[Byte], Array[Byte]]) extends KeyValueIterator[K, V] {
-    def hasNext() = iter.hasNext()
-    def remove() = iter.remove()
-    def close() = iter.close()
-    def next(): Entry[K, V] = {
+    override def hasNext() = iter.hasNext()
+    override def remove() = iter.remove()
+    override def close() = iter.close()
+    override def next(): Entry[K, V] = {
       val nxt = iter.next()
       val key = fromBytesOrNull(nxt.getKey, keySerde)
       val value = fromBytesOrNull(nxt.getValue, msgSerde)

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
----------------------------------------------------------------------
diff --git a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
index 595dd0d..f57b275 100644
--- a/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
+++ b/samza-kv/src/test/scala/org/apache/samza/storage/kv/MockKeyValueStore.scala
@@ -19,7 +19,7 @@
 
 package org.apache.samza.storage.kv
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import java.util
 
 /**
@@ -36,7 +36,7 @@ class MockKeyValueStore extends KeyValueStore[String, String] {
   }
 
   override def putAll(entries: java.util.List[Entry[String, String]]) {
-    for (entry <- entries) {
+    for (entry <- entries.asScala) {
       kvMap.put(entry.getKey, entry.getValue)
     }
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
index 1ce7d25..5d1b497 100644
--- a/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
+++ b/samza-test/src/main/scala/org/apache/samza/test/performance/TestKeyValuePerformance.scala
@@ -37,7 +37,7 @@ import org.apache.samza.task.TaskInstanceCollector
 import org.apache.samza.util.{CommandLine, Logging, Util}
 import org.apache.samza.{Partition, SamzaException}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.util.Random
 
 /**
@@ -258,7 +258,7 @@ class TestKeyValuePerformance extends Logging {
         store.flush()
 
         timer.reset().start()
-        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
         val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
 
         // Restore cache, in case it's enabled, to a state similar to the one above when the getAll test started
@@ -312,9 +312,9 @@ class TestKeyValuePerformance extends Logging {
         val shuffledKeys = Random.shuffle(keys).take(messagesCountPerBatch)
 
         // We want to measure ::getAll when called many times, so populate the cache because first call is a cache-miss
-        val totalSize = store.getAll(shuffledKeys).values.map(_.length).sum
+        val totalSize = store.getAll(shuffledKeys.asJava).values.asScala.map(_.length).sum
         timer.reset().start()
-        assert(store.getAll(shuffledKeys).size == shuffledKeys.size)
+        assert(store.getAll(shuffledKeys.asJava).size == shuffledKeys.size)
         val getAllTime = timer.stop().elapsed(TimeUnit.MILLISECONDS)
 
        // We want to measure ::get when called many times, so populate the cache because first call is a cache-miss

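The getAll changes above show both directions of conversion in one expression: a Scala List enters the Java-typed API via .asJava, and the returned java.util.Map's values come back through .asScala. A toy stand-in for the store (an assumption mirroring its shape, not the real implementation):

    import scala.collection.JavaConverters._

    // Toy getAll: Java List in, Java Map out, like KeyValueStore#getAll.
    def getAll(keys: java.util.List[String]): java.util.Map[String, Array[Byte]] = {
      val result = new java.util.HashMap[String, Array[Byte]]()
      keys.asScala.foreach(k => result.put(k, k.getBytes("UTF-8")))
      result
    }

    val keys = List("k0", "k1")
    // .asJava crosses into the Java API; .asScala brings the values back.
    val totalSize = getAll(keys.asJava).values.asScala.map(_.length).sum
    assert(totalSize == 4)
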
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
index d7d23ec..babd15c 100644
--- a/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
+++ b/samza-test/src/test/scala/org/apache/samza/storage/kv/TestKeyValueStores.scala
@@ -36,7 +36,7 @@ import org.junit.Before
 import org.junit.Test
 import org.scalatest.Assertions.intercept
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.ArrayBuffer
 
 /**
@@ -114,7 +114,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testGetAllWhenZeroMatch() {
     store.put(b("hello"), b("world"))
     val keys = List(b("foo"), b("bar"))
-    val actual = store.getAll(keys)
+    val actual = store.getAll(keys.asJava)
     keys.foreach(k => assertNull("Key: " + k, actual.get(k)))
   }
 
@@ -122,18 +122,18 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testGetAllWhenFullMatch() {
     val expected = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
     expected.foreach(e => store.put(e._1, e._2))
-    val actual = store.getAll(expected.keys.toList)
+    val actual = store.getAll(expected.keys.toList.asJava)
     assertEquals("Size", expected.size, actual.size)
    expected.foreach(e => assertArrayEquals("Value at: " + s(e._1), e._2, actual.get(e._1)))
   }
 
   @Test
   def testGetAllWhenPartialMatch() {
-    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2"))
-    val found = all.entrySet.head
-    val notFound = all.entrySet.last
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"), b("k2") -> b("v2")).asJava
+    val found = all.entrySet.asScala.head
+    val notFound = all.entrySet.asScala.last
     store.put(found.getKey, found.getValue)
-    val actual = store.getAll(List(notFound.getKey, found.getKey))
+    val actual = store.getAll(List(notFound.getKey, found.getKey).asJava)
     assertNull(actual.get(notFound.getKey))
     assertArrayEquals(found.getValue, actual.get(found.getKey))
   }
@@ -160,14 +160,14 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
 
       intercept[NullPointerException] { store.get(null) }
       intercept[NullPointerException] { store.getAll(null) }
-      intercept[NullPointerException] { store.getAll(List(a, null)) }
+      intercept[NullPointerException] { store.getAll(List(a, null).asJava) }
       intercept[NullPointerException] { store.delete(null) }
       intercept[NullPointerException] { store.deleteAll(null) }
-      intercept[NullPointerException] { store.deleteAll(List(a, null)) }
+      intercept[NullPointerException] { store.deleteAll(List(a, null).asJava) }
       intercept[NullPointerException] { store.put(null, a) }
       intercept[NullPointerException] { store.put(a, null) }
-      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null))) }
-      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a))) }
+      intercept[NullPointerException] { store.putAll(List(new Entry(a, a), new Entry[Array[Byte], Array[Byte]](a, null)).asJava) }
+      intercept[NullPointerException] { store.putAll(List(new Entry[Array[Byte], Array[Byte]](null, a)).asJava) }
       intercept[NullPointerException] { store.range(a, null) }
       intercept[NullPointerException] { store.range(null, a) }
     }
@@ -182,7 +182,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
     // from the cache's underlying store (rocksdb), but that == would fail.
     val numEntries = CacheSize - 1
    val entries = (0 until numEntries).map(i => new Entry(b("k" + i), b("v" + i)))
-    store.putAll(entries)
+    store.putAll(entries.asJava)
     if (cache) {
       assertTrue("All values should be found and cached.", entries.forall(e => 
store.get(e.getKey) == e.getValue))
     } else {
@@ -225,7 +225,7 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testDeleteAllWhenZeroMatch() {
     val foo = b("foo")
     store.put(foo, foo)
-    store.deleteAll(List(b("bar")))
+    store.deleteAll(List(b("bar")).asJava)
     assertArrayEquals(foo, store.get(foo))
   }
 
@@ -233,23 +233,23 @@ class TestKeyValueStores(typeOfStore: String, storeConfig: String) {
   def testDeleteAllWhenFullMatch() {
     val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
     all.foreach(e => store.put(e._1, e._2))
-    assertEquals(all.size, store.getAll(all.keys.toList).size)
-    store.deleteAll(all.keys.toList)
+    assertEquals(all.size, store.getAll(all.keys.toList.asJava).size)
+    store.deleteAll(all.keys.toList.asJava)
     all.keys.foreach(key => assertNull("Value at: " + s(key), store.get(key)))
   }
 
   @Test
   def testDeleteAllWhenPartialMatch() {
-    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1"))
-    val found = all.entrySet.head
-    val leftAlone = all.entrySet.last
-    all.foreach(e => store.put(e._1, e._2))
+    val all = Map(b("k0") -> b("v0"), b("k1") -> b("v1")).asJava
+    val found = all.entrySet.asScala.head
+    val leftAlone = all.entrySet.asScala.last
+    all.asScala.foreach(e => store.put(e._1, e._2))
     assertArrayEquals(found.getValue, store.get(found.getKey))
-    store.deleteAll(List(b("not found"), found.getKey))
+    store.deleteAll(List(b("not found"), found.getKey).asJava)
     store.flush()
     val allIterator = store.all
     try {
-      assertEquals(1, allIterator.size)
+      assertEquals(1, allIterator.asScala.size)
       assertArrayEquals(leftAlone.getValue, store.get(leftAlone.getKey))
     } finally {
       allIterator.close()

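One property that makes all the .asJava calls added to these tests cheap: the converters return constant-time wrappers, not copies. A small demonstration of the view semantics:

    import scala.collection.JavaConverters._
    import scala.collection.mutable.ArrayBuffer

    val buffer = ArrayBuffer("k0", "k1")
    val javaView = buffer.asJava // O(1) wrapper around the same buffer

    buffer += "k2"
    // The wrapper sees the mutation because no copy was taken.
    assert(javaView.size == 3)
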
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
index b803dfe..7a107f6 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/StreamTaskTestUtil.scala
@@ -45,7 +45,7 @@ import org.apache.samza.task._
 import org.apache.samza.util.{ClientUtilTopicMetadataStore, KafkaUtil, TopicMetadataStore}
 import org.junit.Assert._
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, Buffer, HashMap, SynchronizedMap}
 
 /*
@@ -70,8 +70,8 @@ object StreamTaskTestUtil {
   def zkConnect: String = s"127.0.0.1:$zkPort"
 
   var producer: Producer[Array[Byte], Array[Byte]] = null
-  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123"))
-  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345"))
+  val cp1 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "123").asJava)
+  val cp2 = new Checkpoint(Map(new SystemStreamPartition("kafka", "topic", new Partition(0)) -> "12345").asJava)
 
   var metadataStore: TopicMetadataStore = null
 
@@ -203,7 +203,7 @@ class StreamTaskTestUtil {
    */
   def startJob = {
     // Start task.
-    val job = new JobRunner(new MapConfig(jobConfig)).run()
+    val job = new JobRunner(new MapConfig(jobConfig.asJava)).run()
    assertEquals(ApplicationStatus.Running, job.waitForStatus(ApplicationStatus.Running, 60000))
     TestTask.awaitTaskRegistered
     val tasks = TestTask.tasks
@@ -246,7 +246,7 @@ class StreamTaskTestUtil {
 
     val consumerConfig = new ConsumerConfig(props)
     val consumerConnector = Consumer.create(consumerConfig)
-    var stream = consumerConnector.createMessageStreams(Map(topic -> 1)).get(topic).get.get(0).iterator
+    val stream = consumerConnector.createMessageStreams(Map(topic -> 1))(topic).head.iterator
     var message: MessageAndMetadata[Array[Byte], Array[Byte]] = null
     var messages = ArrayBuffer[String]()
 

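One change in this file goes beyond mechanical conversion: createMessageStreams returns a Scala Map, so the Java-style .get(topic).get.get(0) chain (which only typechecked through the old implicit wrappers) becomes the idiomatic (topic) lookup plus .head, and the never-reassigned var becomes a val. A toy illustration with a plain Map in place of the Kafka consumer API:

    val streamsByTopic = Map("topic" -> List("stream-0"))

    // Option-based access, close to what the old code spelled out:
    val javaStyle = streamsByTopic.get("topic").get.head
    // Idiomatic: apply() throws on a missing key, head takes the first element.
    val scalaStyle = streamsByTopic("topic").head
    assert(javaStyle == scalaStyle)
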
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
index 06a107b..c10e7fb 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestShutdownStatefulTask.scala
@@ -26,7 +26,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object TestShutdownStatefulTask {
   val STORE_NAME = "loggedstore"
@@ -119,7 +119,7 @@ class ShutdownStateStoreTask extends TestTask {
       .getStore(TestShutdownStatefulTask.STORE_NAME)
       .asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
-    iter.foreach( p => restored += (p.getKey -> p.getValue))
+    iter.asScala.foreach( p => restored += (p.getKey -> p.getValue))
     System.err.println("ShutdownStateStoreTask.init(): %s" format restored)
     iter.close
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
----------------------------------------------------------------------
diff --git a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
index 2240903..e5b6756 100644
--- a/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
+++ b/samza-test/src/test/scala/org/apache/samza/test/integration/TestStatefulTask.scala
@@ -27,7 +27,7 @@ import org.apache.samza.task.{MessageCollector, TaskContext, TaskCoordinator}
 import org.junit.Assert._
 import org.junit.{AfterClass, BeforeClass, Test}
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 object TestStatefulTask {
     val STORE_NAME = "mystore"
@@ -171,6 +171,7 @@ class StateStoreTestTask extends TestTask {
    store = context.getStore(TestStatefulTask.STORE_NAME).asInstanceOf[KeyValueStore[String, String]]
     val iter = store.all
     restored ++= iter
+      .asScala
       .map(_.getValue)
       .toSet
     System.err.println("StateStoreTestTask.init(): %s" format restored)

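The restored ++= iter.asScala pattern above adapts the store's Java iterator. Worth remembering: the wrapper shares the underlying single-pass cursor, so the data must be materialized (here via .toSet) before the source iterator is closed or reused. A self-contained sketch:

    import scala.collection.JavaConverters._

    val javaIter: java.util.Iterator[String] =
      java.util.Arrays.asList("a", "b", "a").iterator

    // .asScala adapts the same cursor; .toSet drains and materializes it.
    val restored = javaIter.asScala.map(_.toUpperCase).toSet
    assert(restored == Set("A", "B"))
    assert(!javaIter.hasNext) // the original iterator is now exhausted
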
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
index e5aafbb..c7b1b6d 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/ClientHelper.scala
@@ -24,7 +24,7 @@ import org.apache.samza.config.{Config, JobConfig, YarnConfig}
 import org.apache.samza.coordinator.stream.CoordinatorStreamWriter
 import org.apache.samza.coordinator.stream.messages.SetConfig
 
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.Map
 import scala.collection.mutable.HashMap
 import org.apache.hadoop.conf.Configuration
@@ -160,7 +160,7 @@ class ClientHelper(conf: Configuration) extends Logging {
     resource.setVirtualCores(cpu)
     info("set cpu core request to %s for %s" format (cpu, appId.get))
     appCtx.setResource(resource)
-    containerCtx.setCommands(cmds.toList)
+    containerCtx.setCommands(cmds.asJava)
     info("set command to %s for %s" format (cmds, appId.get))
 
     appCtx.setApplicationId(appId.get)
@@ -173,7 +173,7 @@ class ClientHelper(conf: Configuration) extends Logging {
     // include the resources from the universal resource configurations
     try {
      val resourceMapper = new LocalizerResourceMapper(new LocalizerResourceConfig(config), new YarnConfiguration(conf))
-      localResources ++= resourceMapper.getResourceMap
+      localResources ++= resourceMapper.getResourceMap.asScala
     } catch {
       case e: LocalizerResourceException => {
        throw new SamzaException("Exception during resource mapping from config. ", e)
@@ -202,12 +202,12 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     // prepare all local resources for localizer
     info("localResources is: %s" format localResources)
-    containerCtx.setLocalResources(localResources)
+    containerCtx.setLocalResources(localResources.asJava)
     info("set local resources on application master for %s" format appId.get)
 
     env match {
       case Some(env) => {
-        containerCtx.setEnvironment(env)
+        containerCtx.setEnvironment(env.asJava)
         info("set environment variables to %s for %s" format (env, appId.get))
       }
       case None =>
@@ -232,8 +232,8 @@ class ClientHelper(conf: Configuration) extends Logging {
   def getApplicationMaster(appId: ApplicationId): Option[ApplicationReport] = {
     yarnClient
       .getApplications
-      .filter(appRep => appId.equals(appRep.getApplicationId()))
-      .headOption
+      .asScala
+      .find(appRep => appId.equals(appRep.getApplicationId))
   }
 
  def getApplicationMasters(status: Option[ApplicationStatus]): List[ApplicationReport] = {
@@ -241,9 +241,10 @@ class ClientHelper(conf: Configuration) extends Logging {
 
     status match {
       case Some(status) => getAppsRsp
+        .asScala
        .filter(appRep => status.equals(convertState(appRep.getYarnApplicationState, appRep.getFinalApplicationStatus).get))
         .toList
-      case None => getAppsRsp.toList
+      case None => getAppsRsp.asScala.toList
     }
   }
 

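Beyond the conversions, getApplicationMaster above picks up a small idiom: .find(p) replaces .filter(p).headOption. The results are equivalent on these collections, but find short-circuits at the first match instead of building an intermediate list. A minimal comparison (AppReport is a stand-in for YARN's ApplicationReport):

    case class AppReport(id: Int)
    val apps = List(AppReport(1), AppReport(2), AppReport(3))

    // filter builds the whole filtered list before taking its head...
    val viaFilter = apps.filter(_.id == 2).headOption
    // ...find stops at the first hit.
    val viaFind = apps.find(_.id == 2)
    assert(viaFilter == viaFind)
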
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
index f057594..2d8a3f1 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/job/yarn/YarnJobFactory.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.samza.config.Config
 import org.apache.samza.util.hadoop.HttpFileSystem
 import org.apache.samza.util.Logging
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 
 class YarnJobFactory extends StreamJobFactory with Logging {
   def getJob(config: Config) = {
@@ -42,7 +42,7 @@ class YarnJobFactory extends StreamJobFactory with Logging {
 
     // Use the Samza job config "fs.<scheme>.impl" to override YarnConfiguration
     val fsImplConfig = new FileSystemImplConfig(config)
-    fsImplConfig.getSchemes.foreach(
+    fsImplConfig.getSchemes.asScala.foreach(
      (scheme : String) => hConfig.set(fsImplConfig.getFsImplKey(scheme), fsImplConfig.getFsImplClassName(scheme))
     )
 

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
index cdd389c..122a1df 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterRestServlet.scala
@@ -25,12 +25,12 @@ import scalate.ScalateSupport
 import org.apache.samza.config.Config
 import org.apache.samza.job.yarn.{YarnAppState, ClientHelper}
 import org.apache.samza.metrics._
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import java.util.HashMap
 import org.apache.samza.serializers.model.SamzaObjectMapper
 
-class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterRestServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState, registry: ReadableMetricsRegistry) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
   val client = new ClientHelper(yarnConfig)
   val jsonMapper = SamzaObjectMapper.getObjectMapper
@@ -43,10 +43,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
     val metricMap = new HashMap[String, java.util.Map[String, Object]]
 
     // build metric map
-    registry.getGroups.foreach(group => {
+    registry.getGroups.asScala.foreach(group => {
       val groupMap = new HashMap[String, Object]
 
-      registry.getGroup(group).foreach {
+      registry.getGroup(group).asScala.foreach {
         case (name, metric) =>
           metric.visit(new MetricsVisitor() {
             def counter(counter: Counter) =
@@ -79,7 +79,7 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
   get("/am") {
     val containers = new HashMap[String, HashMap[String, Object]]
 
-    state.runningYarnContainers.foreach {
+    state.runningYarnContainers.asScala.foreach {
       case (containerId, container) =>
         val yarnContainerId = container.id.toString
         val containerMap = new HashMap[String, Object]
@@ -98,10 +98,10 @@ class ApplicationMasterRestServlet(config: Config, samzaAppState: SamzaApplicati
       "containers" -> containers,
       "host" -> "%s:%s".format(state.nodeHost, state.rpcUrl.getPort))
 
-    jsonMapper.writeValueAsString(new HashMap[String, Object](status))
+    jsonMapper.writeValueAsString(new HashMap[String, Object](status.asJava))
   }
 
   get("/config") {
-    jsonMapper.writeValueAsString(new HashMap[String, Object](config.sanitize.toMap))
+    jsonMapper.writeValueAsString(new HashMap[String, Object](samzaConfig.sanitize))
   }
 }

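The config constructor parameter is renamed to samzaConfig here (and again in the web servlet below). A plausible reading, though the commit does not say so, is that `config` collides with a member the Scalatra servlet base classes already define, so the rename keeps both reachable. A stripped-down sketch of that hazard with a hypothetical framework trait:

    // Hypothetical framework trait that already exposes `config`.
    trait FrameworkSupport { def config: String = "framework-level config" }

    // A distinct parameter name avoids shadowing the inherited member.
    class RestServlet(samzaConfig: String) extends FrameworkSupport {
      def describe: String = s"own: $samzaConfig, inherited: $config"
    }

    // new RestServlet("job config").describe
    //   == "own: job config, inherited: framework-level config"
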
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
index a32cd65..d787f9e 100644
--- a/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
+++ b/samza-yarn/src/main/scala/org/apache/samza/webapp/ApplicationMasterWebServlet.scala
@@ -24,12 +24,12 @@ import org.scalatra._
 import scalate.ScalateSupport
 import org.apache.samza.job.yarn.YarnAppState
 import org.apache.samza.config.Config
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import scala.collection.immutable.TreeMap
 import org.apache.hadoop.yarn.conf.YarnConfiguration
 import org.apache.hadoop.yarn.webapp.util.WebAppUtils
 
-class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport {
+class ApplicationMasterWebServlet(samzaConfig: Config, samzaAppState: SamzaApplicationState, state: YarnAppState) extends ScalatraServlet with ScalateSupport {
   val yarnConfig = new YarnConfiguration
 
   before() {
@@ -38,7 +38,7 @@ class ApplicationMasterWebServlet(config: Config, samzaAppState: SamzaApplicatio
 
   get("/") {
     layoutTemplate("/WEB-INF/views/index.scaml",
-      "config" -> TreeMap(config.sanitize.toMap.toArray: _*),
+      "config" -> TreeMap(samzaConfig.sanitize.asScala.toMap.toArray: _*),
       "state" -> state,
       "samzaAppState" -> samzaAppState,
       "rmHttpAddress" -> WebAppUtils.getRMWebAppURLWithScheme(yarnConfig))

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
index d3d34f2..c320a97 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/MockSystemAdmin.scala
@@ -21,8 +21,8 @@ package org.apache.samza.job.yarn
 
 import org.apache.samza.Partition
 import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
-import scala.collection.JavaConversions._
+import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin}
+import scala.collection.JavaConverters._
 
 /**
  * A mock implementation class that returns metadata for each stream that contains numTasks partitions in it.
@@ -30,12 +30,12 @@ import scala.collection.JavaConversions._
 class MockSystemAdmin(numTasks: Int) extends SystemAdmin {
  def getOffsetsAfter(offsets: java.util.Map[SystemStreamPartition, String]) = null
   def getSystemStreamMetadata(streamNames: java.util.Set[String]) = {
-    streamNames.map(streamName => {
-      var partitionMetadata = (0 until numTasks).map(partitionId => {
+    streamNames.asScala.map(streamName => {
+      val partitionMetadata = (0 until numTasks).map(partitionId => {
        new Partition(partitionId) -> new SystemStreamPartitionMetadata(null, null, null)
       }).toMap
-      streamName -> new SystemStreamMetadata(streamName, partitionMetadata)
-    }).toMap[String, SystemStreamMetadata]
+      streamName -> new SystemStreamMetadata(streamName, partitionMetadata.asJava)
+    }).toMap.asJava
   }
 
  override def createChangelogStream(topicName: String, numOfChangeLogPartitions: Int) {

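The mock's return value shows that the converters are shallow: .asJava wraps only the outer Map, so the per-stream partition map needs its own .asJava where it is built. A self-contained sketch of the nesting, with simplified value types in place of the Samza metadata classes:

    import scala.collection.JavaConverters._

    val streamNames = Set("stream-a", "stream-b")

    // The outer .asJava does not convert the values, so each inner Map
    // is converted at the point where it is constructed.
    val metadata: java.util.Map[String, java.util.Map[Int, String]] =
      streamNames.map { name =>
        val partitions = (0 until 2).map(p => p -> s"$name-$p").toMap
        name -> partitions.asJava
      }.toMap.asJava

    assert(metadata.get("stream-a").get(0) == "stream-a-0")
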
http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
index 5c15385..ad8337b 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestClientHelper.scala
@@ -24,11 +24,11 @@ import org.apache.hadoop.fs.{FileStatus, Path, FileSystem}
 import org.apache.hadoop.yarn.api.records.ApplicationId
 import org.apache.hadoop.yarn.client.api.YarnClient
 import org.apache.samza.SamzaException
-import org.apache.samza.config.{MapConfig, JobConfig, Config, YarnConfig}
+import org.apache.samza.config.{MapConfig, JobConfig, YarnConfig}
 import org.mockito.Mockito._
 import org.mockito.Matchers.any
 import org.scalatest.FunSuite
-import org.scalatest.mock.MockitoSugar
+import org.scalatest.mockito.MockitoSugar
 
 
 class TestClientHelper extends FunSuite {

http://git-wip-us.apache.org/repos/asf/samza/blob/9db47b86/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
----------------------------------------------------------------------
diff --git a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
index 1dd0c18..65c03d1 100644
--- a/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
+++ b/samza-yarn/src/test/scala/org/apache/samza/job/yarn/TestSamzaYarnAppMasterService.scala
@@ -23,17 +23,12 @@ import java.io.BufferedReader
 import java.net.URL
 import java.io.InputStreamReader
 import org.apache.hadoop.yarn.util.ConverterUtils
-import org.apache.samza.Partition
 import org.apache.samza.clustermanager.SamzaApplicationState
 import org.apache.samza.config.MapConfig
-import org.apache.samza.metrics.MetricsRegistry
-import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata
-import org.apache.samza.system.{SystemStreamMetadata, SystemStreamPartition, SystemAdmin, SystemFactory}
 import org.junit.Assert._
 import org.junit.Test
-import scala.collection.JavaConversions._
+import scala.collection.JavaConverters._
 import org.apache.samza.config.Config
-import org.apache.samza.config.JobConfig.Config2Job
 import org.apache.samza.container.TaskName
 import org.apache.samza.coordinator.JobModelManager
 import org.apache.samza.coordinator.stream.MockCoordinatorStreamSystemFactory
@@ -113,7 +108,7 @@ class TestSamzaYarnAppMasterService {
     "yarn.container.retry.count" -> "1",
     "yarn.container.retry.window.ms" -> "1999999999",
     "job.coordinator.system" -> "coordinator",
-    "systems.coordinator.samza.factory" -> 
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName))
+    "systems.coordinator.samza.factory" -> 
classOf[MockCoordinatorStreamSystemFactory].getCanonicalName).asJava)
 }
 
 
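The .asJava added to this MapConfig call is the same pattern at the configuration layer: the constructor takes a java.util.Map, so a Scala Map literal now needs an explicit conversion at the call site. A toy version (makeConfig is a hypothetical helper standing in for the MapConfig constructor):

    import scala.collection.JavaConverters._
    import java.util

    // Hypothetical helper mirroring a java.util.Map-taking constructor.
    def makeConfig(m: util.Map[String, String]): util.Map[String, String] =
      new util.HashMap[String, String](m)

    val cfg = makeConfig(Map(
      "job.coordinator.system" -> "coordinator",
      "yarn.container.retry.count" -> "1").asJava)

    assert(cfg.get("yarn.container.retry.count") == "1")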
