[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r429404004



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -27,10 +27,12 @@
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsOptions;
 import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
 import org.apache.kafka.clients.admin.MemberDescription;
+import org.apache.kafka.clients.admin.RemoveMembersFromConsumerGroupOptions;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
+import org.apache.kafka.common.KafkaException;

Review comment:
   This import is no longer used.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-22 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r42955



##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -379,6 +379,22 @@ private static MetadataResponse prepareMetadataResponse(Cluster cluster, Errors
 MetadataResponse.AUTHORIZED_OPERATIONS_OMITTED);
 }
 
+private static DescribeGroupsResponseData prepareDescribeGroupsResponseData(String groupId, List<String> groupInstances,
+                                                                             List<TopicPartition> topicPartitions) {
+final ByteBuffer memberAssignment = ConsumerProtocol.serializeAssignment(new ConsumerPartitionAssignor.Assignment(topicPartitions));
+byte[] memberAssignmentBytes = new byte[memberAssignment.remaining()];
+List<DescribedGroupMember> describedGroupMembers = groupInstances.stream().map(groupInstance -> DescribeGroupsResponse.groupMember("0", groupInstance, "clientId0", "clientHost", memberAssignmentBytes, null)).collect(Collectors.toList());

Review comment:
   nit: we could set `"0"` to `JoinGroupRequest.UNKNOWN_MEMBER_ID` if we don't want to test it out. Having all members use the same member.id is a bit weird.
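
   A sketch of that nit applied to the helper above (only the member-id argument changes; the other arguments are taken from the diff):
   ```java
   // Sketch only: replace the hard-coded "0" with the existing sentinel constant
   // org.apache.kafka.common.requests.JoinGroupRequest.UNKNOWN_MEMBER_ID.
   List<DescribedGroupMember> describedGroupMembers = groupInstances.stream()
       .map(groupInstance -> DescribeGroupsResponse.groupMember(
           JoinGroupRequest.UNKNOWN_MEMBER_ID, groupInstance, "clientId0", "clientHost",
           memberAssignmentBytes, null))
       .collect(Collectors.toList());
   ```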

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupResult.java
##
@@ -51,9 +52,21 @@
 if (throwable != null) {
 result.completeExceptionally(throwable);
 } else {
-for (MemberToRemove memberToRemove : memberInfos) {
-if (maybeCompleteExceptionally(memberErrors, memberToRemove.toMemberIdentity(), result)) {
-return;
+if (removeAll()) {
+for (Map.Entry<MemberIdentity, Errors> entry: memberErrors.entrySet()) {
+Exception exception = entry.getValue().exception();
+if (exception != null) {
+Throwable ex = new KafkaException("Encounter exception when trying to remove: "

Review comment:
   Let's put the exception in the cause so that we could verify the cause in `KafkaAdminClientTest`, as:
   ```
   if (exception != null) {
       result.completeExceptionally(new KafkaException(
           "Encounter exception when trying to remove: " + entry.getKey(), exception));
       return;
   }
   ```
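
   On the test side, that cause could then be unwrapped and asserted roughly like this (a sketch only; `removeAllResult` and the concrete member-level error, `UnknownMemberIdException`, are assumptions for illustration):
   ```java
   // KafkaFuture.get() wraps the completion exception in an ExecutionException, whose cause is the
   // KafkaException, whose cause in turn is the member-level error.
   try {
       removeAllResult.all().get();
       fail("Expected the removeAll result to fail");
   } catch (final ExecutionException e) {
       assertTrue(e.getCause() instanceof KafkaException);
       assertTrue(e.getCause().getCause() instanceof UnknownMemberIdException);
   }
   ```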

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -37,7 +38,15 @@ public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members)
 this.members = new HashSet<>(members);
 }
 
+public RemoveMembersFromConsumerGroupOptions() {
+this.members = Collections.emptySet();;

Review comment:
   nit: extra semi-colon

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -119,7 +122,9 @@
 + "* This tool will not clean up the local state on the stream 
application instances (the persisted "
 + "stores used to cache aggregation results).\n"
 + "You need to call KafkaStreams#cleanUp() in your application or 
manually delete them from the "
-+ "directory specified by \"state.dir\" configuration 
(/tmp/kafka-streams/ by default).\n\n"
++ "directory specified by \"state.dir\" configuration 
(/tmp/kafka-streams/ by default).\n"
++ "*Please use the \"--force\" option to force remove active 
members in case long session "

Review comment:
   nit: space after `*`. Also, I feel we could make the context more concrete, e.g.:
   ```
   When long session timeout has been configured, active members could take longer to expire on the broker, blocking the reset job from completing. Using the \"--force\" option removes those left-over members immediately. Make sure to stop all stream applications when this option is specified to avoid unexpected disruptions.
   ```
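
   For context, a reset run using the new flag would then look roughly like this (all values are placeholders; `--force` is the option added by this PR):
   ```
   bin/kafka-streams-application-reset.sh \
       --application-id my-streams-app \
       --bootstrap-servers localhost:9092 \
       --input-topics my-input-topic \
       --force
   ```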

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -261,6 +261,43 @@ public void shouldNotAllowToResetWhenIntermediateTopicAbsent() throws Exception
 Assert.assertEquals(1, exitCode);
 }
 
+public void testResetWhenLongSessionTimeoutConfiguredWithForceOption() throws Exception {
+appID = testId + "-with-force-option";
+streamsConfig.put(StreamsConfig.APPLICATION_ID_CONFIG, appID);
+streamsConfig.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "" + STREAMS_CONSUMER_TIMEOUT * 100);

Review comment:
   I see, this is indeed weird; please file a JIRA so that we can clean it up in a follow-up PR if others feel the same way.






[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-21 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r428745517



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,26 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List<MemberIdentity> getMembersFromGroup(String groupId) {
+Collection<MemberDescription> members;
+try {
+members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {

Review comment:
   I think we should catch `Exception` here:
   
https://stackoverflow.com/questions/2274102/difference-between-using-throwable-and-exception-in-a-try-catch
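
   For illustration, a sketch of that shape (not the final PR code; the `KafkaException` wrapping and the member-id mapping are assumptions, and the method is assumed to live inside `KafkaAdminClient` where the needed imports already exist):
   ```java
   private List<MemberIdentity> getMembersFromGroup(String groupId) {
       final Collection<MemberDescription> members;
       try {
           members = describeConsumerGroups(Collections.singleton(groupId))
               .describedGroups().get(groupId).get().members();
       } catch (final InterruptedException | ExecutionException ex) {
           // Fail the whole removeAll operation instead of continuing with an empty member list.
           throw new KafkaException("Encounter exception when trying to get members from group: " + groupId, ex);
       }
       return members.stream()
           .map(member -> member.groupInstanceId().isPresent()
               ? new MemberIdentity().setGroupInstanceId(member.groupInstanceId().get())
               : new MemberIdentity().setMemberId(member.consumerId()))
           .collect(Collectors.toList());
   }
   ```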
   

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+new ConsumerGroupOperationContext<>(groupId, options, deadline, future);

Review comment:
   Let's restore the original indentation.

##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -186,9 +190,15 @@ private void validateNoActiveConsumers(final String groupId,
 final List<MemberDescription> members =
 new ArrayList<>(describeResult.describedGroups().get(groupId).get().members());
 if (!members.isEmpty()) {
-throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
-+ "and has following members: " + members + ". "
-+ "Make sure to stop all running application instances before running the reset tool.");
+if (options.has(forceOption)) {
+System.out.println("Force deleting all active members in the group: " + groupId);
+adminClient.removeMembersFromConsumerGroup(groupId, new RemoveMembersFromConsumerGroupOptions()).all();

Review comment:
   Should we check the member removal result here before proceeding? If that call fails, the whole operation should fail with an error message containing the result, IMHO.
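
   A hedged sketch of what that check could look like in `validateNoActiveConsumers` (the `IllegalStateException` wrapping and its message are illustrative choices, not the PR's final wording):
   ```java
   if (options.has(forceOption)) {
       System.out.println("Force deleting all active members in the group: " + groupId);
       try {
           // Block on the removal and surface any failure instead of proceeding with the reset.
           adminClient.removeMembersFromConsumerGroup(groupId,
               new RemoveMembersFromConsumerGroupOptions()).all().get();
       } catch (final InterruptedException | ExecutionException e) {
           throw new IllegalStateException("Force removal of active members in group '" + groupId
               + "' failed: " + e.getMessage(), e);
       }
   } else {
       throw new IllegalStateException("Consumer group '" + groupId + "' is still active "
           + "and has following members: " + members + ". "
           + "Make sure to stop all running application instances before running the reset tool.");
   }
   ```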

##
File path: streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -507,7 +544,7 @@ private Topology setupTopologyWithoutIntermediateUserTopic() {
 return builder.build();
 }
 
-private void cleanGlobal(final boolean withIntermediateTopics,
+private int tryCleanGlobal(final boolean withIntermediateTopics,

Review comment:
   We could add a comment documenting the return value here; also, instead of returning an exit code, I feel a boolean would suffice to indicate whether the clean operation was successful or not.
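
   A sketch of the boolean-returning shape (the parameter names and the `runResetTool` helper are hypothetical placeholders, not the actual test code):
   ```java
   /**
    * Runs the reset tool for the given scenario.
    * @return true if the tool exited with code 0, false otherwise
    */
   private boolean tryCleanGlobal(final boolean withIntermediateTopics,
                                  final String resetScenarioArg,
                                  final String appID) throws Exception {
       final int exitCode = runResetTool(withIntermediateTopics, resetScenarioArg, appID); // hypothetical helper
       return exitCode == 0;
   }
   ```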

##
File path: clients/src/test/java/org/apache/kafka/clients/admin/KafkaAdminClientTest.java
##
@@ -2411,6 +2411,50 @@ public void testRemoveMembersFromGroup() throws Exception {
 assertNull(noErrorResult.all().get());
 assertNull(noErrorResult.memberResult(memberOne).get());
 assertNull(noErrorResult.memberResult(memberTwo).get());
+
+// Return with success for "removeAll" scenario

Review comment:
   This test looks good, but it seems we didn't cover the case where some members are removed successfully while others are not?
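
   A hedged sketch of such a case, following the response-mocking pattern already used in `testRemoveMembersFromGroup` (the group id, instance ids, and the `UNKNOWN_MEMBER_ID` error are assumptions; `prepareFindCoordinatorResponse` is the helper used elsewhere in this test class):
   ```java
   // One member is removed, the other is rejected with a member-level error.
   env.kafkaClient().prepareResponse(prepareFindCoordinatorResponse(Errors.NONE, env.cluster().controller()));
   env.kafkaClient().prepareResponse(new LeaveGroupResponse(new LeaveGroupResponseData()
       .setErrorCode(Errors.NONE.code())
       .setMembers(Arrays.asList(
           new MemberResponse().setGroupInstanceId("instance-1"),
           new MemberResponse().setGroupInstanceId("instance-2")
               .setErrorCode(Errors.UNKNOWN_MEMBER_ID.code())))));

   final MemberToRemove goodMember = new MemberToRemove("instance-1");
   final MemberToRemove badMember = new MemberToRemove("instance-2");
   final RemoveMembersFromConsumerGroupResult mixedResult = env.adminClient().removeMembersFromConsumerGroup(
       "test-group", new RemoveMembersFromConsumerGroupOptions(Arrays.asList(goodMember, badMember)));

   assertNull(mixedResult.memberResult(goodMember).get());   // removed member's future completes normally
   TestUtils.assertFutureError(mixedResult.memberResult(badMember), UnknownMemberIdException.class);
   TestUtils.assertFutureError(mixedResult.all(), UnknownMemberIdException.class);
   ```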

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3640,31 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
 
+List<MemberIdentity> members;
+if (options.removeAll()) {
+members = getMembersFromGroup(groupId);
+} else {
+members = options.members().stream().map(MemberToRemove::toMemberIdentity).collect(Collectors.toList());
+}
 Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+() -> getRemoveMembersFromGroupCall(context, members));
 runnable.call(findCoordinatorCall, startFindCoordinatorMs);
 
 return new RemoveMembersFromConsumerGroupResult(future, options.members());
 }
 
-private Call 

[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-05 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r418108095



##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3641,37 @@ public RemoveMembersFromConsumerGroupResult removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl<Map<MemberIdentity, Errors>> future = new KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext<Map<MemberIdentity, Errors>, RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
+new ConsumerGroupOperationContext<>(groupId, options, deadline, future);
 
-Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+Call findCoordinatorCall;
+if (options.removeAll()) {
+List<MemberIdentity> members = getMembersFromGroup(groupId);
+findCoordinatorCall = getFindCoordinatorCall(context,
+() -> getRemoveMembersFromGroupCall(context, members));

Review comment:
   Could we pass the members into the context?
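
   Roughly what that idea would look like (purely illustrative: `setMembers` is a hypothetical setter that `ConsumerGroupOperationContext` does not currently expose):
   ```java
   // Hypothetical shape of the suggestion: stash the member list on the context so the
   // lambda does not need to capture a separate local variable.
   context.setMembers(members);                       // hypothetical setter
   Call findCoordinatorCall = getFindCoordinatorCall(context,
       () -> getRemoveMembersFromGroupCall(context)); // call reads the members from the context
   ```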

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -32,12 +32,23 @@
 public class RemoveMembersFromConsumerGroupOptions extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {
 
 private Set<MemberToRemove> members;

Review comment:
   Could we just make `members` an `Optional<Set<MemberToRemove>>` so that we don't need a separate removeAll parameter?
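
   For illustration, a minimal sketch of that shape (this is the reviewer's idea, not the API that was ultimately merged; null/empty validation is omitted):
   ```java
   import java.util.Collection;
   import java.util.Collections;
   import java.util.HashSet;
   import java.util.Optional;
   import java.util.Set;

   public class RemoveMembersFromConsumerGroupOptions
           extends AbstractOptions<RemoveMembersFromConsumerGroupOptions> {

       private final Optional<Set<MemberToRemove>> members;

       /** Remove only the given members from the group. */
       public RemoveMembersFromConsumerGroupOptions(Collection<MemberToRemove> members) {
           this.members = Optional.of(new HashSet<>(members));
       }

       /** Remove all active members from the group. */
       public RemoveMembersFromConsumerGroupOptions() {
           this.members = Optional.empty();
       }

       public boolean removeAll() {
           return !members.isPresent();
       }

       public Set<MemberToRemove> members() {
           return members.orElse(Collections.emptySet());
       }
   }
   ```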

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List<MemberIdentity> getMembersFromGroup(String groupId) {
+Collection<MemberDescription> members = new ArrayList<>();
+try {
+members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members from group: " + groupId);
+ex.printStackTrace();
+}
+
+List<MemberIdentity> memberToRemove = new ArrayList<>();
+for (MemberDescription member: members) {

Review comment:
   Style error here.
   
   I would recommend running a self style check like
   `./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest spotbugsScoverage compileTestJava`; otherwise we still need to fix those failures after the Jenkins build runs.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List<MemberIdentity> getMembersFromGroup(String groupId) {
+Collection<MemberDescription> members = new ArrayList<>();
+try {
+members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members from group: " + groupId);

Review comment:
   Remove the print statements.

##
File path: clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List<MemberIdentity> getMembersFromGroup(String groupId) {
+Collection<MemberDescription> members = new ArrayList<>();
+try {
+members = describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members from group: " + groupId);
+ex.printStackTrace();

Review comment:
   Curious why we are still continuing in this case, as the member lookup has already failed.




