abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429333355



##########
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##########
@@ -379,6 +379,22 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
             MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
     }
 
+    private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,
+                                                                                List<TopicPartition> topicPartitions) {
+        final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
+        byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+        List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", "clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());

Review comment:
       nit: we could replace `"0"` with `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we don't intend to test the member id. Having all members share the same member.id is a bit weird.
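
       A rough sketch of the two options, in case it helps. The class name, the local `UNKNOWN_MEMBER_ID` constant (assumed to mirror `JoinGroupRequest.UNKNOWN_MEMBER_ID`, an empty string), and the `"member-" + i` naming are all hypothetical; the second option is not something proposed in this PR, just an alternative if distinct ids are ever wanted:

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class MemberIdSketch {
    // Assumption: mirrors JoinGroupRequest.UNKNOWN_MEMBER_ID, believed to be "".
    static final String UNKNOWN_MEMBER_ID = "";

    // Option 1: the test does not inspect member ids, so use the placeholder for all members.
    static List<String> placeholderIds(List<String> groupInstances) {
        return groupInstances.stream()
            .map(instance -> UNKNOWN_MEMBER_ID)
            .collect(Collectors.toList());
    }

    // Option 2 (hypothetical): give each member a distinct id based on its position.
    static List<String> distinctIds(List<String> groupInstances) {
        return IntStream.range(0, groupInstances.size())
            .mapToObj(i -> "member-" + i)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> instances = Arrays.asList("instance-1", "instance-2");
        System.out.println(placeholderIds(instances));
        System.out.println(distinctIds(instances));
    }
}
```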

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##########
@@ -51,9 +52,21 @@
             if (throwable != null) {
                 result.completeExceptionally(throwable);
             } else {
-                for (MemberToRemove memberToRemove : memberInfos) {
-                    if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-                        return;
+                if (removeAll()) {
+                    for (Map.Entry<MemberIdentity, Errors> entry: memberErrors.entrySet()) {
+                        Exception exception = entry.getValue().exception();
+                        if (exception != null) {
+                            Throwable ex = new KafkaException("Encounter exception when trying to remove: "

Review comment:
       Let's pass the exception as the cause so that we can verify the cause in `KafkaAdminClientTest`, e.g.:
   ```
   if (exception != null) {
       result.completeExceptionally(new KafkaException(
           "Encounter exception when trying to remove: " + entry.getKey(), exception));
       return;
   }
   ```
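
       To illustrate why keeping the cause helps on the test side, here is a self-contained sketch. It uses `CompletableFuture` and a local `WrappingException` as hypothetical stand-ins for the future type and `KafkaException` used in this PR, so the names are assumptions and not the actual test code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class CauseChainSketch {
    // Hypothetical stand-in for org.apache.kafka.common.KafkaException.
    static class WrappingException extends RuntimeException {
        WrappingException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Fails the future with a wrapper that keeps the original error as its cause.
    static CompletableFuture<Void> failWithCause(String member, Exception original) {
        CompletableFuture<Void> result = new CompletableFuture<>();
        result.completeExceptionally(new WrappingException(
            "Encounter exception when trying to remove: " + member, original));
        return result;
    }

    // Unwraps the chain a test would walk: ExecutionException -> wrapper -> original.
    static Throwable rootCauseOf(CompletableFuture<Void> future) throws InterruptedException {
        try {
            future.get();
            return null; // the future completed normally, so there is no cause
        } catch (ExecutionException e) {
            return e.getCause().getCause();
        }
    }

    public static void main(String[] args) throws Exception {
        Exception original = new IllegalStateException("unknown member");
        Throwable root = rootCauseOf(failWithCause("member-1", original));
        System.out.println(root == original);
    }
}
```

       Without the cause argument, the test could only match on the message string, which is more brittle.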

##########
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##########
@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
         this.members = new HashSet<>(members);
     }
 
+    public RemoveMembersFromConsumerGroupOptions() {
+        this.members = Collections.emptySet();;

Review comment:
       nit: extra semi-colon

##########
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##########
@@ -119,7 +122,9 @@
             + "* This tool will not clean up the local state on the stream application instances (the persisted "
             + "stores used to cache aggregation results).\n"
             + "You need to call KafkaStreams#cleanUp() in your application or manually delete them from the "
-            + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n\n"
+            + "directory specified by \"state.dir\" configuration (/tmp/kafka-streams/<application.id> by default).\n"
+            + "*Please use the \"--force\" option to force remove active members in case long session "

Review comment:
       nit: add a space after `*`. Also, I feel we could make the context more concrete with:
   ```
   When a long session timeout has been configured, active members could take longer to expire on the broker, thus blocking the reset job from completing. Using the \"--force\" option removes those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.
   ```

##########
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##########
@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
         Assert.assertEquals(1, exitCode);
     }
 
+    public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+        appID = testId + "-with-force-option";
+        streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+        streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
       I see, this is indeed weird. Please file a JIRA so that we can clean it up in a follow-up PR if others feel the same way.




