lucasbru commented on code in PR #20592:
URL: https://github.com/apache/kafka/pull/20592#discussion_r2382259714
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -711,9 +726,8 @@ public void
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
expectedCommittedRecordsAfterRebalance,
"The all committed records after rebalance do not match what
expected");
- LOG.info("Releasing Stall");
Review Comment:
Why have you removed this log statement?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -711,9 +726,8 @@ public void
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
expectedCommittedRecordsAfterRebalance,
"The all committed records after rebalance do not match what
expected");
- LOG.info("Releasing Stall");
doStall = false;
- // Once the stalling host rejoins the group, we expect both
instances to see both instances.
+ // After removing the stalling instance, we expect both remaining
instances to see both instances.
Review Comment:
Why are you changing this comment?
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -181,16 +190,19 @@ public void createTopics() throws Exception {
CLUSTER.createTopic(MULTI_PARTITION_INPUT_TOPIC, NUM_TOPIC_PARTITIONS,
1);
CLUSTER.createTopic(MULTI_PARTITION_THROUGH_TOPIC,
NUM_TOPIC_PARTITIONS, 1);
CLUSTER.createTopic(MULTI_PARTITION_OUTPUT_TOPIC,
NUM_TOPIC_PARTITIONS, 1);
+ CLUSTER.setGroupStandbyReplicas(applicationId, 1);
Review Comment:
Interesting that this works even if we use the old protocol.
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -667,24 +687,19 @@ public void
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
"Expected a host to start stalling"
);
final String observedStallingHost = stallingHost.get();
- final KafkaStreams stallingInstance;
final KafkaStreams remainingInstance;
if ("streams1".equals(observedStallingHost)) {
- stallingInstance = streams1;
remainingInstance = streams2;
} else if ("streams2".equals(observedStallingHost)) {
- stallingInstance = streams2;
remainingInstance = streams1;
} else {
throw new IllegalArgumentException("unexpected host name: " +
observedStallingHost);
}
- // the stalling instance won't have an updated view, and it
doesn't matter what it thinks
- // the assignment is. We only really care that the remaining
instance only sees one host
- // that owns both partitions.
+ // After removing the stalling instance, we only care that the
remaining instance
+ // sees one host that owns both partitions.
waitForCondition(
- () -> stallingInstance.metadataForAllStreamsClients().size()
== 2
- && remainingInstance.metadataForAllStreamsClients().size()
== 1
+ () -> remainingInstance.metadataForAllStreamsClients().size()
== 1
Review Comment:
cc @bbejeck
In this test, IQv2 behaves differently under the old protocol and the new
protocol.
We drop a "stalling instance" from the group. Then we check that the
remaining instance correctly reflects the change (all partitions are
served by the remaining instance).
What the stalling instance has in its IQv2 metadata doesn't really matter,
since it has dropped from the group. There is also a comment here:
```
// the stalling instance won't have an updated view, and it
doesn't matter what it thinks
// the assignment is.
```
However, we still seem to check its view:
```
stallingInstance.metadataForAllStreamsClients().size() == 2
```
There is a difference in timing here: I think the stalling instance never
sees the second instance (possibly due to the heartbeat interval).
So my thinking is that we can remove this check:
```
stallingInstance.metadataForAllStreamsClients().size() == 2
```
Just asking to validate my thinking.
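For illustration only, here is a minimal standalone sketch of the reduced
condition, where only the remaining instance's view is polled. The
`waitForCondition` helper below is a hypothetical stand-in for the test
utility, and the `IntSupplier` stands in for
`remainingInstance.metadataForAllStreamsClients().size()`; neither is the
actual test code.

```java
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

class RemainingInstanceCheck {

    // Hypothetical stand-in for the test utility: polls the condition until
    // it holds or the timeout elapses. Returns whether the condition was met.
    static boolean waitForCondition(final BooleanSupplier condition,
                                    final long timeoutMs,
                                    final long pollMs) {
        final long deadline = System.currentTimeMillis() + timeoutMs;
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            if (System.currentTimeMillis() >= deadline) {
                return false;
            }
            try {
                Thread.sleep(pollMs);
            } catch (final InterruptedException e) {
                // Restore the interrupt flag and give up waiting.
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    // Only the remaining instance is checked: it should eventually see
    // exactly one client. The stalling instance's view is ignored.
    static boolean remainingInstanceSeesSingleHost(final IntSupplier remainingClientCount,
                                                   final long timeoutMs) {
        return waitForCondition(() -> remainingClientCount.getAsInt() == 1, timeoutMs, 10L);
    }
}
```

With this shape, whatever the stalling instance reports has no influence on
whether the wait succeeds.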
##########
streams/integration-tests/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java:
##########
@@ -667,24 +687,19 @@ public void
shouldNotViolateEosIfOneTaskGetsFencedUsingIsolatedAppInstances(fina
"Expected a host to start stalling"
);
final String observedStallingHost = stallingHost.get();
- final KafkaStreams stallingInstance;
final KafkaStreams remainingInstance;
if ("streams1".equals(observedStallingHost)) {
- stallingInstance = streams1;
remainingInstance = streams2;
} else if ("streams2".equals(observedStallingHost)) {
- stallingInstance = streams2;
remainingInstance = streams1;
} else {
throw new IllegalArgumentException("unexpected host name: " +
observedStallingHost);
}
- // the stalling instance won't have an updated view, and it
doesn't matter what it thinks
- // the assignment is. We only really care that the remaining
instance only sees one host
- // that owns both partitions.
+ // After removing the stalling instance, we only care that the
remaining instance
Review Comment:
@RaidenE1 Why have you removed the comment about the stalling instance?