Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
cadonna merged PR #15669: URL: https://github.com/apache/kafka/pull/15669 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2045510642 Thanks @cadonna! fixed it in the entire file. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
cadonna commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2045435265 @lianetm I found the same typo throughout the test class. Do you want to fix it in the entire file? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
cadonna commented on code in PR #15669: URL: https://github.com/apache/kafka/pull/15669#discussion_r1557825632 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -498,6 +538,40 @@ public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() { verify(commitRequestManger, times(2)).resetAutoCommitTimer(); } +@Test +public void testAutoCommitOnIntervalSkippedIfPreviousOneInFlight() { +TopicPartition t1p = new TopicPartition("topic1", 0); +subscriptionState.assignFromUser(singleton(t1p)); +subscriptionState.seek(t1p, 100); + +CommitRequestManager commitRequestManager = create(true, 100); + +// Send auto-commit request that will remain in-flight without a response +time.sleep(100); +commitRequestManger.updateAutoCommitTimer(time.milliseconds()); +commitRequestManger.maybeAutoCommitAsync(); +List futures = assertPoll(1, commitRequestManger); +assertEquals(1, futures.size()); +NetworkClientDelegate.FutureCompletionHandler inflightCommitResult = futures.get(0); +verify(commitRequestManger, times(1)).resetAutoCommitTimer(); +clearInvocations(commitRequestManger); + +// After next interval expires, no new auto-commit request should be sent. The interval +// should not be reset either, to ensure that the next auto-commit is sent out as soon as +// the inflight receives a response. +time.sleep(100); +commitRequestManger.updateAutoCommitTimer(time.milliseconds()); +commitRequestManger.maybeAutoCommitAsync(); +assertPoll(0, commitRequestManger); +verify(commitRequestManger, never()).resetAutoCommitTimer(); + +// When a response for the inflight is received, a next auto-commit should be sent when +// polling the manager. +inflightCommitResult.onComplete( +mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short) 1, Errors.NONE)); +assertPoll(1, commitRequestManger); +} Review Comment: ```suggestion // Send auto-commit request that will remain in-flight without a response time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); commitRequestManager.maybeAutoCommitAsync(); List futures = assertPoll(1, commitRequestManager); assertEquals(1, futures.size()); NetworkClientDelegate.FutureCompletionHandler inflightCommitResult = futures.get(0); verify(commitRequestManager, times(1)).resetAutoCommitTimer(); clearInvocations(commitRequestManager); // After next interval expires, no new auto-commit request should be sent. The interval // should not be reset either, to ensure that the next auto-commit is sent out as soon as // the inflight receives a response. time.sleep(100); commitRequestManager.updateAutoCommitTimer(time.milliseconds()); commitRequestManager.maybeAutoCommitAsync(); assertPoll(0, commitRequestManager); verify(commitRequestManager, never()).resetAutoCommitTimer(); // When a response for the inflight is received, a next auto-commit should be sent when // polling the manager. inflightCommitResult.onComplete( mockOffsetCommitResponse(t1p.topic(), t1p.partition(), (short) 1, Errors.NONE)); assertPoll(1, commitRequestManager); } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
cadonna commented on code in PR #15669: URL: https://github.com/apache/kafka/pull/15669#discussion_r1557815764 ## clients/src/test/java/org/apache/kafka/clients/consumer/internals/CommitRequestManagerTest.java: ## @@ -498,6 +538,40 @@ public void testAutoCommitEmptyDoesNotLeaveInflightRequestFlagOn() { verify(commitRequestManger, times(2)).resetAutoCommitTimer(); } +@Test +public void testAutoCommitOnIntervalSkippedIfPreviousOneInFlight() { +TopicPartition t1p = new TopicPartition("topic1", 0); +subscriptionState.assignFromUser(singleton(t1p)); +subscriptionState.seek(t1p, 100); + +CommitRequestManager commitRequestManger = create(true, 100); Review Comment: nit: ```suggestion CommitRequestManager commitRequestManager = create(true, 100); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2043121631 Hey @cadonna, could you take a look if you have some time too? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm commented on PR #15669: URL: https://github.com/apache/kafka/pull/15669#issuecomment-2039970916 Hey @lucasbru, could you take a look at this when you have a chance? Thanks! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[PR] KAFKA-16004: auto-commit inflight improved logs, docs and tests [kafka]
lianetm opened a new pull request, #15669: URL: https://github.com/apache/kafka/pull/15669 Minor changes for improving the logging and docs related to the auto-commit inflight logic, also adding tests to ensure the expected behaviour: - auto-commit on the interval does not send a request if another one inflight, and it sends the next as soon as a response is received (without waiting for the full interval again) - auto-commit before revocation always send a request (even if another one from auto-commit on interval is in-flight), to ensure the latest is committed before revoking partitions. No changes in logic, just adding tests, docs and minor refactoring. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org