frankvicky commented on code in PR #19400:
URL: https://github.com/apache/kafka/pull/19400#discussion_r2036675899


##########
streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java:
##########
@@ -1205,45 +1200,9 @@ private Optional<String> removeStreamThread(final long 
timeoutMs) throws Timeout
                         final long cacheSizePerThread = 
cacheSizePerThread(numLiveStreamThreads());
                         log.info("Resizing thread cache due to thread removal, 
new cache size per thread is {}", cacheSizePerThread);
                         resizeThreadCache(cacheSizePerThread);
-                        if (groupInstanceID.isPresent() && 
callingThreadIsNotCurrentStreamThread) {
-                            final MemberToRemove memberToRemove = new 
MemberToRemove(groupInstanceID.get());
-                            final Collection<MemberToRemove> membersToRemove = 
Collections.singletonList(memberToRemove);
-                            final RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroupResult = 
-                                adminClient.removeMembersFromConsumerGroup(
-                                    
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG),
-                                    new 
RemoveMembersFromConsumerGroupOptions(membersToRemove)
-                                );
-                            try {
-                                final long remainingTimeMs = timeoutMs - 
(time.milliseconds() - startMs);
-                                
removeMembersFromConsumerGroupResult.memberResult(memberToRemove).get(remainingTimeMs,
 TimeUnit.MILLISECONDS);
-                            } catch (final 
java.util.concurrent.TimeoutException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to a timeout:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new 
TimeoutException(exception.getMessage(), exception);
-                            } catch (final InterruptedException e) {
-                                Thread.currentThread().interrupt();
-                            } catch (final ExecutionException exception) {
-                                log.error(
-                                    String.format(
-                                        "Could not remove static member %s 
from consumer group %s due to:",
-                                        groupInstanceID.get(),
-                                        
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                    ),
-                                        exception
-                                );
-                                throw new StreamsException(
-                                        "Could not remove static member " + 
groupInstanceID.get()
-                                            + " from consumer group " + 
applicationConfigs.getString(StreamsConfig.APPLICATION_ID_CONFIG)
-                                            + " for the following reason: ",
-                                        exception.getCause()
-                                );
-                            }

Review Comment:
   IMHO, this chunk of code could be replaced with the new close api.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java:
##########
@@ -875,7 +878,7 @@ public void run() {
             streamsUncaughtExceptionHandler.accept(e, false);
             // Note: the above call currently rethrows the exception, so 
nothing below this line will be executed
         } finally {
-            completeShutdown(cleanRun);
+            completeShutdown(cleanRun, true);

Review Comment:
   If we close the `StreamThread`,  we should ensure consumers leave their 
group.
   Does it make sense?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to