lianetm commented on code in PR #17699:
URL: https://github.com/apache/kafka/pull/17699#discussion_r1869882439


##########
clients/src/main/java/org/apache/kafka/clients/consumer/internals/events/ApplicationEventProcessor.java:
##########
@@ -515,6 +531,89 @@ private void process(final 
ShareAcknowledgementCommitCallbackRegistrationEvent e
         
manager.setAcknowledgementCommitCallbackRegistered(event.isCallbackRegistered());
     }
 
+    private void process(final SeekUnvalidatedEvent event) {
+        try {
+            event.offsetEpoch().ifPresent(epoch -> 
metadata.updateLastSeenEpochIfNewer(event.partition(), epoch));
+            SubscriptionState.FetchPosition newPosition = new 
SubscriptionState.FetchPosition(
+                event.offset(),
+                event.offsetEpoch(),
+                metadata.currentLeader(event.partition())
+            );
+            subscriptions.seekUnvalidated(event.partition(), newPosition);
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final PausePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Pausing partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.pause(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final ResumePartitionsEvent event) {
+        try {
+            Collection<TopicPartition> partitions = event.partitions();
+            log.debug("Resuming partitions {}", partitions);
+
+            for (TopicPartition partition : partitions) {
+                subscriptions.resume(partition);
+            }
+
+            event.future().complete(null);
+        } catch (Exception e) {
+            event.future().completeExceptionally(e);
+        }
+    }
+
+    private void process(final CurrentLagEvent event) {

Review Comment:
   no need, you're right. My concern was that with the move to the background 
the tests for the consumer would be mocking this part but it's not the case, so 
the existing KafkaConsumerTests that cover lag will cover this. All good



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to