lianetm commented on code in PR #15585:
URL: https://github.com/apache/kafka/pull/15585#discussion_r1549895965


##########
core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala:
##########
@@ -159,6 +160,224 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     consumeAndVerifyRecords(consumer = consumer, numRecords = 1, 
startingOffset = 0, startingTimestamp = startingTimestamp)
   }
 
+  /**
+   * Verifies that pattern subscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac', but not 'tblablak' 
or 'tblab1'.
+   * It is expected that the consumer is subscribed to all partitions of 
'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When a new topic 'tsomec' is added afterwards, it is expected that upon 
the next
+   * metadata refresh the consumer becomes subscribed to this new topic and 
all partitions
+   * of that topic are assigned to it.
+   */
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPatternSubscription(quorum: String, groupProtocol: String): Unit = {
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    val topic1 = "tblablac" // matches subscribed pattern
+    createTopic(topic1, 2, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+    val topic2 = "tblablak" // does not match subscribed pattern
+    createTopic(topic2, 2, brokerCount)
+    sendRecords(producer,numRecords = 1000, new TopicPartition(topic2, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic2, 1))
+
+    val topic3 = "tblab1" // does not match subscribed pattern
+    createTopic(topic3, 2, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic3, 1))
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern = Pattern.compile("t.*c")
+    consumer.subscribe(pattern, new TestConsumerReassignmentListener)
+
+    var assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+
+    val topic4 = "tsomec" // matches subscribed pattern
+    createTopic(topic4, 2, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic4, 1))
+
+    assignment ++= Set(
+      new TopicPartition(topic4, 0),
+      new TopicPartition(topic4, 1))
+    awaitAssignment(consumer, assignment)
+
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+  }
+
+  /**
+   * Verifies that a second call to pattern subscription succeeds and performs 
as expected.
+   * The initial subscription is to a pattern that matches two topics 'topic' 
and 'foo'.
+   * The second subscription is to a pattern that matches 'foo' and a new 
topic 'bar'.
+   * It is expected that the consumer is subscribed to all partitions of 
'topic' and 'foo' after
+   * the first subscription, and to all partitions of 'foo' and 'bar' after 
the second.
+   * The metadata refresh interval is intentionally increased to a large 
enough value to guarantee
+   * that it is the subscription call that triggers a metadata refresh, and 
not the timeout.
+   */
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testSubsequentPatternSubscription(quorum: String, groupProtocol: 
String): Unit = {
+    this.consumerConfig.setProperty(ConsumerConfig.METADATA_MAX_AGE_CONFIG, 
"30000")
+    val consumer = createConsumer()
+
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords = numRecords, tp)
+
+    // the first topic ('topic')  matches first subscription pattern only
+
+    val fooTopic = "foo" // matches both subscription patterns
+    createTopic(fooTopic, 1, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(fooTopic, 0))
+
+    assertEquals(0, consumer.assignment().size)
+
+    val pattern1 = Pattern.compile(".*o.*") // only 'topic' and 'foo' match 
this
+    consumer.subscribe(pattern1, new TestConsumerReassignmentListener)
+
+    var assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(fooTopic, 0))
+    awaitAssignment(consumer, assignment)
+
+    val barTopic = "bar" // matches the next subscription pattern
+    createTopic(barTopic, 1, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(barTopic, 0))
+
+    val pattern2 = Pattern.compile("...") // only 'foo' and 'bar' match this
+    consumer.subscribe(pattern2, new TestConsumerReassignmentListener)
+    assignment --= Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1))
+    assignment ++= Set(
+      new TopicPartition(barTopic, 0))
+    awaitAssignment(consumer, assignment)
+
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+  }
+
+  /**
+   * Verifies that pattern unsubscription performs as expected.
+   * The pattern matches the topics 'topic' and 'tblablac'.
+   * It is expected that the consumer is subscribed to all partitions of 
'topic' and
+   * 'tblablac' after the subscription when metadata is refreshed.
+   * When consumer unsubscribes from all its subscriptions, it is expected 
that its
+   * assignments are cleared right away.
+   */
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testPatternUnsubscription(quorum: String, groupProtocol: String): Unit = 
{
+    val numRecords = 10000
+    val producer = createProducer()
+    sendRecords(producer, numRecords, tp)
+
+    val topic1 = "tblablac" // matches the subscription pattern
+    createTopic(topic1, 2, brokerCount)
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 0))
+    sendRecords(producer, numRecords = 1000, new TopicPartition(topic1, 1))
+
+    val consumer = createConsumer()
+    assertEquals(0, consumer.assignment().size)
+
+    consumer.subscribe(Pattern.compile("t.*c"), new 
TestConsumerReassignmentListener)
+    val assignment = Set(
+      new TopicPartition(topic, 0),
+      new TopicPartition(topic, 1),
+      new TopicPartition(topic1, 0),
+      new TopicPartition(topic1, 1))
+    awaitAssignment(consumer, assignment)
+
+    consumer.unsubscribe()
+    assertEquals(0, consumer.assignment().size)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testCommitMetadata(quorum: String, groupProtocol: String): Unit = {
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+
+    // sync commit
+    val syncMetadata = new OffsetAndMetadata(5, Optional.of(15), "foo")
+    consumer.commitSync(Map((tp, syncMetadata)).asJava)
+    assertEquals(syncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+    // async commit
+    val asyncMetadata = new OffsetAndMetadata(10, "bar")
+    sendAndAwaitAsyncCommit(consumer, Some(Map(tp -> asyncMetadata)))
+    assertEquals(asyncMetadata, consumer.committed(Set(tp).asJava).get(tp))
+
+    // handle null metadata
+    val nullMetadata = new OffsetAndMetadata(5, null)
+    consumer.commitSync(Map(tp -> nullMetadata).asJava)
+    assertEquals(nullMetadata, consumer.committed(Set(tp).asJava).get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testAsyncCommit(quorum: String, groupProtocol: String): Unit = {
+    val consumer = createConsumer()
+    consumer.assign(List(tp).asJava)
+
+    val callback = new CountConsumerCommitCallback
+    val count = 5
+
+    for (i <- 1 to count)
+      consumer.commitAsync(Map(tp -> new OffsetAndMetadata(i)).asJava, 
callback)
+
+    TestUtils.pollUntilTrue(consumer, () => callback.successCount >= count || 
callback.lastError.isDefined,
+      "Failed to observe commit callback before timeout", waitTimeMs = 10000)
+
+    assertEquals(None, callback.lastError)
+    assertEquals(count, callback.successCount)
+    assertEquals(new OffsetAndMetadata(count), 
consumer.committed(Set(tp).asJava).get(tp))
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testExpandingTopicSubscriptions(quorum: String, groupProtocol: String): 
Unit = {
+    val otherTopic = "other"
+    val initialAssignment = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, initialAssignment)
+
+    createTopic(otherTopic, 2, brokerCount)
+    val expandedAssignment = initialAssignment ++ Set(new 
TopicPartition(otherTopic, 0), new TopicPartition(otherTopic, 1))
+    consumer.subscribe(List(topic, otherTopic).asJava)
+    awaitAssignment(consumer, expandedAssignment)
+  }
+
+  @ParameterizedTest(name = 
TestInfoUtils.TestWithParameterizedQuorumAndGroupProtocolNames)
+  @MethodSource(Array("getTestQuorumAndGroupProtocolParametersAll"))
+  def testShrinkingTopicSubscriptions(quorum: String, groupProtocol: String): 
Unit = {
+    val otherTopic = "other"
+    createTopic(otherTopic, 2, brokerCount)
+    val initialAssignment = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1), new TopicPartition(otherTopic, 0), new 
TopicPartition(otherTopic, 1))
+    val consumer = createConsumer()
+    consumer.subscribe(List(topic, otherTopic).asJava)
+    awaitAssignment(consumer, initialAssignment)
+
+    val shrunkenAssignment = Set(new TopicPartition(topic, 0), new 
TopicPartition(topic, 1))
+    consumer.subscribe(List(topic).asJava)
+    awaitAssignment(consumer, shrunkenAssignment)
+  }
+

Review Comment:
   We shouldn't re-add all these subscription tests here. They were moved to a 
new file, `PlaintextConsumerSubscriptionTest`. I expect this PR should now have 
no changes in `PlaintextConsumerTest`, and should only remove the TODOs for 
these tests in `PlaintextConsumerSubscriptionTest`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to