AndrewJSchofield commented on code in PR #18209:
URL: https://github.com/apache/kafka/pull/18209#discussion_r1916356233

##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:
+    Started = 1
+    Dead = 2
+
+class ShareConsumerEventHandler(object):
+
+    def __init__(self, node, idx, state=ConsumerState.Dead):
+        self.node = node
+        self.idx = idx
+        self.total_consumed = 0
+        self.total_acknowledged = 0
+        self.total_acknowledged_failed = 0
+        self.consumed_per_partition = {}
+        self.acknowledged_per_partition = {}
+        self.acknowledged_per_partition_failed = {}
+        self.state = state
+
+    def handle_shutdown_complete(self, node=None, logger=None):
+        self.state = ConsumerState.Dead
+        if node is not None and logger is not None:
+            logger.debug("Shut down %s" % node.account.hostname)
+
+    def handle_startup_complete(self, node, logger):
+        self.state = ConsumerState.Started
+        logger.debug("Started %s" % node.account.hostname)
+
+    def handle_offsets_acknowledged(self, event, node, logger):
+        if event["success"]:
+            self.total_acknowledged += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition[topic_partition] = self.acknowledged_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+        else:
+            self.total_acknowledged_failed += event["count"]
+            for share_partition_data in event["partitions"]:
+                topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+                self.acknowledged_per_partition_failed[topic_partition] = self.acknowledged_per_partition_failed.get(topic_partition, 0) + share_partition_data["count"]
+            logger.debug("Offsets acknowledged for %s" % (node.account.hostname))
+            logger.debug("Offset acknowledgement failed for: %s" % (node.account.hostname))
+
+    def handle_records_consumed(self, event, node, logger):
+        self.total_consumed += event["count"]
+        for share_partition_data in event["partitions"]:
+            topic_partition = TopicPartition(share_partition_data["topic"], share_partition_data["partition"])
+            self.consumed_per_partition[topic_partition] = self.consumed_per_partition.get(topic_partition, 0) + share_partition_data["count"]
+        logger.debug("Offsets consumed for %s" % (node.account.hostname))
+
+
+    def handle_kill_process(self, clean_shutdown):
+        # if the shutdown was clean, then we expect the explicit
+        # shutdown event from the consumer
+        if not clean_shutdown:
+            self.handle_shutdown_complete()
+
+class VerifiableShareConsumer(KafkaPathResolverMixin, VerifiableClientMixin, BackgroundThreadService):
+    """This service wraps org.apache.kafka.tools.VerifiableShareConsumer for use in
+    system testing.
+
+    NOTE: this class should be treated as a PUBLIC API. Downstream users use
+    this service both directly and through class extension, so care must be
+    taken to ensure compatibility.
+    """
+
+    PERSISTENT_ROOT = "/mnt/verifiable_share_consumer"
+    STDOUT_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stdout")
+    STDERR_CAPTURE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.stderr")
+    LOG_DIR = os.path.join(PERSISTENT_ROOT, "logs")
+    LOG_FILE = os.path.join(LOG_DIR, "verifiable_share_consumer.log")
+    LOG4J_CONFIG = os.path.join(PERSISTENT_ROOT, "tools-log4j.properties")
+    CONFIG_FILE = os.path.join(PERSISTENT_ROOT, "verifiable_share_consumer.properties")
+
+    logs = {
+        "verifiable_share_consumer_stdout": {
+            "path": STDOUT_CAPTURE,
+            "collect_default": False},
+        "verifiable_share_consumer_stderr": {
+            "path": STDERR_CAPTURE,
+            "collect_default": False},
+        "verifiable_share_consumer_log": {
+            "path": LOG_FILE,
+            "collect_default": True}
+    }
+
+    def __init__(self, context, num_nodes, kafka, topic, group_id,
+                 max_messages=-1, acknowledgement_mode="auto", offset_reset_strategy="",
+                 version=DEV_BRANCH, stop_timeout_sec=60, log_level="INFO", jaas_override_variables=None,
+                 on_record_consumed=None):
+        """
+        :param jaas_override_variables: A dict of variables to be used in the jaas.conf template file
+        """
+        super(VerifiableShareConsumer, self).__init__(context, num_nodes)
+        self.log_level = log_level
+        self.kafka = kafka
+        self.topic = topic
+        self.group_id = group_id
+        self.offset_reset_strategy = offset_reset_strategy
+        self.max_messages = max_messages
+        self.acknowledgement_mode = acknowledgement_mode
+        self.prop_file = ""
+        self.stop_timeout_sec = stop_timeout_sec
+        self.on_record_consumed = on_record_consumed
+
+        self.event_handlers = {}
+        self.jaas_override_variables = jaas_override_variables or {}
+
+        self.total_records_consumed = 0
+        self.total_records_acknowledged = 0
+        self.total_records_acknowledged_failed = 0
+        self.consumed_records_offsets = set()
+        # self.consumed_more_than_once = []

Review Comment:
   This should probably be removed.

##########
tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java:
##########
@@ -0,0 +1,627 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.tools;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.Admin;
+import org.apache.kafka.clients.admin.AlterConfigOp;
+import org.apache.kafka.clients.admin.AlterConfigsOptions;
+import org.apache.kafka.clients.admin.ConfigEntry;
+import org.apache.kafka.clients.consumer.AcknowledgementCommitCallback;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaShareConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigResource;
+import org.apache.kafka.common.errors.WakeupException;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.group.GroupConfig;
+import org.apache.kafka.coordinator.group.ShareGroupAutoOffsetResetStrategy;
+
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonPropertyOrder;
+import com.fasterxml.jackson.core.JsonGenerator;
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonSerializer;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializerProvider;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+import net.sourceforge.argparse4j.ArgumentParsers;
+import net.sourceforge.argparse4j.inf.ArgumentParser;
+import net.sourceforge.argparse4j.inf.ArgumentParserException;
+import net.sourceforge.argparse4j.inf.MutuallyExclusiveGroup;
+import net.sourceforge.argparse4j.inf.Namespace;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Properties;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static net.sourceforge.argparse4j.impl.Arguments.store;
+import static net.sourceforge.argparse4j.impl.Arguments.storeTrue;
+
+public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback {
+
+    private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaShareConsumer<String, String> consumer;
+    private final String topic;
+    private final AcknowledgementMode acknowledgementMode;
+    private final String offsetResetStrategy;
+    private final Boolean verbose;
+    private final int maxMessages;
+    private Integer totalAcknowledged = 0;
+    private final String brokerHostandPort;
+    private final String groupId;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public static class PartitionData {
+        private final String topic;
+        private final int partition;
+
+        public PartitionData(String topic, int partition) {
+            this.topic = topic;
+            this.partition = partition;
+        }
+
+        @JsonProperty
+        public String topic() {
+            return topic;
+        }
+
+        @JsonProperty
+        public int partition() {
+            return partition;
+        }
+    }
+
+    public static class RecordSetSummary extends PartitionData {
+        private final long count;
+        private final Set<Long> offsets;
+
+        public RecordSetSummary(String topic, int partition, Set<Long> offsets) {
+            super(topic, partition);
+            this.offsets = offsets;
+            this.count = offsets.size();
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public Set<Long> offsets() {
+            return offsets;
+        }
+
+    }
+
+    protected static class AcknowledgedData extends PartitionData {
+        private final long count;
+        private final Set<Long> offsets;
+
+        public AcknowledgedData(String topic, int partition, Set<Long> offsets) {
+            super(topic, partition);
+            this.offsets = offsets;
+            this.count = offsets.size();
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public Set<Long> offsets() {
+            return offsets;
+        }
+    }
+
+    @JsonPropertyOrder({ "timestamp", "name" })
+    private abstract static class ShareConsumerEvent {
+        private final long timestamp = System.currentTimeMillis();
+
+        @JsonProperty
+        public abstract String name();
+
+        @JsonProperty
+        public long timestamp() {
+            return timestamp;
+        }
+    }
+
+    protected static class StartupComplete extends ShareConsumerEvent {
+
+        @Override
+        public String name() {
+            return "startup_complete";
+        }
+    }
+
+    protected static class OffsetResetStrategySet extends ShareConsumerEvent {
+
+        private final String offsetResetStrategy;
+
+        public OffsetResetStrategySet(String offsetResetStrategy) {
+            this.offsetResetStrategy = offsetResetStrategy;
+        }
+
+        @Override
+        public String name() {
+            return "offset_reset_strategy_set";
+        }
+
+        @JsonProperty
+        public String offsetResetStrategy() {
+            return offsetResetStrategy;
+        }
+    }
+
+    protected static class ShutdownComplete extends ShareConsumerEvent {
+
+        @Override
+        public String name() {
+            return "shutdown_complete";
+        }
+    }
+
+    public static class RecordsConsumed extends ShareConsumerEvent {
+        private final long count;
+        private final List<RecordSetSummary> partitionSummaries;
+
+        public RecordsConsumed(long count, List<RecordSetSummary> partitionSummaries) {
+            this.count = count;
+            this.partitionSummaries = partitionSummaries;
+        }
+
+        @Override
+        public String name() {
+            return "records_consumed";
+        }
+
+        @JsonProperty
+        public long count() {
+            return count;
+        }
+
+        @JsonProperty
+        public List<RecordSetSummary> partitions() {
+            return partitionSummaries;
+        }
+    }
+
+    protected static class OffsetsAcknowledged extends ShareConsumerEvent {
+
+        private final long count;
+        private final List<AcknowledgedData> partitions;
+        private final String error;
+        private final boolean success;
+
+        public OffsetsAcknowledged(long count, List<AcknowledgedData> partitions, String error, boolean success) {

Review Comment:
   I wonder whether a separate event might make more sense for the unsuccessful case. wdyt? Maybe an enhancement for later.
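
For illustration only, here is one possible shape such a dedicated failure event could take if the suggestion were adopted, sketched against the classes quoted above. The class name, the event name `offsets_acknowledgement_failed`, and the field set are assumptions, not part of the PR; it relies on `ShareConsumerEvent` and `AcknowledgedData` exactly as quoted.

```java
// Hypothetical sketch, not part of the PR: a dedicated event for failed
// acknowledgements, mirroring OffsetsAcknowledged but dropping the success
// flag and always carrying the error.
protected static class OffsetsAcknowledgementFailed extends ShareConsumerEvent {

    private final long count;
    private final List<AcknowledgedData> partitions;
    private final String error;

    public OffsetsAcknowledgementFailed(long count, List<AcknowledgedData> partitions, String error) {
        this.count = count;
        this.partitions = partitions;
        this.error = error;
    }

    @Override
    public String name() {
        return "offsets_acknowledgement_failed";
    }

    @JsonProperty
    public long count() {
        return count;
    }

    @JsonProperty
    public List<AcknowledgedData> partitions() {
        return partitions;
    }

    @JsonProperty
    public String error() {
        return error;
    }
}
```

With a split like this, the Python side of the service would dispatch on the new event name in its `_worker` loop rather than branching on the `success` field inside `handle_offsets_acknowledged`.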

##########
tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java:
##########
@@ -0,0 +1,627 @@
+public class VerifiableShareConsumer implements Closeable, AcknowledgementCommitCallback {
+
+    private static final Logger log = LoggerFactory.getLogger(VerifiableShareConsumer.class);
+
+    private final ObjectMapper mapper = new ObjectMapper();
+    private final PrintStream out;
+    private final KafkaShareConsumer<String, String> consumer;
+    private final String topic;
+    private final AcknowledgementMode acknowledgementMode;
+    private final String offsetResetStrategy;
+    private final Boolean verbose;
+    private final int maxMessages;
+    private Integer totalAcknowledged = 0;
+    private final String brokerHostandPort;
+    private final String groupId;
+    private final CountDownLatch shutdownLatch = new CountDownLatch(1);
+
+    public static class PartitionData {

Review Comment:
   `@JsonProperty` I think.

##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+import json
+import os
+from ducktape.services.background_thread import BackgroundThreadService
+
+from kafkatest.directory_layout.kafka_path import KafkaPathResolverMixin
+from kafkatest.services.kafka import TopicPartition
+from kafkatest.services.verifiable_client import VerifiableClientMixin
+from kafkatest.version import DEV_BRANCH
+
+class ConsumerState:

Review Comment:
   I'd go for `ShareConsumerState` I think.

##########
tests/kafkatest/tests/verifiable_share_consumer_test.py:
##########
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ducktape.utils.util import wait_until
+
+from kafkatest.tests.kafka_test import KafkaTest
+from kafkatest.services.verifiable_producer import VerifiableProducer
+from kafkatest.services.verifiable_share_consumer import VerifiableShareConsumer
+from kafkatest.services.kafka import TopicPartition
+
+class VerifiableShareConsumerTest(KafkaTest):
+    PRODUCER_REQUEST_TIMEOUT_SEC = 30
+
+    def __init__(self, test_context, num_consumers=1, num_producers=0, **kwargs):
+        super(VerifiableShareConsumerTest, self).__init__(test_context, **kwargs)
+        self.num_consumers = num_consumers
+        self.num_producers = num_producers
+
+    def _all_partitions(self, topic, num_partitions):
+        partitions = set()
+        for i in range(num_partitions):
+            partitions.add(TopicPartition(topic=topic, partition=i))
+        return partitions
+
+    def _partitions(self, assignment):
+        partitions = []
+        for parts in assignment.values():
+            partitions += parts
+        return partitions
+
+    def valid_assignment(self, topic, num_partitions, assignment):
+        all_partitions = self._all_partitions(topic, num_partitions)
+        partitions = self._partitions(assignment)
+        return len(partitions) == num_partitions and set(partitions) == all_partitions
+
+    def min_cluster_size(self):
+        """Override this since we're adding services outside of the constructor"""
+        return super(VerifiableShareConsumerTest, self).min_cluster_size() + self.num_consumers + self.num_producers
+
+    def setup_share_group(self, topic, acknowledgement_mode="auto", group_id="test_group_id", offset_reset_strategy="", **kwargs):
+        return VerifiableShareConsumer(self.test_context, self.num_consumers, self.kafka,
+                                       topic, group_id, acknowledgement_mode=acknowledgement_mode,
+                                       offset_reset_strategy=offset_reset_strategy, log_level="TRACE", **kwargs)
+
+    def setup_producer(self, topic, max_messages=-1, throughput=500):
+        return VerifiableProducer(self.test_context, self.num_producers, self.kafka, topic,
+                                  max_messages=max_messages, throughput=throughput,
+                                  request_timeout_sec=self.PRODUCER_REQUEST_TIMEOUT_SEC,
+                                  log_level="DEBUG")
+
+    def await_produced_messages(self, producer, min_messages=1000, timeout_sec=10):
+        current_acked = producer.num_acked
+        wait_until(lambda: producer.num_acked >= current_acked + min_messages, timeout_sec=timeout_sec,
+                   err_msg="Timeout awaiting messages to be produced and acked")
+
+    def await_consumed_messages(self, consumer, min_messages=1, timeout_sec=10, total=False):
+        current_total = 0
+        if total is False:
+            current_total = consumer.total_consumed()
+        wait_until(lambda: consumer.total_consumed() >= current_total + min_messages,
+                   timeout_sec=timeout_sec,
+                   err_msg="Timed out waiting for consumption")
+
+    def await_consumed_messages_by_a_consumer(self, consumer, node, min_messages=1, timeout_sec=10, total=False):

Review Comment:
   Can you explain the purpose of `await_consumed_messages_by_a_consumer` as opposed to `await_consumed_messages`? This is done in a different way than the regular `consumer_test.py` and `verifiable_consumer_test.py` and I wonder what the motivation is for doing it differently.

##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+        self.total_records_consumed = 0
+        self.total_records_acknowledged = 0
+        self.total_records_acknowledged_failed = 0
+        self.consumed_records_offsets = set()
+        # self.consumed_more_than_once = []
+
+        self.acknowledged_records_offsets = set()
+        # self.acknowledged_more_than_once = []

Review Comment:
   And this.

##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+        self.acknowledged_records_offsets = set()
+        # self.acknowledged_more_than_once = []
+        self.is_offset_reset_strategy_set = False
+
+        for node in self.nodes:
+            node.version = version
+
+    def java_class_name(self):
+        return "VerifiableShareConsumer"
+
+    def create_event_handler(self, idx, node):
+        return ShareConsumerEventHandler(node, idx)
+
+    def _worker(self, idx, node):
+        with self.lock:
+            self.event_handlers[node] = self.create_event_handler(idx, node)
+            handler = self.event_handlers[node]
+
+        node.account.ssh("mkdir -p %s" % VerifiableShareConsumer.PERSISTENT_ROOT, allow_fail=False)
+
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableShareConsumer.LOG_FILE)

Review Comment:
   Please investigate `get_log4j_config_for_tools` in the other related Python code. I think this way of doing things has been superseded.

##########
tests/kafkatest/services/verifiable_share_consumer.py:
##########
@@ -0,0 +1,337 @@
+        # Create and upload log properties
+        log_config = self.render('tools_log4j.properties', log_file=VerifiableShareConsumer.LOG_FILE)
+        node.account.create_file(VerifiableShareConsumer.LOG4J_CONFIG, log_config)
+
+        # Create and upload config file
+        self.security_config = self.kafka.security_config.client_config(self.prop_file, node,
+                                                                        self.jaas_override_variables)
+        self.security_config.setup_node(node)
+        self.prop_file += str(self.security_config)
+        self.logger.info("verifiable_share_consumer.properties:")
+        self.logger.info(self.prop_file)
+        node.account.create_file(VerifiableShareConsumer.CONFIG_FILE, self.prop_file)
+        self.security_config.setup_node(node)
+
+        cmd = self.start_cmd(node)
+        self.logger.debug("VerifiableShareConsumer %d command: %s" % (idx, cmd))
+
+        for line in node.account.ssh_capture(cmd):
+            event = self.try_parse_json(node, line.strip())
+            if event is not None:
+                with self.lock:
+                    name = event["name"]
+                    if name == "shutdown_complete":
+                        handler.handle_shutdown_complete(node, self.logger)
+                    elif name == "startup_complete":
+                        handler.handle_startup_complete(node, self.logger)
+                    elif name == "offsets_acknowledged":
+                        handler.handle_offsets_acknowledged(event, node, self.logger)
+                        self._update_global_acknowledged(event)
+                    elif name == "records_consumed":
+                        handler.handle_records_consumed(event, node, self.logger)
+                        self._update_global_consumed(event)
+                    elif name == "record_data" and self.on_record_consumed:
+                        self.on_record_consumed(event, node)
+                    elif name == "offset_reset_strategy_set":
+                        self._on_offset_reset_strategy_set()
+                    else:
+                        self.logger.debug("%s: ignoring unknown event: %s" % (str(node.account), event))
+
+    def _update_global_acknowledged(self, acknowledge_event):
+        if acknowledge_event["success"]:
+            self.total_records_acknowledged += acknowledge_event["count"]
+        else:
+            self.total_records_acknowledged_failed += acknowledge_event["count"]
+        for share_partition_data in acknowledge_event["partitions"]:
+            tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"])
+            for offset in share_partition_data["offsets"]:
+                key = tpkey + "-" + str(offset)
+                if key not in self.acknowledged_records_offsets:
+                    self.acknowledged_records_offsets.add(key)
+
+    def _update_global_consumed(self, consumed_event):
+        self.total_records_consumed += consumed_event["count"]
+
+        for share_partition_data in consumed_event["partitions"]:
+            tpkey = str(share_partition_data["topic"]) + "-" + str(share_partition_data["partition"])
+            for offset in share_partition_data["offsets"]:
+                key = tpkey + "-" + str(offset)
+                if key not in self.consumed_records_offsets:
+                    self.consumed_records_offsets.add(key)
+
+    def _on_offset_reset_strategy_set(self):
+        self.is_offset_reset_strategy_set = True
+
+    def start_cmd(self, node):
+        cmd = ""
+        cmd += "export LOG_DIR=%s;" % VerifiableShareConsumer.LOG_DIR
+        cmd += " export KAFKA_OPTS=%s;" % self.security_config.kafka_opts
+        cmd += " export KAFKA_LOG4J_OPTS=\"-Dlog4j.configuration=file:%s\"; " % VerifiableShareConsumer.LOG4J_CONFIG
+        cmd += self.impl.exec_cmd(node)
+        if self.on_record_consumed:
+            cmd += " --verbose"
+
+        # 3.7.0 includes support for KIP-848 which introduced a new implementation of the consumer group protocol.

Review Comment:
   These lines about 3.7.0 are not relevant in the verifiable_share_consumer.

##########
tools/src/main/java/org/apache/kafka/tools/VerifiableShareConsumer.java:
##########
@@ -0,0 +1,627 @@
+    protected static class OffsetsAcknowledged extends ShareConsumerEvent {

Review Comment:
   Probably ought to have a `@JsonPropertyOrder` too.
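
For illustration, the suggested annotation applied to the quoted class; only the annotation line is new relative to the PR, and the chosen property order is an assumption that mirrors the `@JsonPropertyOrder({ "timestamp", "name" })` already present on `ShareConsumerEvent`. The accessors below the constructor are reconstructed from the sibling event classes, since the quoted hunk cuts off at the constructor.

```java
// Sketch of the suggested change: the annotation pins the JSON field order.
// The ordering of count/partitions/error/success is an assumption.
@JsonPropertyOrder({ "timestamp", "name", "count", "partitions", "error", "success" })
protected static class OffsetsAcknowledged extends ShareConsumerEvent {

    private final long count;
    private final List<AcknowledgedData> partitions;
    private final String error;
    private final boolean success;

    public OffsetsAcknowledged(long count, List<AcknowledgedData> partitions, String error, boolean success) {
        this.count = count;
        this.partitions = partitions;
        this.error = error;
        this.success = success;
    }

    @Override
    public String name() {
        // Matches the "offsets_acknowledged" branch dispatched on in
        // verifiable_share_consumer.py's _worker loop.
        return "offsets_acknowledged";
    }

    @JsonProperty
    public long count() {
        return count;
    }

    @JsonProperty
    public List<AcknowledgedData> partitions() {
        return partitions;
    }

    @JsonProperty
    public String error() {
        return error;
    }

    @JsonProperty
    public boolean success() {
        return success;
    }
}
```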