apoorvmittal10 commented on code in PR #18209:
URL: https://github.com/apache/kafka/pull/18209#discussion_r1913136314


##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService

Review Comment:
   nit: line break



##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:

Review Comment:
   nit: maybe just `State` or `ShareConsumerState`



##########
tests/kafkatest/tests/client/share_consumer_test.py:
##########
@@ -0,0 +1,313 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.mark import matrix
+from ducktape.mark.resource import cluster
+from ducktape.utils.util import wait_until
+from kafkatest.tests.verifiable_share_consumer_test import VerifiableShareConsumerTest
+
+from kafkatest.services.kafka import TopicPartition, quorum
+
+import signal
+
+class ShareTest(VerifiableShareConsumerTest):

Review Comment:
   class ShareConsumerTest(VerifiableShareConsumerTest)?



##########
tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java:
##########
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback {
+
+    private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaShareConsumer<String, String> consumer;
+    private final String topic;
+    private final AcknowledgementMode acknowledgementMode;
+    private final String offsetResetStrategy;
+    private final Boolean verbose;
+    private final int maxMessages;
+    private Integer totalAcknowledged = 0;
+    private final String brokerHostandPort;
+    private final String groupId;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public static class PartitionData {
+        private final String topic;
+        private final int partition;
+
+        public PartitionData(String topic, int partition) {
+            this.topic = topic;
+            this.partition = partition;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return topic;
+        }
+
+        @JsonProperty
+        public int partition() {
+            return partition;
+        }
+    }
+
+    public static class RecordSetSummary extends PartitionData {

Review Comment:
   Why do we want to extend the classes like this? What benefit are we getting?
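   For context, one direction this question points at is composition instead of inheritance: the summary could hold the partition coordinates as a field rather than being a `PartitionData`. A rough sketch of that shape, assuming the summary only needs `topic`, `partition` and a record count (the real fields of `RecordSetSummary` are not visible in this hunk):
   ```java
   // Hypothetical composition-based alternative (illustration only, not the PR's code).
   public static class RecordSetSummary {
       private final PartitionData partition; // has-a instead of is-a
       private final long count;              // assumed field; the actual summary fields are not shown

       public RecordSetSummary(String topic, int partition, long count) {
           this.partition = new PartitionData(topic, partition);
           this.count = count;
       }

       @JsonProperty
       public String topic() {
           return partition.topic();
       }

       @JsonProperty
       public int partition() {
           return partition.partition();
       }

       @JsonProperty
       public long count() {
           return count;
       }
   }
   ```
   Whether the extra delegation is worth it over plain inheritance is exactly the trade-off being asked about; the sketch just makes the alternative concrete.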



##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))

Review Comment:
   How do we handle data when there is a partial failure, i.e. the request is successful but acknowledgement failed for some of the topic partitions?
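   For reference, the share consumer's `AcknowledgementCommitCallback` is (per KIP-932) invoked as `onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception)`, so a partial failure would normally surface as separate callback invocations, each either fully successful or fully failed. A hedged sketch of how the tool side could keep the two outcomes in separate events (the event and emit helpers named here are illustrative, not the PR's actual code):
   ```java
   // Sketch only: assumes each onComplete() call reports a single outcome for all offsets it carries.
   @Override
   public void onComplete(Map<TopicIdPartition, Set<Long>> offsets, Exception exception) {
       boolean success = exception == null;
       long count = 0;
       List<PartitionData> partitions = new ArrayList<>();
       for (Map.Entry<TopicIdPartition, Set<Long>> entry : offsets.entrySet()) {
           count += entry.getValue().size();
           partitions.add(new PartitionData(entry.getKey().topic(), entry.getKey().partition()));
       }
       // One event per callback, so the ducktape handler never has to untangle mixed results.
       // "OffsetsAcknowledged" and "printJson" are hypothetical names for the tool's event plumbing.
       printJson(new OffsetsAcknowledged(success, count, partitions));
   }
   ```
   If a single emitted event can mix outcomes, the JSON would need a per-partition success flag so the handler above can split the counts accordingly.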



##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+        else:
+            self.total_acknowledged_failed += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition_failed[topic_partition] = self.acknowledged_per_partition_failed.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))
+
+    def handle_records_consumed(self, event, node, logger):
+        self.total_consumed += event["count"]
+        for share_partition_data in event["partitions"]:
+            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+            self.consumed_per_partition[topic_partition] = self.consumed_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+        logger.debug("Offsets consumed for %s" % (node.account.hostname))
+
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the consumer

Review Comment:
   nit: 
   ```suggestion
           # shutdown event from the share consumer
   ```



##########
tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java:
##########
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback {
+
+    private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaShareConsumer<String, String> consumer;
+    private final String topic;
+    private final AcknowledgementMode acknowledgementMode;
+    private final String offsetResetStrategy;
+    private final Boolean verbose;
+    private final int maxMessages;
+    private Integer totalAcknowledged = 0;
+    private final String brokerHostandPort;
+    private final String groupId;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public static class PartitionData {

Review Comment:
   Can't we use the TopicPartition.java class?
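   Possibly useful context for the question: `org.apache.kafka.common.TopicPartition` has the right fields but no Jackson annotations, so serializing it directly would not necessarily produce the `topic`/`partition` keys the ducktape side parses, which may be why a small wrapper exists. If reusing `TopicPartition` is preferred, one option (a sketch under that assumption, not the PR's code) is registering a custom serializer on the tool's existing `ObjectMapper`:
   ```java
   // Sketch: emit TopicPartition directly as {"topic": ..., "partition": ...}.
   public static class TopicPartitionSerializer extends JsonSerializer<TopicPartition> {
       @Override
       public void serialize(TopicPartition tp, JsonGenerator gen, SerializerProvider serializers) throws IOException {
           gen.writeStartObject();
           gen.writeStringField("topic", tp.topic());
           gen.writeNumberField("partition", tp.partition());
           gen.writeEndObject();
       }
   }

   // Registration, using the SimpleModule/ObjectMapper types the tool already imports:
   // SimpleModule module = new SimpleModule();
   // module.addSerializer(TopicPartition.class, new TopicPartitionSerializer());
   // mapper.registerModule(module);
   ```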


