vamossagar12 commented on code in PR #13801: URL: https://github.com/apache/kafka/pull/13801#discussion_r1560724752
########## connect/runtime/src/main/java/org/apache/kafka/connect/storage/ConnectorOffsetBackingStore.java: ########## @@ -279,15 +300,77 @@ public Future<Void> set(Map<ByteBuffer, ByteBuffer> values, Callback<Void> callb throw new IllegalStateException("At least one non-null offset store must be provided"); } - return primaryStore.set(values, (primaryWriteError, ignored) -> { + Map<ByteBuffer, ByteBuffer> regularOffsets = new HashMap<>(); + Map<ByteBuffer, ByteBuffer> tombstoneOffsets = new HashMap<>(); + values.forEach((partition, offset) -> { + if (offset == null) { + tombstoneOffsets.put(partition, null); + } else { + regularOffsets.put(partition, offset); + } + }); + + if (secondaryStore != null && !tombstoneOffsets.isEmpty()) { + AtomicReference<Throwable> primaryWriteError = new AtomicReference<>(); + FutureCallback<Void> secondaryWriteCallback = new FutureCallback<Void>() { + @Override + public void onCompletion(Throwable tombstoneWriteError, Void ignored) { + super.onCompletion(tombstoneWriteError, ignored); + if (tombstoneWriteError != null) { + log.trace("Skipping offsets write to primary store because secondary tombstone write has failed", tombstoneWriteError); + try (LoggingContext context = loggingContext()) { + callback.onCompletion(tombstoneWriteError, ignored); + } + return; + } + setPrimaryThenSecondary(primaryStore, secondaryStore, values, regularOffsets, callback, primaryWriteError); + } + + @Override + public Void get() throws InterruptedException, ExecutionException { + super.get(); + if (primaryWriteError.get() != null) { + throw new ExecutionException(primaryWriteError.get()); + } + return null; + } + + @Override + public Void get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + super.get(timeout, unit); + if (primaryWriteError.get() != null) { + if (primaryWriteError.get() instanceof TimeoutException) { + throw (TimeoutException) primaryWriteError.get(); Review Comment: I am slightly on the fence if we need to handle this case or not, because in [ConvertingFutureCallback#result](https://github.com/apache/kafka/blob/f895ab5145077c5efa10a4a898628d901b01e2c2/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConvertingFutureCallback.java#L135) I see that any exception other than `CancellationException` is wrapped in `ExecutionException`. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org