AndrewJSchofield commented on code in PR #16903:
URL: https://github.com/apache/kafka/pull/16903#discussion_r1721354931
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -1080,4 +1084,23 @@ public V getSyncRequest() {
Pair<AcknowledgeRequestState> requestStates(int nodeId) {
return acknowledgeRequestStates.get(nodeId);
}
+
+ @InterfaceStability.Evolving
Review Comment:
You don't need to use `InterfaceStability` on an internal enum like this.
That annotation is for public interface to Kafka to warn users that their code
might have to change if the interface is still unstable (such as
KafkaShareConsumer).
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -401,7 +404,6 @@ public void commitAsync(final Map<TopicIdPartition,
Acknowledgements> acknowledg
metricsManager.recordAcknowledgementSent(acknowledgements.size());
log.debug("Added async acknowledge request for
partition {} to node {}", tip.topicPartition(), node.id());
- resultCount.incrementAndGet();
Review Comment:
This is a bit weird, making and then not using the AtomicInteger. I expect
with a small code change, you could safely not use the result count for async
commits.
##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ShareAcknowledgementCommitCallbackEvent.java:
##########
@@ -27,7 +28,7 @@ public class ShareAcknowledgementCommitCallbackEvent extends
BackgroundEvent {
public ShareAcknowledgementCommitCallbackEvent(Map<TopicIdPartition,
Acknowledgements> acknowledgementsMap) {
super(Type.SHARE_ACKNOWLEDGEMENT_COMMIT_CALLBACK);
- this.acknowledgementsMap = acknowledgementsMap;
+ this.acknowledgementsMap = new HashMap<>(acknowledgementsMap);
Review Comment:
:)
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]