ableegoldman commented on a change in pull request #11868: URL: https://github.com/apache/kafka/pull/11868#discussion_r823239035
##########
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##########
@@ -266,46 +266,57 @@ private boolean maybeCompleteFutureIfStillInCREATED(final KafkaFutureImpl<Void>
     private RemoveNamedTopologyResult resetOffsets(final KafkaFutureImpl<Void> removeTopologyFuture, final Set<TopicPartition> partitionsToReset) {
-        if (!partitionsToReset.isEmpty()) {
-            removeTopologyFuture.whenComplete((v, throwable) -> {
-                if (throwable != null) {
-                    removeTopologyFuture.completeExceptionally(throwable);
-                }
-                DeleteConsumerGroupOffsetsResult deleteOffsetsResult = null;
-                while (deleteOffsetsResult == null) {
-                    try {
-                        deleteOffsetsResult = adminClient.deleteConsumerGroupOffsets(
-                            applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG), partitionsToReset);
-                        deleteOffsetsResult.all().get();
-                    } catch (final InterruptedException ex) {
-                        ex.printStackTrace();
+        final KafkaFutureImpl<Void> resetOffsetsFuture = new KafkaFutureImpl<>();
+        try {
+            removeTopologyFuture.get();

Review comment:
Yeah, that is the main fix. However, I realized that we are currently in this awkward state of pseudo-async-ness, and I think we might ultimately want to scratch this whole `RemoveNamedTopologyResult` and just make it fully blocking. I didn't want to go ahead and change the method signatures just yet, though, so I just have it block on the named topology future and then perform the offset reset.

The actual advantage here is that, before this change, we were making the StreamThread that completed the future perform the offset reset, which of course means it gets stuck for a bit and can't continue processing until basically the whole group has dropped this named topology. Better to have the caller thread do the offset reset and let the StreamThreads keep processing the other topologies.
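To illustrate the threading point, here is a minimal sketch. It uses `java.util.concurrent.CompletableFuture` as a stand-in for `KafkaFutureImpl`, and the class `ResetOffsetsSketch`, the helper `startStreamThread`, the thread name `stream-thread-1`, and the stubbed `resetOffsets()` are all hypothetical names for illustration, not code from this PR. It only shows which thread ends up doing the reset in each shape.

```java
import java.util.concurrent.CompletableFuture;

public class ResetOffsetsSketch {

    // Hypothetical stand-in for the offset reset: reports which thread ran it.
    static String resetOffsets() {
        return Thread.currentThread().getName();
    }

    // Hypothetical stand-in for a StreamThread that completes the future
    // once it has dropped the named topology.
    static Thread startStreamThread(final CompletableFuture<Void> future) {
        final Thread thread = new Thread(() -> future.complete(null), "stream-thread-1");
        thread.start();
        return thread;
    }

    // Old shape: the reset is registered as a callback before the future completes,
    // so it runs on the thread that completes the future.
    static String resetInCallback() {
        final CompletableFuture<Void> removeTopologyFuture = new CompletableFuture<>();
        final CompletableFuture<String> ranOn = new CompletableFuture<>();
        removeTopologyFuture.whenComplete((v, throwable) -> ranOn.complete(resetOffsets()));
        startStreamThread(removeTopologyFuture);
        return ranOn.join();
    }

    // New shape: the caller blocks on the future, then performs the reset itself.
    static String resetOnCaller() {
        final CompletableFuture<Void> removeTopologyFuture = new CompletableFuture<>();
        startStreamThread(removeTopologyFuture);
        removeTopologyFuture.join(); // caller waits; the completing thread is free to move on
        return resetOffsets();
    }

    public static void main(final String[] args) {
        System.out.println("callback reset ran on: " + resetInCallback());
        System.out.println("blocking reset ran on: " + resetOnCaller());
    }
}
```

In the first shape the reset is reported on `stream-thread-1`, the thread that completed the future; in the second it is reported on whichever thread called `resetOnCaller()`. The real method also has to handle errors and retries, which this sketch leaves out.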
(When we get to finally doing a KIP maybe we can discuss having a blocking and non-blocking option for these, but my feeling is let's not complicate things unnecessarily and it may be that we only really need a blocking version)