jeffkbkim commented on code in PR #18499: URL: https://github.com/apache/kafka/pull/18499#discussion_r1913692572
########## coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java: ########## @@ -3080,6 +3137,89 @@ public void testHighWatermarkUpdate() { assertTrue(write2.isDone()); } + @Test + public void testHighWatermarkUpdateWithException() throws ExecutionException, InterruptedException, TimeoutException { + MockTimer timer = new MockTimer(); + MockPartitionWriter writer = new MockPartitionWriter(); + CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class); + + // All operations will throw an exception when completed. + doThrow(new KafkaException("error")).when(metrics).recordEventPurgatoryTime(anyLong()); + + CoordinatorRuntime<MockCoordinatorShard, String> runtime = + new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(Duration.ofMillis(20)) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(writer) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(metrics) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withAppendLingerMs(10) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + // Load the coordinator. + runtime.scheduleLoadOperation(TP, 10); + CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); + + // Get the max batch size. + int maxBatchSize = writer.config(TP).maxMessageSize(); + + // Create records with three quarters of the max batch size each, so that it is not + // possible to have more than one record in a single batch. + List<String> records = Stream.of('1', '2', '3').map(c -> { + char[] payload = new char[maxBatchSize * 3 / 4]; + Arrays.fill(payload, c); + return new String(payload); + }).collect(Collectors.toList()); + + // Write #1. 
+ CompletableFuture<String> write1 = runtime.scheduleWriteOperation("write#1", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of(records.get(0)), "response1") + ); + + // Write #2. + CompletableFuture<String> write2 = runtime.scheduleWriteOperation("write#2", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of(records.get(1)), "response2") + ); + + // Write #3. + CompletableFuture<String> write3 = runtime.scheduleWriteOperation("write#3", TP, Duration.ofMillis(20), + state -> new CoordinatorResult<>(List.of(records.get(2)), "response3") + ); + + // Records have been written to the log. + assertEquals(List.of( + records(timer.time().milliseconds(), records.get(0)), + records(timer.time().milliseconds(), records.get(1)) + ), writer.entries(TP)); + + // Verify that no writes are committed yet. + assertFalse(write1.isDone()); + assertFalse(write2.isDone()); + assertFalse(write3.isDone()); + + // Write #3 is waiting in the current batch. + assertNotNull(ctx.currentBatch); + + // Commit the first and second record. + writer.commit(TP, 2); + + // Write #1's completion throws an exception. The coordinator should fail write #2 and the + // current batch. + assertTrue(write1.isDone()); + assertTrue(write2.isDone()); + assertTrue(write3.isDone()); + assertEquals("response1", write1.get(5, TimeUnit.SECONDS)); Review Comment: the response is completed here because recordEventPurgatoryTime is called after completing the event, correct? ########## coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java: ########## @@ -662,6 +662,7 @@ private void transitionTo( .build(), tp ); + runtimeMetrics.recordPartitionStateChange(oldState, state); Review Comment: thanks for the catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org. For queries about this service, please contact Infrastructure at: us...@infra.apache.org