lianetm commented on code in PR #18728:
URL: https://github.com/apache/kafka/pull/18728#discussion_r1932814308
##########
clients/src/test/java/org/apache/kafka/clients/consumer/KafkaShareConsumerTest.java:
##########
@@ -148,40 +150,51 @@ public void testVerifyFetchAndAcknowledgeSync() throws
InterruptedException {
// [A] A SHARE_FETCH in a new share session, fetching from topic
topicId1, with no acknowledgements included.
// The response includes 2 records which are acquired.
client.prepareResponseFrom(body -> {
- ShareFetchRequest request = (ShareFetchRequest) body;
- return request.data().groupId().equals(groupId) &&
- request.data().shareSessionEpoch() == 0 &&
- request.data().batchSize() == batchSize &&
- request.data().topics().get(0).topicId().equals(topicId1) &&
- request.data().topics().get(0).partitions().size() == 1 &&
-
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().isEmpty();
+ if (body instanceof ShareFetchRequest) {
+ ShareFetchRequest request = (ShareFetchRequest) body;
+ return request.data().groupId().equals(groupId) &&
+ request.data().shareSessionEpoch() == 0 &&
+ request.data().batchSize() == batchSize &&
+ request.data().topics().get(0).topicId().equals(topicId1)
&&
+ request.data().topics().get(0).partitions().size() == 1 &&
+
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().isEmpty();
+ } else {
+ return false;
+ }
}, shareFetchResponse(ti1p0, 2), node);
// [B] A SHARE_ACKNOWLEDGE for the two records acquired in [A].
client.prepareResponseFrom(body -> {
- ShareAcknowledgeRequest request = (ShareAcknowledgeRequest) body;
- return request.data().groupId().equals(groupId) &&
- request.data().shareSessionEpoch() == 1 &&
-
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset()
== 0 &&
-
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset()
== 1 &&
-
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().size()
== 1 &&
-
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().get(0)
== (byte) 1;
+ if (body instanceof ShareAcknowledgeRequest) {
+ ShareAcknowledgeRequest request = (ShareAcknowledgeRequest)
body;
+ return request.data().groupId().equals(groupId) &&
+ request.data().shareSessionEpoch() == 1 &&
+
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).firstOffset()
== 0 &&
+
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).lastOffset()
== 1 &&
+
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().size()
== 1 &&
+
request.data().topics().get(0).partitions().get(0).acknowledgementBatches().get(0).acknowledgeTypes().get(0)
== (byte) 1;
+ } else {
+ return false;
+ }
}, shareAcknowledgeResponse(ti1p0), node);
// [C] A SHARE_ACKNOWLEDGE which closes the share session.
client.prepareResponseFrom(body -> {
- ShareAcknowledgeRequest request = (ShareAcknowledgeRequest) body;
- return request.data().groupId().equals(groupId) &&
- request.data().shareSessionEpoch() == -1 &&
- request.data().topics().isEmpty();
+ if (body instanceof ShareAcknowledgeRequest) {
Review Comment:
should we include the same check in the `testVerifyHeartbeats`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]