GitHub user tdas commented on a diff in the pull request:

    https://github.com/apache/spark/pull/4537#discussion_r29533586
  
    --- Diff: 
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
 ---
    @@ -73,27 +74,37 @@ class KafkaRDDSuite extends FunSuite with 
BeforeAndAfterAll {
     
       test("iterator boundary conditions") {
         // the idea is to find e.g. off-by-one errors between what kafka has 
available and the rdd
    -    val topic = "topic1"
    +    val topic = s"topicboundary-${Random.nextInt}"
         val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
         kafkaTestUtils.createTopic(topic)
     
         val kafkaParams = Map("metadata.broker.list" -> 
kafkaTestUtils.brokerAddress,
    -      "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
    +      "group.id" -> s"test-consumer-${Random.nextInt}")
     
         val kc = new KafkaCluster(kafkaParams)
     
         // this is the "lots of messages" case
         kafkaTestUtils.sendMessages(topic, sent)
    +    val sentCount = sent.values.sum
    +    kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
    +
         // rdd defined from leaders after sending messages, should get the 
number sent
         val rdd = getRdd(kc, Set(topic))
     
         assert(rdd.isDefined)
    -    assert(rdd.get.count === sent.values.sum, "didn't get all sent 
messages")
     
    -    val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
    -      .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) -> 
o.untilOffset).toMap
    +    val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
    +    val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
     
    -    kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
    +    assert(rangeCount === sentCount, "offset range didn't include all sent 
messages")
    +    assert(rdd.get.count === sentCount, "didn't get all sent messages")
    +
    +    val rangesMap = ranges.map(o => TopicAndPartition(o.topic, 
o.partition) -> o.untilOffset).toMap
    +
    +    kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
    --- End diff --
    
    What is this for? It's not obvious; could you add a comment?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to