AndrewJSchofield commented on code in PR #20246:
URL: https://github.com/apache/kafka/pull/20246#discussion_r2448635132


##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandlerTest.java:
##########
@@ -54,6 +55,7 @@
 @Timeout(120)
 public class ShareSessionHandlerTest {
     private static final LogContext LOG_CONTEXT = new LogContext("[ShareSessionHandler]=");
+    private final ShareAcquireMode shareAcquireMode = ShareAcquireMode.of("batch_optimized");

Review Comment:
   I would expect at least one test using `ShareAcquireMode.RECORD_LIMIT` to 
make sure it gets set correctly. You've just tested the default so far.



##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManagerTest.java:
##########
@@ -2639,6 +2640,7 @@ private <K, V> void buildRequestManager(MetricConfig metricConfig,
         int maxBytes = Integer.MAX_VALUE;
         int fetchSize = 1000;
         int minBytes = 1;
+        ShareAcquireMode shareAcquireMode = ShareAcquireMode.of("batch_optimized");

Review Comment:
   Can't you just use `ShareAcquireMode.BATCH_OPTIMIZED`?



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java:
##########
@@ -78,6 +79,11 @@ public class ShareSessionHandler {
      */
     private LinkedHashMap<TopicIdPartition, Acknowledgements> nextAcknowledgements;
 
+    /**
+     * The share acquire mode for this session.
+     */
+    private String shareAcquireMode;

Review Comment:
   Let's use the enum here instead of String.



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareConsumeRequestManager.java:
##########
@@ -106,6 +108,7 @@ public class ShareConsumeRequestManager implements RequestManager, MemberStateLi
                                final ShareConsumerMetadata metadata,
                                final SubscriptionState subscriptions,
                                final FetchConfig fetchConfig,

Review Comment:
   I think it's time to make a `ShareFetchConfig` which copies all of the 
`FetchConfig` but also adds in `share.acquire.mode`. We are bound to add more 
configs over time and I don't like adding new parameters every time. A quick 
search of the code suggests that `ShareFetchConfig` would fit in cleanly 
(unlike `ShareSubscriptionState` which I'd love to create, but 
`SubscriptionState` is pervasive).
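
   To illustrate the suggestion, here is a minimal sketch of what such a config
   holder could look like. Everything in it is an assumption for illustration:
   the nested `ShareAcquireMode` is a stand-in for the enum in this PR (the
   real one may differ), the field list is only a subset of what `FetchConfig`
   carries, and `describeRequest` is a hypothetical caller, not an existing
   method.

```java
import java.util.Locale;
import java.util.Objects;

public class ShareFetchConfigSketch {

    /** Stand-in for the PR's ShareAcquireMode enum; names and values are assumed. */
    enum ShareAcquireMode {
        BATCH_OPTIMIZED("batch_optimized"),
        RECORD_LIMIT("record_limit");

        final String configName;

        ShareAcquireMode(String configName) {
            this.configName = configName;
        }

        /** Parses a config string, ignoring case; rejects unknown values. */
        static ShareAcquireMode of(String name) {
            String normalized = name.toLowerCase(Locale.ROOT);
            for (ShareAcquireMode mode : values()) {
                if (mode.configName.equals(normalized)) {
                    return mode;
                }
            }
            throw new IllegalArgumentException("Unknown share acquire mode: " + name);
        }
    }

    /** Hypothetical ShareFetchConfig: fetch settings plus the share-specific mode. */
    static final class ShareFetchConfig {
        final int minBytes;
        final int maxBytes;
        final int maxWaitMs;
        final int fetchSize;
        final int maxPollRecords;
        final ShareAcquireMode shareAcquireMode;

        ShareFetchConfig(int minBytes, int maxBytes, int maxWaitMs, int fetchSize,
                         int maxPollRecords, ShareAcquireMode shareAcquireMode) {
            this.minBytes = minBytes;
            this.maxBytes = maxBytes;
            this.maxWaitMs = maxWaitMs;
            this.fetchSize = fetchSize;
            this.maxPollRecords = maxPollRecords;
            this.shareAcquireMode = Objects.requireNonNull(shareAcquireMode, "shareAcquireMode");
        }
    }

    /** Hypothetical caller: its signature stays the same when new configs are added. */
    static String describeRequest(String groupId, ShareFetchConfig config) {
        return groupId + ":" + config.shareAcquireMode.configName + ":" + config.fetchSize;
    }

    public static void main(String[] args) {
        ShareFetchConfig config = new ShareFetchConfig(
            1, Integer.MAX_VALUE, 500, 1000, 500, ShareAcquireMode.RECORD_LIMIT);
        System.out.println(describeRequest("my-group", config));
    }
}
```

   With a holder like this, a future share-specific config becomes one more
   field on the holder rather than one more parameter on every constructor and
   builder method that currently takes `FetchConfig`.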



##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/ShareSessionHandler.java:
##########
@@ -111,7 +117,8 @@ public boolean isNewSession() {
         return nextMetadata.isNewSession();
     }
 
-    public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, FetchConfig fetchConfig) {
+
+    public ShareFetchRequest.Builder newShareFetchBuilder(String groupId, ShareAcquireMode shareAcquireMode, FetchConfig fetchConfig) {

Review Comment:
   This is an example of why I'd like `ShareFetchConfig` instead.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.