dajac commented on code in PR #14981:
URL: https://github.com/apache/kafka/pull/14981#discussion_r1424473429


##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -135,6 +138,11 @@ public Builder<S, U> withTimer(Timer timer) {
             return this;
         }
 
+        public Builder<S, U> withDefaultWriteTimeOut(Duration timeout) {

Review Comment:
   nit: `timeout` -> `defaultWriteTimeout`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -595,13 +609,15 @@ class CoordinatorWriteEvent<T> implements 
CoordinatorEvent, DeferredEvent {
         /**
          * Constructor.
          *
-         * @param name  The operation name.
-         * @param tp    The topic partition that the operation is applied to.
-         * @param op    The write operation.
+         * @param name                  The operation name.
+         * @param tp                    The topic partition that the operation 
is applied to.
+         * @param defaultWriteTimeout   The default write operation timeout
+         * @param op                    The write operation.
          */
         CoordinatorWriteEvent(
             String name,
             TopicPartition tp,
+            Duration defaultWriteTimeout,

Review Comment:
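   nit: ditto — as with the `CoordinatorWriteEvent` field, this parameter should be named `writeTimeout` rather than `defaultWriteTimeout`.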



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent<T> implements 
CoordinatorEvent, DeferredEvent {
          */
         final CompletableFuture<T> future;
 
+        /**
+         * Timeout value for the write operation
+         */
+        final Duration defaultWriteTimeout;

Review Comment:
   nit: This one should actually be `writeTimeout`.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -581,6 +590,11 @@ class CoordinatorWriteEvent<T> implements 
CoordinatorEvent, DeferredEvent {
          */
         final CompletableFuture<T> future;
 
+        /**
+         * Timeout value for the write operation

Review Comment:
   nit: Add a trailing `.` at the end of this Javadoc sentence.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
     }
 
+    @Test
+    public void testScheduleWriteOpWhenWriteTimesOut() throws 
InterruptedException {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accept on write.
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Write #1. We should get a TimeoutException because the HWM will not 
advance.
+        CompletableFuture<String> timedOutWrite = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,
+            state -> new CoordinatorResult<>(Arrays.asList("record1", 
"record2"), "response1"));
+
+        timer.advanceClock(DEFAULT_WRITE_TIMEOUT.toMillis() * 2);

Review Comment:
   nit: I would just do `timeout + 1` if possible.



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -610,26 +626,29 @@ class CoordinatorWriteEvent<T> implements 
CoordinatorEvent, DeferredEvent {
                 null,
                 RecordBatch.NO_PRODUCER_ID,
                 RecordBatch.NO_PRODUCER_EPOCH,
+                defaultWriteTimeout,
                 op
             );
         }
 
         /**
          * Constructor.
          *
-         * @param name              The operation name.
-         * @param tp                The topic partition that the operation is 
applied to.
-         * @param transactionalId   The transactional id.
-         * @param producerId        The producer id.
-         * @param producerEpoch     The producer epoch.
-         * @param op                The write operation.
+         * @param name                      The operation name.
+         * @param tp                        The topic partition that the 
operation is applied to.
+         * @param transactionalId           The transactional id.
+         * @param producerId                The producer id.
+         * @param producerEpoch             The producer epoch.
+         * @param defaultWriteTimeout       The write operation timeout
+         * @param op                        The write operation.
          */
         CoordinatorWriteEvent(
             String name,
             TopicPartition tp,
             String transactionalId,
             long producerId,
             short producerEpoch,
+            Duration defaultWriteTimeout,

Review Comment:
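   nit: ditto — this constructor parameter should also be named `writeTimeout` rather than `defaultWriteTimeout`.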



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -703,6 +723,18 @@ public void run() {
                             // Add the response to the deferred queue.
                             if (!future.isDone()) {
                                 context.deferredEventQueue.add(offset, this);
+                                timer.add(new 
TimerTask(defaultWriteTimeout.toMillis()) {
+                                    @Override
+                                    public void run() {
+                                        if (!future.isDone()) {
+                                            scheduleInternalOperation(
+                                                "LogAppendEvent(name=" + name 
+ ", tp=" + tp + ")",
+                                                tp,
+                                                () -> complete(new 
TimeoutException("Log append event " + name + "timed out for TopicPartition " + 
tp))

Review Comment:
   Another thing, we should use 
`org.apache.kafka.common.errors.TimeoutException`. Could we also ensure this in 
the test `testScheduleWriteOpWhenWriteTimesOut`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -703,6 +723,18 @@ public void run() {
                             // Add the response to the deferred queue.
                             if (!future.isDone()) {
                                 context.deferredEventQueue.add(offset, this);
+                                timer.add(new 
TimerTask(defaultWriteTimeout.toMillis()) {
+                                    @Override
+                                    public void run() {
+                                        if (!future.isDone()) {
+                                            scheduleInternalOperation(
+                                                "LogAppendEvent(name=" + name 
+ ", tp=" + tp + ")",

Review Comment:
   nit: Should the operation name be `WriteTimeout` instead of `LogAppendEvent`?



##########
group-coordinator/src/main/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntime.java:
##########
@@ -703,6 +723,18 @@ public void run() {
                             // Add the response to the deferred queue.
                             if (!future.isDone()) {
                                 context.deferredEventQueue.add(offset, this);
+                                timer.add(new 
TimerTask(defaultWriteTimeout.toMillis()) {
+                                    @Override
+                                    public void run() {
+                                        if (!future.isDone()) {
+                                            scheduleInternalOperation(
+                                                "LogAppendEvent(name=" + name 
+ ", tp=" + tp + ")",
+                                                tp,
+                                                () -> complete(new 
TimeoutException("Log append event " + name + "timed out for TopicPartition " + 
tp))

Review Comment:
   nit: `"CoordinatorWriteEvent " + name + " timed out after " + timeoutMs + 
"ms"`.



##########
group-coordinator/src/test/java/org/apache/kafka/coordinator/group/runtime/CoordinatorRuntimeTest.java:
##########
@@ -950,6 +965,43 @@ public void testScheduleWriteOpWhenWriteFails() {
         assertEquals(mkSet("record1", "record2"), 
ctx.coordinator.coordinator().records());
     }
 
+    @Test
+    public void testScheduleWriteOpWhenWriteTimesOut() throws 
InterruptedException {
+        MockTimer timer = new MockTimer();
+        // The partition writer only accept on write.
+        MockPartitionWriter writer = new MockPartitionWriter();
+
+        CoordinatorRuntime<MockCoordinatorShard, String> runtime =
+            new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
+                .withTime(timer.time())
+                .withTimer(timer)
+                .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
+                .withLoader(new MockCoordinatorLoader())
+                .withEventProcessor(new DirectEventProcessor())
+                .withPartitionWriter(writer)
+                .withCoordinatorShardBuilderSupplier(new 
MockCoordinatorShardBuilderSupplier())
+                
.withCoordinatorRuntimeMetrics(mock(GroupCoordinatorRuntimeMetrics.class))
+                .withCoordinatorMetrics(mock(GroupCoordinatorMetrics.class))
+                .build();
+
+        // Loads the coordinator.
+        runtime.scheduleLoadOperation(TP, 10);
+
+        // Verify the initial state.
+        CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext 
ctx = runtime.contextOrThrow(TP);
+        assertEquals(0, ctx.coordinator.lastWrittenOffset());
+        assertEquals(0, ctx.coordinator.lastCommittedOffset());
+        assertEquals(Collections.singletonList(0L), 
ctx.coordinator.snapshotRegistry().epochsList());
+
+        // Write #1. We should get a TimeoutException because the HWM will not 
advance.
+        CompletableFuture<String> timedOutWrite = 
runtime.scheduleWriteOperation("write#1", TP, DEFAULT_WRITE_TIMEOUT,

Review Comment:
   nit: Could we actually use a different timeout here to ensure that the 
default is not used?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to