GitHub user tdas commented on a diff in the pull request:
https://github.com/apache/spark/pull/4537#discussion_r29533586
--- Diff:
external/kafka/src/test/scala/org/apache/spark/streaming/kafka/KafkaRDDSuite.scala
---
@@ -73,27 +74,37 @@ class KafkaRDDSuite extends FunSuite with
BeforeAndAfterAll {
test("iterator boundary conditions") {
// the idea is to find e.g. off-by-one errors between what kafka has
available and the rdd
- val topic = "topic1"
+ val topic = s"topicboundary-${Random.nextInt}"
val sent = Map("a" -> 5, "b" -> 3, "c" -> 10)
kafkaTestUtils.createTopic(topic)
val kafkaParams = Map("metadata.broker.list" ->
kafkaTestUtils.brokerAddress,
- "group.id" -> s"test-consumer-${Random.nextInt(10000)}")
+ "group.id" -> s"test-consumer-${Random.nextInt}")
val kc = new KafkaCluster(kafkaParams)
// this is the "lots of messages" case
kafkaTestUtils.sendMessages(topic, sent)
+ val sentCount = sent.values.sum
+ kafkaTestUtils.waitUntilLeaderOffset(topic, 0, sentCount)
+
// rdd defined from leaders after sending messages, should get the
number sent
val rdd = getRdd(kc, Set(topic))
assert(rdd.isDefined)
- assert(rdd.get.count === sent.values.sum, "didn't get all sent
messages")
- val ranges = rdd.get.asInstanceOf[HasOffsetRanges]
- .offsetRanges.map(o => TopicAndPartition(o.topic, o.partition) ->
o.untilOffset).toMap
+ val ranges = rdd.get.asInstanceOf[HasOffsetRanges].offsetRanges
+ val rangeCount = ranges.map(o => o.untilOffset - o.fromOffset).sum
- kc.setConsumerOffsets(kafkaParams("group.id"), ranges)
+ assert(rangeCount === sentCount, "offset range didn't include all sent
messages")
+ assert(rdd.get.count === sentCount, "didn't get all sent messages")
+
+ val rangesMap = ranges.map(o => TopicAndPartition(o.topic,
o.partition) -> o.untilOffset).toMap
+
+ kc.setConsumerOffsets(kafkaParams("group.id"), rangesMap).fold(
--- End diff --
What is this for? It's not obvious; could you add a comment?
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes to enable it, or if the feature is enabled but not working,
please contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]