ableegoldman commented on a change in pull request #11868:
URL: https://github.com/apache/kafka/pull/11868#discussion_r823239035



##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
 
     private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture,
                                                    final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+        final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+        try {
+            removeTopologyFuture.get();

Review comment:
       Yeah, that is the main fix. However, I realized that we are currently in 
this awkward state of pseudo-async-ness, and I think we might ultimately want to 
scratch this whole `RemoveNamedTopologyResult` and just make the call fully 
blocking. I didn't want to go ahead and change the method signatures just yet, 
though, so for now I just have it block on the named topology future and then 
perform the offset reset.
   
   The real advantage here is that, before this change, we were making 
whichever StreamThread completed the future perform the offset reset, which of 
course means it gets stuck for a bit and can't continue processing until 
basically the whole group has dropped this named topology. Better to have the 
caller thread do the offset reset so the StreamThreads can keep processing the 
other topologies.
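
To illustrate the threading difference, here is a minimal sketch, not the actual change in this PR. It assumes `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFutureImpl`, and the class name, method names, and the "stream-thread" thread are all hypothetical. Each method returns the name of the thread on which the offset reset would have run.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch: CompletableFuture stands in for KafkaFutureImpl.
public class CallerThreadResetSketch {

    // Old shape: the reset is chained onto the future, so it runs on the
    // thread that completes the future (the stand-in "stream-thread").
    public static String resetInCallback() {
        final CompletableFuture<Void> removeTopologyFuture = new CompletableFuture<>();
        final AtomicReference<String> resetThread = new AtomicReference<>();
        // Registered before completion, so the completing thread runs it.
        removeTopologyFuture.whenComplete((v, throwable) ->
            resetThread.set(Thread.currentThread().getName()));
        final Thread streamThread =
            new Thread(() -> removeTopologyFuture.complete(null), "stream-thread");
        streamThread.start();
        try {
            streamThread.join();
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return resetThread.get();
    }

    // New shape: the caller blocks on the future and then does the reset
    // itself, so the completing thread is free as soon as complete() returns.
    public static String resetInCaller() {
        final CompletableFuture<Void> removeTopologyFuture = new CompletableFuture<>();
        final Thread streamThread =
            new Thread(() -> removeTopologyFuture.complete(null), "stream-thread");
        streamThread.start();
        try {
            removeTopologyFuture.get();
            // The offset reset would happen here, on the caller's thread.
            final String resetThread = Thread.currentThread().getName();
            streamThread.join();
            return resetThread;
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (final ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(final String[] args) {
        System.out.println("callback reset ran on: " + resetInCallback());
        System.out.println("blocking reset ran on: " + resetInCaller());
    }
}
```

In the callback variant, the slow work lands on whichever thread calls `complete`; in the blocking variant, only the caller waits.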
   
   (When we finally get to doing a KIP, maybe we can discuss having both a 
blocking and a non-blocking option for these, but my feeling is that we 
shouldn't complicate things unnecessarily, and it may be that we only really 
need a blocking version.)



