satishd commented on a change in pull request #10579: URL: https://github.com/apache/kafka/pull/10579#discussion_r649388017
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ##########
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.apache.kafka.server.log.remote.storage.RemoteLogMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.server.log.remote.metadata.storage.TopicBasedRemoteLogMetadataManagerConfig.REMOTE_LOG_METADATA_TOPIC_NAME;
+
+/**
+ * This class is responsible for consuming messages from the remote log metadata topic ({@link TopicBasedRemoteLogMetadataManagerConfig#REMOTE_LOG_METADATA_TOPIC_NAME})
+ * partitions and maintaining the state of the remote log segment metadata. It provides an API to add or remove
+ * the topic partitions whose metadata should be consumed by this instance, using
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} respectively.
+ * <p>
+ * When a broker is started, the controller sends the topic partitions that this broker is a leader or follower for, and the
+ * partitions to be deleted. This class receives those notifications through
+ * {@link #addAssignmentsForPartitions(Set)} and {@link #removeAssignmentsForPartitions(Set)} and assigns the consumer to the
+ * respective remote log metadata partitions by using {@link RemoteLogMetadataTopicPartitioner#metadataPartition(TopicIdPartition)}.
+ * Any later leadership changes are conveyed through the same API. Partitions that are deleted from this broker,
+ * received through {@link #removeAssignmentsForPartitions(Set)}, are removed as well.
+ * <p>
+ * After receiving these events it invokes {@link RemotePartitionMetadataEventHandler#handleRemoteLogSegmentMetadata(RemoteLogSegmentMetadata)},
+ * which maintains an in-memory representation of the state of {@link RemoteLogSegmentMetadata}.
+ */
+class ConsumerTask implements Runnable, Closeable {
+    private static final Logger log = LoggerFactory.getLogger(ConsumerTask.class);
+
+    private static final long POLL_INTERVAL_MS = 30L;
+
+    private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+    private final KafkaConsumer<byte[], byte[]> consumer;
+    private final RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler;
+    private final RemoteLogMetadataTopicPartitioner topicPartitioner;
+
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();
+
+    // Remote log metadata topic partitions that consumer is assigned to.
+    private volatile Set<Integer> assignedMetaPartitions = Collections.emptySet();
+
+    // User topic partitions that this broker is a leader/follower for.
+    private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
+
+    // Map of remote log metadata topic partition to consumed offsets.
+    private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
+
+    public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
+                        RemotePartitionMetadataEventHandler remotePartitionMetadataEventHandler,
+                        RemoteLogMetadataTopicPartitioner topicPartitioner) {
+        this.consumer = consumer;
+        this.remotePartitionMetadataEventHandler = remotePartitionMetadataEventHandler;
+        this.topicPartitioner = topicPartitioner;
+    }
+
+    @Override
+    public void run() {
+        log.info("Started Consumer task thread.");
+        try {
+            while (!close) {
+                Set<Integer> assignedMetaPartitionsSnapshot = maybeWaitForPartitionsAssignment();
+
+                if (!assignedMetaPartitionsSnapshot.isEmpty()) {
+                    executeReassignment(assignedMetaPartitionsSnapshot);
+                }
+
+                log.info("Polling consumer to receive remote log metadata topic records");
+                ConsumerRecords<byte[], byte[]> consumerRecords
+                        = consumer.poll(Duration.ofMillis(POLL_INTERVAL_MS));
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {

Review comment:
   I do not think that check is really needed here.
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ##########
@@ -0,0 +1,229 @@
+                for (ConsumerRecord<byte[], byte[]> record : consumerRecords) {
+                    handleRemoteLogMetadata(serde.deserialize(record.value()));
+                    partitionToConsumedOffsets.put(record.partition(), record.offset());
+                }
+            }
+        } catch (Exception e) {
+            log.error("Error occurred in consumer task, close:[{}]", close, e);
+        }
+
+        closeConsumer();
Review comment:
   It indicates whether the closing process has been started. If it is set to true, the consumer stops consuming messages and partition assignments are no longer allowed to be updated. Updated the javadoc of close.
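   For illustration, a minimal sketch of the close-flag pattern described above; the field names mirror ConsumerTask, but the method bodies are simplified assumptions rather than the actual implementation:

       class CloseFlagSketch implements Runnable, java.io.Closeable {
           private volatile boolean close = false;
           private final Object assignPartitionsLock = new Object();

           @Override
           public void run() {
               // The flag is re-checked on every iteration, so setting it stops the poll loop.
               while (!close) {
                   // poll the consumer and process records here
               }
           }

           public void addAssignments() {
               synchronized (assignPartitionsLock) {
                   if (close) {
                       // Once closing has started, assignment updates are rejected.
                       throw new IllegalStateException("Closed, no more assignments are accepted.");
                   }
                   // Update assignments and wake run() if it is waiting for them.
                   assignPartitionsLock.notifyAll();
               }
           }

           @Override
           public void close() {
               close = true;                         // signal the poll loop to stop
               synchronized (assignPartitionsLock) {
                   assignPartitionsLock.notifyAll(); // wake run() if it is blocked on the lock
               }
           }
       }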
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java ##########
@@ -0,0 +1,229 @@
+    private volatile boolean close = false;
+    private volatile boolean assignPartitions = false;
+
+    private final Object assignPartitionsLock = new Object();

Review comment:
   I wanted to have a separate lock instance specifically for the assignments and their processing. It gives better clarity and separation, even if we later add other logic that takes a lock on this instance.
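   As a sketch of the intent (an assumed simplification, not the actual class): a private lock object confines contention to assignment handling, whereas synchronized methods would lock on the ConsumerTask instance itself and couple unrelated logic to the same monitor.

       import java.util.Collections;
       import java.util.HashSet;
       import java.util.Set;

       class DedicatedLockSketch {
           private final Object assignPartitionsLock = new Object();
           private Set<Integer> assignedMetaPartitions = Collections.emptySet();

           void updateAssignments(Set<Integer> partitions) {
               synchronized (assignPartitionsLock) {
                   assignedMetaPartitions = new HashSet<>(partitions);
                   assignPartitionsLock.notifyAll(); // wake any thread waiting for assignments
               }
           }

           void waitForAssignments() throws InterruptedException {
               synchronized (assignPartitionsLock) {
                   while (assignedMetaPartitions.isEmpty()) {
                       assignPartitionsLock.wait();
                   }
               }
           }
       }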
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataTopicPartitioner.java ##########
@@ -0,0 +1,60 @@
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Objects;
+
+public class RemoteLogMetadataTopicPartitioner {
+    public static final Logger log = LoggerFactory.getLogger(RemoteLogMetadataTopicPartitioner.class);
+    private final int noOfMetadataTopicPartitions;
+
+    public RemoteLogMetadataTopicPartitioner(int noOfMetadataTopicPartitions) {
+        this.noOfMetadataTopicPartitions = noOfMetadataTopicPartitions;
+    }
+
+    public int metadataPartition(TopicIdPartition topicIdPartition) {
+        Objects.requireNonNull(topicIdPartition, "TopicIdPartition can not be null");
+
+        int partitionNo = Utils.toPositive(Utils.murmur2(toBytes(topicIdPartition))) % noOfMetadataTopicPartitions;
+        log.debug("No of partitions [{}], partitionNo: [{}] for given topic: [{}]", noOfMetadataTopicPartitions, partitionNo, topicIdPartition);
+        return partitionNo;
+    }
+
+    private byte[] toBytes(TopicIdPartition topicIdPartition) {
+        // We do not want to depend upon hash code generation of Uuid as that may change.
+        int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
+                                topicIdPartition.topicId().getMostSignificantBits(),
+                                topicIdPartition.topicPartition().topic(),

Review comment:
   Good point. We can skip topic as topic-id is sufficient here.
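   A sketch of how toBytes(...) might look after that change, hashing only the topic-id bits and the partition number (the toBytes(int) helper is an assumption, since the rest of the file is not quoted here):

       private byte[] toBytes(TopicIdPartition topicIdPartition) {
           // We do not want to depend upon hash code generation of Uuid as that may change.
           int hash = Objects.hash(topicIdPartition.topicId().getLeastSignificantBits(),
                                   topicIdPartition.topicId().getMostSignificantBits(),
                                   topicIdPartition.topicPartition().partition());
           return toBytes(hash);
       }

       private byte[] toBytes(int value) {
           // Big-endian encoding of the int hash.
           return java.nio.ByteBuffer.allocate(Integer.BYTES).putInt(value).array();
       }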
########## File path: storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemotePartitionMetadataStore.java ##########
@@ -0,0 +1,140 @@
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteMetadata;
+import org.apache.kafka.server.log.remote.storage.RemotePartitionDeleteState;
+import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
+import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class RemotePartitionMetadataStore extends RemotePartitionMetadataEventHandler implements Closeable {
+    private static final Logger log = LoggerFactory.getLogger(RemotePartitionMetadataStore.class);
+
+    private Map<TopicIdPartition, RemotePartitionDeleteMetadata> idToPartitionDeleteMetadata =

Review comment:
   I went with assigning an empty map, as Map.clear() needs to go through all the entries and dereference them. Another way is to leave the map as it is, set the close state, and not allow any operation when it is closed, but that would add a check for each call.
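   A minimal sketch of the chosen approach (an assumed simplification of this class, not the exact implementation): close() swaps in an immutable empty map, so the old entries become unreachable in one assignment instead of being iterated and removed by clear().

       @Override
       public void close() throws IOException {
           // Dropping the reference lets GC reclaim all entries at once;
           // Map.clear() would walk and dereference every entry instead.
           idToPartitionDeleteMetadata = Collections.emptyMap();
           // Alternative considered above: keep the map, set a close flag, and
           // reject operations afterwards, at the cost of a check on every call.
       }

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org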