mjsax commented on a change in pull request #8799:
URL: https://github.com/apache/kafka/pull/8799#discussion_r435597397
##########
File path:
streams/src/test/java/org/apache/kafka/streams/integration/RegexSourceIntegrationTest.java
##########
@@ -142,83 +142,89 @@ public void tearDown() throws IOException {
@Test
public void testRegexMatchesTopicsAWhenCreated() throws Exception {
+ try {
+ final Serde<String> stringSerde = Serdes.String();
- final Serde<String> stringSerde = Serdes.String();
-
- final List<String> expectedFirstAssignment =
Collections.singletonList("TEST-TOPIC-1");
- // we compare lists of subscribed topics and hence requiring the order
as well; this is guaranteed
- // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only add
TEST-TOPIC-2 so the list is always
- // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429
behavior ever changed it may become a flaky test
- final List<String> expectedSecondAssignment =
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
+ final List<String> expectedFirstAssignment =
Collections.singletonList("TEST-TOPIC-1");
+ // we compare lists of subscribed topics and hence requiring the
order as well; this is guaranteed
+ // with KIP-429 since we would NOT revoke TEST-TOPIC-1 but only
add TEST-TOPIC-2 so the list is always
+ // in the order of "TEST-TOPIC-1, TEST-TOPIC-2". Note if KIP-429
behavior ever changed it may become a flaky test
+ final List<String> expectedSecondAssignment =
Arrays.asList("TEST-TOPIC-1", "TEST-TOPIC-2");
- CLUSTER.createTopic("TEST-TOPIC-1");
+ CLUSTER.createTopic("TEST-TOPIC-1");
- final StreamsBuilder builder = new StreamsBuilder();
+ final StreamsBuilder builder = new StreamsBuilder();
- final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
+ final KStream<String, String> pattern1Stream =
builder.stream(Pattern.compile("TEST-TOPIC-\\d"));
- pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
- final List<String> assignedTopics = new CopyOnWriteArrayList<>();
- streams = new KafkaStreams(builder.build(), streamsConfiguration, new
DefaultKafkaClientSupplier() {
- @Override
- public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
- return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
- @Override
- public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
- super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
- }
- };
+ pattern1Stream.to(outputTopic, Produced.with(stringSerde,
stringSerde));
+ final List<String> assignedTopics = new CopyOnWriteArrayList<>();
+ streams = new KafkaStreams(builder.build(), streamsConfiguration,
new DefaultKafkaClientSupplier() {
+ @Override
+ public Consumer<byte[], byte[]> getConsumer(final Map<String,
Object> config) {
+ return new KafkaConsumer<byte[], byte[]>(config, new
ByteArrayDeserializer(), new ByteArrayDeserializer()) {
+ @Override
+ public void subscribe(final Pattern topics, final
ConsumerRebalanceListener listener) {
+ super.subscribe(topics, new
TheConsumerRebalanceListener(assignedTopics, listener));
+ }
+ };
- }
- });
+ }
+ });
- streams.start();
- TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
+ streams.start();
+ TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedFirstAssignment), STREAM_TASKS_NOT_UPDATED);
- CLUSTER.createTopic("TEST-TOPIC-2");
+ CLUSTER.createTopic("TEST-TOPIC-2");
- TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
+ TestUtils.waitForCondition(() ->
assignedTopics.equals(expectedSecondAssignment), STREAM_TASKS_NOT_UPDATED);
- streams.close();
- CLUSTER.deleteTopicsAndWait("TEST-TOPIC-1", "TEST-TOPIC-2");
+ streams.close();
Review comment:
Yes, that is my reasoning.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org