brenden20 commented on code in PR #16140:
URL: https://github.com/apache/kafka/pull/16140#discussion_r1631524427
##########
clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkThreadTest.java:
##########
@@ -211,178 +232,80 @@ public void testResetPositionsProcessFailureIsIgnored() {
}
@Test
- public void testValidatePositionsEventIsProcessed() {
- ValidatePositionsEvent e = new
ValidatePositionsEvent(calculateDeadlineMs(time, 100));
- applicationEventsQueue.add(e);
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(ValidatePositionsEvent.class));
- assertTrue(applicationEventsQueue.isEmpty());
- }
-
- @Test
- public void testAssignmentChangeEvent() {
- HashMap<TopicPartition, OffsetAndMetadata> offset =
mockTopicPartitionOffset();
-
- final long currentTimeMs = time.milliseconds();
- ApplicationEvent e = new AssignmentChangeEvent(offset, currentTimeMs);
- applicationEventsQueue.add(e);
-
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(AssignmentChangeEvent.class));
- verify(networkClient, times(1)).poll(anyLong(), anyLong());
- verify(commitRequestManager,
times(1)).updateAutoCommitTimer(currentTimeMs);
- // Assignment change should generate an async commit (not retried).
- verify(commitRequestManager, times(1)).maybeAutoCommitAsync();
- }
+ public void testPollResultTimer() {
+ NetworkClientDelegate networkClientDelegate = new
NetworkClientDelegate(
+ time,
+ config,
+ logContext,
+ client
+ );
- @Test
- void testFetchTopicMetadata() {
- applicationEventsQueue.add(new TopicMetadataEvent("topic",
Long.MAX_VALUE));
- consumerNetworkThread.runOnce();
-
verify(applicationEventProcessor).process(any(TopicMetadataEvent.class));
+ NetworkClientDelegate.UnsentRequest req = new
NetworkClientDelegate.UnsentRequest(
+ new FindCoordinatorRequest.Builder(
+ new FindCoordinatorRequestData()
+
.setKeyType(FindCoordinatorRequest.CoordinatorType.TRANSACTION.id())
+ .setKey("foobar")),
+ Optional.empty());
+ req.setTimer(time, DEFAULT_REQUEST_TIMEOUT_MS);
+
+ // purposely setting a non-MAX time to ensure it is returning
Long.MAX_VALUE upon success
+ NetworkClientDelegate.PollResult success = new
NetworkClientDelegate.PollResult(
+ 10,
+ Collections.singletonList(req));
+ assertEquals(10, networkClientDelegate.addAll(success));
+
+ NetworkClientDelegate.PollResult failure = new
NetworkClientDelegate.PollResult(
+ 10,
+ new ArrayList<>());
+ assertEquals(10, networkClientDelegate.addAll(failure));
Review Comment:
It seems some of my changes were overwritten when I merged; I am fixing that
up.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]