Github user huaxingao commented on a diff in the pull request:

    https://github.com/apache/spark/pull/11104#discussion_r52250939

    --- Diff: external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaStreamSuite.scala ---
    @@ -65,12 +67,14 @@ class KafkaStreamSuite extends SparkFunSuite with Eventually with BeforeAndAfter
         val stream = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](
           ssc, kafkaParams, Map(topic -> 1), StorageLevel.MEMORY_ONLY)
    -    val result = new mutable.HashMap[String, Long]() with mutable.SynchronizedMap[String, Long]
    +    val result = new ConcurrentHashMap[String, Long].asScala
         stream.map(_._2).countByValue().foreachRDD { r =>
           val ret = r.collect()
           ret.toMap.foreach { kv =>
    -        val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    -        result.put(kv._1, count)
    +        result.synchronized {
    +          val count = result.getOrElseUpdate(kv._1, 0) + kv._2
    --- End diff --

@holdenk @zsxwing I tried `val count = result.putIfAbsent(kv._1, 0) + kv._2`, but the test failed for me. So I will change to a `mutable.HashMap` with the update in a synchronized block. Is it OK to use `mutable.HashMap` and a synchronized block in this file only, but `java.util.concurrent.ConcurrentHashMap` in the other files (StreamingListenerSuite, KinesisStreamTests, and FileInputDStream)? Or is it better to use `mutable.HashMap` and a synchronized block for all the files that have `SynchronizedMap`?
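As a side note on why `putIfAbsent` fails here: it only inserts when the key is absent, so on later batches the previous count is never added atomically with the new one; the read-modify-write must be a single critical section. A minimal standalone sketch of the `mutable.HashMap` + synchronized approach the comment proposes (the batch data and thread setup are invented for illustration, not from the Spark test):

```scala
import scala.collection.mutable

object SyncCountSketch {
  def main(args: Array[String]): Unit = {
    val result = new mutable.HashMap[String, Long]()
    // Hypothetical per-batch counts, standing in for the countByValue() RDD output.
    val batches = Seq(Map("a" -> 2L, "b" -> 1L), Map("a" -> 3L))
    val threads = batches.map { batch =>
      new Thread(() => {
        batch.foreach { case (k, v) =>
          // The whole read-modify-write is one critical section; a lone
          // getOrElseUpdate (or putIfAbsent) does not make the addition atomic.
          result.synchronized {
            val count = result.getOrElse(k, 0L) + v
            result.put(k, count)
          }
        }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    assert(result("a") == 5L)
    assert(result("b") == 1L)
    println(result.toSeq.sortBy(_._1))
  }
}
```

If sticking with `ConcurrentHashMap`, the underlying Java map's `merge(key, value, remappingFunction)` would also accumulate counts atomically without an explicit lock.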