junrao commented on code in PR #17011:
URL: https://github.com/apache/kafka/pull/17011#discussion_r1739451920
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -17,58 +17,552 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.PartitionFactory;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * The share coordinator shard is a replicated state machine that manages the
metadata of all
- * share partitions. It holds the hard and the soft state of the share
partitions. This class
- * has two kinds of methods:
- * 1) The request handlers which handle the requests and generate a response
and records to
- * mutate the hard state. Those records will be written by the runtime and
applied to the
- * hard state via the replay methods.
- * 2) The replay methods which apply records to the hard state. Those are used
in the request
- * handling as well as during the initial loading of the records from the
partitions.
- */
public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord> {
+ private final Logger log;
+ private final Time time;
+ private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final ShareCoordinatorConfig config;
+ private final CoordinatorMetrics coordinatorMetrics;
+ private final CoordinatorMetricsShard metricsShard;
+ private final TimelineHashMap<SharePartitionKey, ShareGroupOffset>
shareStateMap; // coord key -> ShareGroupOffset
+ private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
+ private final TimelineHashMap<SharePartitionKey, Integer>
snapshotUpdateCount;
+ private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
+ private MetadataImage metadataImage;
+ private final int snapshotUpdateRecordsPerSnapshot;
+
+ public static final Exception NULL_TOPIC_ID = new Exception("The topic id
cannot be null.");
+ public static final Exception NEGATIVE_PARTITION_ID = new Exception("The
partition id cannot be a negative number.");
+
+ public static class Builder implements
CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
+ private ShareCoordinatorConfig config;
+ private LogContext logContext;
+ private SnapshotRegistry snapshotRegistry;
+ private Time time;
+ private CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private CoordinatorMetrics coordinatorMetrics;
+ private TopicPartition topicPartition;
+
+ public Builder(ShareCoordinatorConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ this.timer = timer;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withCoordinatorMetrics(CoordinatorMetrics
coordinatorMetrics) {
+ this.coordinatorMetrics = coordinatorMetrics;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTopicPartition(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("NPathComplexity")
+ public ShareCoordinatorShard build() {
+ if (logContext == null) logContext = new LogContext();
+ if (config == null)
+ throw new IllegalArgumentException("Config must be set.");
+ if (snapshotRegistry == null)
+ throw new IllegalArgumentException("SnapshotRegistry must be
set.");
+ if (time == null)
+ throw new IllegalArgumentException("Time must be set.");
+ if (timer == null)
+ throw new IllegalArgumentException("Timer must be set.");
+ if (coordinatorMetrics == null || !(coordinatorMetrics instanceof
ShareCoordinatorMetrics))
+ throw new IllegalArgumentException("CoordinatorMetrics must be
set and be of type ShareCoordinatorMetrics.");
+ if (topicPartition == null)
+ throw new IllegalArgumentException("TopicPartition must be
set.");
+
+ ShareCoordinatorMetricsShard metricsShard =
((ShareCoordinatorMetrics) coordinatorMetrics)
+ .newMetricsShard(snapshotRegistry, topicPartition);
+
+ return new ShareCoordinatorShard(
+ logContext,
+ time,
+ timer,
+ config,
+ coordinatorMetrics,
+ metricsShard,
+ snapshotRegistry
+ );
+ }
+ }
+
+ ShareCoordinatorShard(
+ LogContext logContext,
+ Time time,
+ CoordinatorTimer<Void, CoordinatorRecord> timer,
+ ShareCoordinatorConfig config,
+ CoordinatorMetrics coordinatorMetrics,
+ CoordinatorMetricsShard metricsShard,
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.log = logContext.logger(ShareCoordinatorShard.class);
+ this.time = time;
+ this.timer = timer;
+ this.config = config;
+ this.coordinatorMetrics = coordinatorMetrics;
+ this.metricsShard = metricsShard;
+ this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateRecordsPerSnapshot =
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
+ }
+
@Override
public void onLoaded(MetadataImage newImage) {
- CoordinatorShard.super.onLoaded(newImage);
+ coordinatorMetrics.activateMetricsShard(metricsShard);
}
@Override
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
- CoordinatorShard.super.onNewMetadataImage(newImage, delta);
+ this.metadataImage = newImage;
}
@Override
public void onUnloaded() {
- CoordinatorShard.super.onUnloaded();
+ coordinatorMetrics.deactivateMetricsShard(metricsShard);
}
@Override
public void replay(long offset, long producerId, short producerEpoch,
CoordinatorRecord record) throws RuntimeException {
+ ApiMessageAndVersion key = record.key();
+ ApiMessageAndVersion value = record.value();
+
+ switch (key.version()) {
+ case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: //
ShareSnapshot
+ handleShareSnapshot((ShareSnapshotKey) key.message(),
(ShareSnapshotValue) messageOrNull(value));
+ break;
+ case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: //
ShareUpdate
+ handleShareUpdate((ShareUpdateKey) key.message(),
(ShareUpdateValue) messageOrNull(value));
+ break;
+ default:
+ // noop
+ }
+ }
+
+ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value) {
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+ maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this record is the complete snapshot
+ shareStateMap.put(mapKey, offsetRecord);
+ // if number of share updates is exceeded, then reset it
+ if (snapshotUpdateCount.containsKey(mapKey)) {
+ if (snapshotUpdateCount.get(mapKey) >=
snapshotUpdateRecordsPerSnapshot) {
+ snapshotUpdateCount.put(mapKey, 0);
+ }
+ }
+ }
+
+ private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value)
{
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+
+ // share update does not hold state epoch information.
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this is an incremental snapshot
+ // so, we need to apply it to our current soft state
+ shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord :
merge(v, value));
+ snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+ }
+
+ private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int
leaderEpoch) {
+ leaderEpochMap.putIfAbsent(mapKey, leaderEpoch);
+ if (leaderEpochMap.get(mapKey) < leaderEpoch) {
+ leaderEpochMap.put(mapKey, leaderEpoch);
+ }
+ }
+
+ private void maybeUpdateStateEpochMap(SharePartitionKey mapKey, int
stateEpoch) {
+ stateEpochMap.putIfAbsent(mapKey, stateEpoch);
+ if (stateEpochMap.get(mapKey) < stateEpoch) {
+ stateEpochMap.put(mapKey, stateEpoch);
+ }
}
@Override
public void replayEndTransactionMarker(long producerId, short
producerEpoch, TransactionResult result) throws RuntimeException {
CoordinatorShard.super.replayEndTransactionMarker(producerId,
producerEpoch, result);
}
- public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(RequestContext context,
WriteShareGroupStateRequestData request) {
- throw new RuntimeException("Not implemented");
+ /**
+ * This method generates the ShareSnapshotValue record corresponding to
the requested topic partition information.
+ * The generated record is then written to the __share_group_state topic
and replayed to the in-memory state
+ * of the coordinator shard, shareStateMap, by CoordinatorRuntime.
+ * <p>
+     * This method, as called by the ShareCoordinatorService, will be provided with
+     * the request data which covers only a single key i.e. group1:topic1:partition1. The implementation
+     * below was done keeping this in mind.
+ *
+ * @param context - RequestContext
+ * @param request - WriteShareGroupStateRequestData for a single key
+ * @return CoordinatorResult(records, response)
+ */
+ @SuppressWarnings("NPathComplexity")
+ public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(
+ RequestContext context,
Review Comment:
context seems unused?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -17,58 +17,552 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.PartitionFactory;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * The share coordinator shard is a replicated state machine that manages the
metadata of all
- * share partitions. It holds the hard and the soft state of the share
partitions. This class
- * has two kinds of methods:
- * 1) The request handlers which handle the requests and generate a response
and records to
- * mutate the hard state. Those records will be written by the runtime and
applied to the
- * hard state via the replay methods.
- * 2) The replay methods which apply records to the hard state. Those are used
in the request
- * handling as well as during the initial loading of the records from the
partitions.
- */
public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord> {
+ private final Logger log;
+ private final Time time;
+ private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final ShareCoordinatorConfig config;
Review Comment:
time, timer and config seem unused?
##########
core/src/main/scala/kafka/server/KafkaApis.scala:
##########
@@ -1689,8 +1694,8 @@ class KafkaApis(val requestChannel: RequestChannel,
(txnCoordinator.partitionFor(key), TRANSACTION_STATE_TOPIC_NAME)
case CoordinatorType.SHARE =>
- // When share coordinator support is implemented in KIP-932, a
proper check will go here
- return (Errors.COORDINATOR_NOT_AVAILABLE, Node.noNode)
+ // None check already done above
Review Comment:
Hmm, what check is the comment referring to?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -17,58 +17,552 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.PartitionFactory;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * The share coordinator shard is a replicated state machine that manages the
metadata of all
- * share partitions. It holds the hard and the soft state of the share
partitions. This class
- * has two kinds of methods:
- * 1) The request handlers which handle the requests and generate a response
and records to
- * mutate the hard state. Those records will be written by the runtime and
applied to the
- * hard state via the replay methods.
- * 2) The replay methods which apply records to the hard state. Those are used
in the request
- * handling as well as during the initial loading of the records from the
partitions.
- */
public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord> {
+ private final Logger log;
+ private final Time time;
+ private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final ShareCoordinatorConfig config;
+ private final CoordinatorMetrics coordinatorMetrics;
+ private final CoordinatorMetricsShard metricsShard;
+ private final TimelineHashMap<SharePartitionKey, ShareGroupOffset>
shareStateMap; // coord key -> ShareGroupOffset
+ private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
+ private final TimelineHashMap<SharePartitionKey, Integer>
snapshotUpdateCount;
+ private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
+ private MetadataImage metadataImage;
+ private final int snapshotUpdateRecordsPerSnapshot;
+
+ public static final Exception NULL_TOPIC_ID = new Exception("The topic id
cannot be null.");
+ public static final Exception NEGATIVE_PARTITION_ID = new Exception("The
partition id cannot be a negative number.");
+
+ public static class Builder implements
CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
+ private ShareCoordinatorConfig config;
+ private LogContext logContext;
+ private SnapshotRegistry snapshotRegistry;
+ private Time time;
+ private CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private CoordinatorMetrics coordinatorMetrics;
+ private TopicPartition topicPartition;
+
+ public Builder(ShareCoordinatorConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ this.timer = timer;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withCoordinatorMetrics(CoordinatorMetrics
coordinatorMetrics) {
+ this.coordinatorMetrics = coordinatorMetrics;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTopicPartition(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("NPathComplexity")
+ public ShareCoordinatorShard build() {
+ if (logContext == null) logContext = new LogContext();
+ if (config == null)
+ throw new IllegalArgumentException("Config must be set.");
+ if (snapshotRegistry == null)
+ throw new IllegalArgumentException("SnapshotRegistry must be
set.");
+ if (time == null)
+ throw new IllegalArgumentException("Time must be set.");
+ if (timer == null)
+ throw new IllegalArgumentException("Timer must be set.");
+ if (coordinatorMetrics == null || !(coordinatorMetrics instanceof
ShareCoordinatorMetrics))
+ throw new IllegalArgumentException("CoordinatorMetrics must be
set and be of type ShareCoordinatorMetrics.");
+ if (topicPartition == null)
+ throw new IllegalArgumentException("TopicPartition must be
set.");
+
+ ShareCoordinatorMetricsShard metricsShard =
((ShareCoordinatorMetrics) coordinatorMetrics)
+ .newMetricsShard(snapshotRegistry, topicPartition);
+
+ return new ShareCoordinatorShard(
+ logContext,
+ time,
+ timer,
+ config,
+ coordinatorMetrics,
+ metricsShard,
+ snapshotRegistry
+ );
+ }
+ }
+
+ ShareCoordinatorShard(
+ LogContext logContext,
+ Time time,
+ CoordinatorTimer<Void, CoordinatorRecord> timer,
+ ShareCoordinatorConfig config,
+ CoordinatorMetrics coordinatorMetrics,
+ CoordinatorMetricsShard metricsShard,
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.log = logContext.logger(ShareCoordinatorShard.class);
+ this.time = time;
+ this.timer = timer;
+ this.config = config;
+ this.coordinatorMetrics = coordinatorMetrics;
+ this.metricsShard = metricsShard;
+ this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateRecordsPerSnapshot =
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
+ }
+
@Override
public void onLoaded(MetadataImage newImage) {
- CoordinatorShard.super.onLoaded(newImage);
+ coordinatorMetrics.activateMetricsShard(metricsShard);
}
@Override
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
- CoordinatorShard.super.onNewMetadataImage(newImage, delta);
+ this.metadataImage = newImage;
}
@Override
public void onUnloaded() {
- CoordinatorShard.super.onUnloaded();
+ coordinatorMetrics.deactivateMetricsShard(metricsShard);
}
@Override
public void replay(long offset, long producerId, short producerEpoch,
CoordinatorRecord record) throws RuntimeException {
+ ApiMessageAndVersion key = record.key();
+ ApiMessageAndVersion value = record.value();
+
+ switch (key.version()) {
+ case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: //
ShareSnapshot
+ handleShareSnapshot((ShareSnapshotKey) key.message(),
(ShareSnapshotValue) messageOrNull(value));
+ break;
+ case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: //
ShareUpdate
+ handleShareUpdate((ShareUpdateKey) key.message(),
(ShareUpdateValue) messageOrNull(value));
+ break;
+ default:
+ // noop
+ }
+ }
+
+ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value) {
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+ maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this record is the complete snapshot
+ shareStateMap.put(mapKey, offsetRecord);
+ // if number of share updates is exceeded, then reset it
+ if (snapshotUpdateCount.containsKey(mapKey)) {
+ if (snapshotUpdateCount.get(mapKey) >=
snapshotUpdateRecordsPerSnapshot) {
+ snapshotUpdateCount.put(mapKey, 0);
+ }
+ }
+ }
+
+ private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value)
{
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+
+ // share update does not hold state epoch information.
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this is an incremental snapshot
+ // so, we need to apply it to our current soft state
+ shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord :
merge(v, value));
+ snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+ }
+
+ private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int
leaderEpoch) {
+ leaderEpochMap.putIfAbsent(mapKey, leaderEpoch);
+ if (leaderEpochMap.get(mapKey) < leaderEpoch) {
+ leaderEpochMap.put(mapKey, leaderEpoch);
+ }
+ }
+
+ private void maybeUpdateStateEpochMap(SharePartitionKey mapKey, int
stateEpoch) {
+ stateEpochMap.putIfAbsent(mapKey, stateEpoch);
+ if (stateEpochMap.get(mapKey) < stateEpoch) {
+ stateEpochMap.put(mapKey, stateEpoch);
+ }
}
@Override
public void replayEndTransactionMarker(long producerId, short
producerEpoch, TransactionResult result) throws RuntimeException {
CoordinatorShard.super.replayEndTransactionMarker(producerId,
producerEpoch, result);
}
- public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(RequestContext context,
WriteShareGroupStateRequestData request) {
- throw new RuntimeException("Not implemented");
+ /**
+ * This method generates the ShareSnapshotValue record corresponding to
the requested topic partition information.
+ * The generated record is then written to the __share_group_state topic
and replayed to the in-memory state
+ * of the coordinator shard, shareStateMap, by CoordinatorRuntime.
+ * <p>
+     * This method, as called by the ShareCoordinatorService, will be provided with
+     * the request data which covers only a single key i.e. group1:topic1:partition1. The implementation
+     * below was done keeping this in mind.
+ *
+ * @param context - RequestContext
+ * @param request - WriteShareGroupStateRequestData for a single key
+ * @return CoordinatorResult(records, response)
+ */
+ @SuppressWarnings("NPathComplexity")
+ public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(
+ RequestContext context,
+ WriteShareGroupStateRequestData request
+ ) {
+ // records to write (with both key and value of snapshot type),
response to caller
+ // only one key will be there in the request by design
+
+
metricsShard.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ Optional<CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord>> error = maybeGetWriteStateError(request);
+ if (error.isPresent()) {
+ return error.get();
+ }
+
+ String groupId = request.groupId();
+ WriteShareGroupStateRequestData.WriteStateData topicData =
request.topics().get(0);
+ WriteShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+
+ SharePartitionKey key = SharePartitionKey.getInstance(groupId,
topicData.topicId(), partitionData.partition());
+ List<CoordinatorRecord> recordList;
+
+ if (!shareStateMap.containsKey(key)) {
+ // since this is the first time we are getting a write request, we
should be creating a share snapshot record
+ recordList =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ groupId, topicData.topicId(), partitionData.partition(),
ShareGroupOffset.fromRequest(partitionData)
+ ));
+ } else if (snapshotUpdateCount.getOrDefault(key, 0) >=
snapshotUpdateRecordsPerSnapshot) {
+ // Since the number of update records for this share part key
exceeds snapshotUpdateRecordsPerSnapshot,
+ // we should be creating a share snapshot record.
+ List<PersisterOffsetsStateBatch> batchesToAdd;
+ if (partitionData.startOffset() == -1) {
+ batchesToAdd = combineStateBatches(
+ shareStateMap.get(key).stateBatchAsSet(),
+ partitionData.stateBatches().stream()
+ .map(PersisterOffsetsStateBatch::from)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
+ } else {
+ // start offset is being updated - we should only
+ // consider new updates to batches
+ batchesToAdd = partitionData.stateBatches().stream()
+
.map(PersisterOffsetsStateBatch::from).collect(Collectors.toList());
+ }
+
+ int newLeaderEpoch = partitionData.leaderEpoch() == -1 ?
shareStateMap.get(key).leaderEpoch() : partitionData.leaderEpoch();
+ int newStateEpoch = partitionData.stateEpoch() == -1 ?
shareStateMap.get(key).stateEpoch() : partitionData.stateEpoch();
+ long newStartOffset = partitionData.startOffset() == -1 ?
shareStateMap.get(key).startOffset() : partitionData.startOffset();
+
+ recordList =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ groupId, topicData.topicId(), partitionData.partition(),
+ new ShareGroupOffset.Builder()
+ .setStartOffset(newStartOffset)
+ .setLeaderEpoch(newLeaderEpoch)
+ .setStateEpoch(newStateEpoch)
+ .setStateBatches(batchesToAdd)
+ .build()));
+ } else {
+ // share snapshot is present and number of share snapshot update
records < snapshotUpdateRecordsPerSnapshot
+ recordList =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotUpdateRecord(
+ groupId, topicData.topicId(), partitionData.partition(),
ShareGroupOffset.fromRequest(partitionData,
shareStateMap.get(key).snapshotEpoch())
+ ));
+ }
+
+ List<CoordinatorRecord> validRecords = new LinkedList<>();
+
+ WriteShareGroupStateResponseData responseData = new
WriteShareGroupStateResponseData();
+ for (CoordinatorRecord record : recordList) { // should be single
record
+ if (!(record.key().message() instanceof ShareSnapshotKey) &&
!(record.key().message() instanceof ShareUpdateKey)) {
+ continue;
+ }
+ SharePartitionKey mapKey = null;
+ boolean shouldIncSnapshotEpoch = false;
+ if (record.key().message() instanceof ShareSnapshotKey) {
+ ShareSnapshotKey recordKey = (ShareSnapshotKey)
record.key().message();
+
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
+ recordKey.topicId(),
Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
+ recordKey.partition())))));
+ mapKey = SharePartitionKey.getInstance(recordKey.groupId(),
recordKey.topicId(), recordKey.partition());
+ shouldIncSnapshotEpoch = true;
+ } else if (record.key().message() instanceof ShareUpdateKey) {
+ ShareUpdateKey recordKey = (ShareUpdateKey)
record.key().message();
+
responseData.setResults(Collections.singletonList(WriteShareGroupStateResponse.toResponseWriteStateResult(
+ recordKey.topicId(),
Collections.singletonList(WriteShareGroupStateResponse.toResponsePartitionResult(
+ recordKey.partition())))));
+ mapKey = SharePartitionKey.getInstance(recordKey.groupId(),
recordKey.topicId(), recordKey.partition());
+ }
+
+ if (shareStateMap.containsKey(mapKey) && shouldIncSnapshotEpoch) {
+ ShareGroupOffset oldValue = shareStateMap.get(mapKey);
+ ((ShareSnapshotValue)
record.value().message()).setSnapshotEpoch(oldValue.snapshotEpoch() + 1); //
increment the snapshot epoch
+ }
+ validRecords.add(record); // this will have updated snapshot epoch
and on replay the value will trickle down to the map
+ }
+
+ return new CoordinatorResult<>(validRecords, responseData);
}
+ /**
+ * This method finds the ShareSnapshotValue record corresponding to the
requested topic partition from the
+ * in-memory state of coordinator shard, the shareStateMap.
+ * <p>
+     * This method, as called by the ShareCoordinatorService, will be provided with
+     * the request data which covers only one key i.e. group1:topic1:partition1. The implementation
+     * below was done keeping this in mind.
+     *
+     * @param request - ReadShareGroupStateRequestData for a single key
+     * @param offset - offset to read from the __share_group_state topic partition
+ * @return CoordinatorResult(records, response)
+ */
public ReadShareGroupStateResponseData
readState(ReadShareGroupStateRequestData request, Long offset) {
- throw new RuntimeException("Not implemented");
+ // records to read (with the key of snapshot type), response to caller
+ // only one key will be there in the request by design
+ Optional<ReadShareGroupStateResponseData> error =
maybeGetReadStateError(request, offset);
+ if (error.isPresent()) {
+ return error.get();
+ }
+
+ Uuid topicId = request.topics().get(0).topicId();
+ int partition =
request.topics().get(0).partitions().get(0).partition();
+ int leaderEpoch =
request.topics().get(0).partitions().get(0).leaderEpoch();
+
+ SharePartitionKey coordinatorKey =
SharePartitionKey.getInstance(request.groupId(), topicId, partition);
+
+ if (!shareStateMap.containsKey(coordinatorKey)) {
+ return ReadShareGroupStateResponse.toResponseData(
+ topicId,
+ partition,
+ PartitionFactory.DEFAULT_START_OFFSET,
+ PartitionFactory.DEFAULT_STATE_EPOCH,
+ Collections.emptyList()
+ );
+ }
+
+ ShareGroupOffset offsetValue = shareStateMap.get(coordinatorKey,
offset);
+
+ if (offsetValue == null) {
+ // Returning an error response as the snapshot value was not found
+ return ReadShareGroupStateResponse.toErrorResponseData(
+ topicId,
+ partition,
+ Errors.UNKNOWN_SERVER_ERROR,
+ "Data not found for topic {}, partition {} for group {}, in
the in-memory state of share coordinator"
+ );
+ }
+
+ List<ReadShareGroupStateResponseData.StateBatch> stateBatches =
(offsetValue.stateBatches() != null && !offsetValue.stateBatches().isEmpty()) ?
+ offsetValue.stateBatches().stream().map(
+ stateBatch -> new ReadShareGroupStateResponseData.StateBatch()
+ .setFirstOffset(stateBatch.firstOffset())
+ .setLastOffset(stateBatch.lastOffset())
+ .setDeliveryState(stateBatch.deliveryState())
+ .setDeliveryCount(stateBatch.deliveryCount())
+ ).collect(java.util.stream.Collectors.toList()) :
Collections.emptyList();
+
+ // Updating the leader map with the new leader epoch
+ leaderEpochMap.put(coordinatorKey, leaderEpoch);
+
+ // Returning the successfully retrieved snapshot value
+ return ReadShareGroupStateResponse.toResponseData(topicId, partition,
offsetValue.startOffset(), offsetValue.stateEpoch(), stateBatches);
+ }
+
+ private Optional<CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord>> maybeGetWriteStateError(
+ WriteShareGroupStateRequestData request
+ ) {
+ String groupId = request.groupId();
+ WriteShareGroupStateRequestData.WriteStateData topicData =
request.topics().get(0);
+ WriteShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+
+ Uuid topicId = topicData.topicId();
+ int partitionId = partitionData.partition();
+
+ if (topicId == null) {
+ return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST,
NULL_TOPIC_ID, null, partitionId));
+ }
+
+ if (partitionId < 0) {
+ return Optional.of(getWriteErrorResponse(Errors.INVALID_REQUEST,
NEGATIVE_PARTITION_ID, topicId, partitionId));
+ }
+
+ SharePartitionKey mapKey = SharePartitionKey.getInstance(groupId,
topicId, partitionId);
+ if (leaderEpochMap.containsKey(mapKey) && leaderEpochMap.get(mapKey) >
partitionData.leaderEpoch()) {
+ return
Optional.of(getWriteErrorResponse(Errors.FENCED_LEADER_EPOCH, null, topicId,
partitionId));
+ }
+ if (stateEpochMap.containsKey(mapKey) && stateEpochMap.get(mapKey) >
partitionData.stateEpoch()) {
+ return
Optional.of(getWriteErrorResponse(Errors.FENCED_STATE_EPOCH, null, topicId,
partitionId));
+ }
+ if (metadataImage != null && (metadataImage.topics().getTopic(topicId)
== null ||
Review Comment:
If `metadataImage` is null, we should return `UNKNOWN_TOPIC_OR_PARTITION`
too, right?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorService.java:
##########
@@ -0,0 +1,535 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.coordinator.share;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
+import org.apache.kafka.common.compress.Compression;
+import org.apache.kafka.common.config.TopicConfig;
+import org.apache.kafka.common.internals.Topic;
+import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
+import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
+import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
+import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
+import org.apache.kafka.common.requests.RequestContext;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorEventProcessor;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntime;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorRuntimeMetrics;
+import
org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilderSupplier;
+import org.apache.kafka.coordinator.common.runtime.MultiThreadedEventProcessor;
+import org.apache.kafka.coordinator.common.runtime.PartitionWriter;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.server.record.BrokerCompressionType;
+import org.apache.kafka.server.util.timer.Timer;
+
+import org.slf4j.Logger;
+
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.OptionalInt;
+import java.util.Properties;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.IntSupplier;
+import java.util.stream.Collectors;
+
+import static
org.apache.kafka.coordinator.common.runtime.CoordinatorOperationExceptionHelper.handleOperationException;
+
+public class ShareCoordinatorService implements ShareCoordinator {
+ private final ShareCoordinatorConfig config;
+ private final Logger log;
+ private final AtomicBoolean isActive = new AtomicBoolean(false); // for
controlling start and stop
+ private final CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
runtime;
+ private final ShareCoordinatorMetrics shareCoordinatorMetrics;
+ private volatile int numPartitions = -1; // Number of partitions for
__share_group_state. Provided when component is started.
+ private final Time time;
+
+ public static class Builder {
+ private final int nodeId;
+ private final ShareCoordinatorConfig config;
+ private PartitionWriter writer;
+ private CoordinatorLoader<CoordinatorRecord> loader;
+ private Time time;
+ private Timer timer;
+
+ private ShareCoordinatorMetrics coordinatorMetrics;
+ private CoordinatorRuntimeMetrics coordinatorRuntimeMetrics;
+
+ public Builder(int nodeId, ShareCoordinatorConfig config) {
+ this.nodeId = nodeId;
+ this.config = config;
+ }
+
+ public Builder withWriter(PartitionWriter writer) {
+ this.writer = writer;
+ return this;
+ }
+
+ public Builder withLoader(CoordinatorLoader<CoordinatorRecord> loader)
{
+ this.loader = loader;
+ return this;
+ }
+
+ public Builder withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ public Builder withTimer(Timer timer) {
+ this.timer = timer;
+ return this;
+ }
+
+ public Builder withCoordinatorMetrics(ShareCoordinatorMetrics
coordinatorMetrics) {
+ this.coordinatorMetrics = coordinatorMetrics;
+ return this;
+ }
+
+ public Builder withCoordinatorRuntimeMetrics(CoordinatorRuntimeMetrics
coordinatorRuntimeMetrics) {
+ this.coordinatorRuntimeMetrics = coordinatorRuntimeMetrics;
+ return this;
+ }
+
+ public ShareCoordinatorService build() {
+ if (config == null) {
+ throw new IllegalArgumentException("Config must be set.");
+ }
+ if (writer == null) {
+ throw new IllegalArgumentException("Writer must be set.");
+ }
+ if (loader == null) {
+ throw new IllegalArgumentException("Loader must be set.");
+ }
+ if (time == null) {
+ throw new IllegalArgumentException("Time must be set.");
+ }
+ if (timer == null) {
+ throw new IllegalArgumentException("Timer must be set.");
+ }
+ if (coordinatorMetrics == null) {
+ throw new IllegalArgumentException("Share Coordinator metrics
must be set.");
+ }
+ if (coordinatorRuntimeMetrics == null) {
+ throw new IllegalArgumentException("Coordinator runtime
metrics must be set.");
+ }
+
+ String logPrefix = String.format("ShareCoordinator id=%d", nodeId);
+ LogContext logContext = new LogContext(String.format("[%s] ",
logPrefix));
+
+ CoordinatorShardBuilderSupplier<ShareCoordinatorShard,
CoordinatorRecord> supplier = () ->
+ new ShareCoordinatorShard.Builder(config);
+
+ CoordinatorEventProcessor processor = new
MultiThreadedEventProcessor(
+ logContext,
+ "share-coordinator-event-processor-",
+ config.shareCoordinatorNumThreads(),
+ time,
+ coordinatorRuntimeMetrics
+ );
+
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord>
runtime =
+ new CoordinatorRuntime.Builder<ShareCoordinatorShard,
CoordinatorRecord>()
+ .withTime(time)
+ .withTimer(timer)
+ .withLogPrefix(logPrefix)
+ .withLogContext(logContext)
+ .withEventProcessor(processor)
+ .withPartitionWriter(writer)
+ .withLoader(loader)
+ .withCoordinatorShardBuilderSupplier(supplier)
+ .withTime(time)
+
.withDefaultWriteTimeOut(Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()))
+ .withCoordinatorRuntimeMetrics(coordinatorRuntimeMetrics)
+ .withCoordinatorMetrics(coordinatorMetrics)
+ .withSerializer(new ShareCoordinatorRecordSerde())
+
.withCompression(Compression.of(config.shareCoordinatorStateTopicCompressionType()).build())
+
.withAppendLingerMs(config.shareCoordinatorAppendLingerMs())
+ .build();
+
+ return new ShareCoordinatorService(
+ logContext,
+ config,
+ runtime,
+ coordinatorMetrics,
+ time
+ );
+ }
+ }
+
+ public ShareCoordinatorService(
+ LogContext logContext,
+ ShareCoordinatorConfig config,
+ CoordinatorRuntime<ShareCoordinatorShard, CoordinatorRecord> runtime,
+ ShareCoordinatorMetrics shareCoordinatorMetrics,
+ Time time) {
+ this.log = logContext.logger(ShareCoordinatorService.class);
+ this.config = config;
+ this.runtime = runtime;
+ this.shareCoordinatorMetrics = shareCoordinatorMetrics;
+ this.time = time;
+ }
+
+ @Override
+ public int partitionFor(String key) {
+ return Utils.abs(key.hashCode()) % numPartitions;
+ }
+
+ @Override
+ public Properties shareGroupStateTopicConfigs() {
+ Properties properties = new Properties();
+ properties.put(TopicConfig.CLEANUP_POLICY_CONFIG,
TopicConfig.CLEANUP_POLICY_DELETE); // as defined in KIP-932
+ properties.put(TopicConfig.COMPRESSION_TYPE_CONFIG,
BrokerCompressionType.PRODUCER.name);
+ properties.put(TopicConfig.SEGMENT_BYTES_CONFIG,
config.shareCoordinatorStateTopicSegmentBytes());
+ return properties;
+ }
+
+ /**
+ * The share coordinator startup method will get invoked from
BrokerMetadataPublisher.
+ * At the time of writing, the publisher uses metadata cache to fetch the
number of partitions
+ * of the share state topic. In case this information is not available,
the user provided
+ * config will be used to fetch the value.
+ * This is consistent with the group coordinator startup functionality.
+ *
+ * @param shareGroupTopicPartitionCount - supplier returning the number of
partitions for __share_group_state topic
+ */
+ @Override
+ public void startup(
+ IntSupplier shareGroupTopicPartitionCount
+ ) {
+ if (!isActive.compareAndSet(false, true)) {
+ log.warn("Share coordinator is already running.");
+ return;
+ }
+
+ log.info("Starting up.");
+ numPartitions = shareGroupTopicPartitionCount.getAsInt();
+ log.info("Startup complete.");
+ }
+
+ @Override
+ public void shutdown() {
+ if (!isActive.compareAndSet(true, false)) {
+ log.warn("Share coordinator is already shutting down.");
+ return;
+ }
+
+ log.info("Shutting down.");
+ Utils.closeQuietly(runtime, "coordinator runtime");
+ Utils.closeQuietly(shareCoordinatorMetrics, "share coordinator
metrics");
+ log.info("Shutdown complete.");
+ }
+
+ @Override
+ public CompletableFuture<WriteShareGroupStateResponseData>
writeState(RequestContext context, WriteShareGroupStateRequestData request) {
+ // Send an empty response if topic data is empty
+ if (isEmpty(request.topics())) {
+ log.error("Topic Data is empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new WriteShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if partition data is empty for any topic
+ for (WriteShareGroupStateRequestData.WriteStateData topicData :
request.topics()) {
+ if (isEmpty(topicData.partitions())) {
+ log.error("Partition Data for topic {} is empty: {}",
topicData.topicId(), request);
+ return CompletableFuture.completedFuture(
+ new WriteShareGroupStateResponseData()
+ );
+ }
+ }
+
+ String groupId = request.groupId();
+ // Send an empty response if groupId is invalid
+ if (isGroupIdEmpty(groupId)) {
+ log.error("Group id must be specified and non-empty: {}", request);
+ return CompletableFuture.completedFuture(
+ new WriteShareGroupStateResponseData()
+ );
+ }
+
+ // Send an empty response if the coordinator is not active
+ if (!isActive.get()) {
+ return CompletableFuture.completedFuture(
+ generateErrorWriteStateResponse(
+ request,
+ Errors.COORDINATOR_NOT_AVAILABLE,
+ "Share coordinator is not available."
+ )
+ );
+ }
+
+ // The request received here could have multiple keys of structure
group:topic:partition. However,
+ // the writeState method in ShareCoordinatorShard expects a single key
in the request. Hence, we will
+ // be looping over the keys below and constructing new
WriteShareGroupStateRequestData objects to pass
+ // onto the shard method.
+ Map<Uuid, Map<Integer,
CompletableFuture<WriteShareGroupStateResponseData>>> futureMap = new
HashMap<>();
+ long startTime = time.hiResClockMs();
+
+ request.topics().forEach(topicData -> {
+ Map<Integer, CompletableFuture<WriteShareGroupStateResponseData>>
partitionFut =
+ futureMap.computeIfAbsent(topicData.topicId(), k -> new
HashMap<>());
+ topicData.partitions().forEach(
+ partitionData -> {
+ CompletableFuture<WriteShareGroupStateResponseData> future
= runtime.scheduleWriteOperation(
+ "write-share-group-state",
+
topicPartitionFor(SharePartitionKey.getInstance(groupId, topicData.topicId(),
partitionData.partition())),
+
Duration.ofMillis(config.shareCoordinatorWriteTimeoutMs()),
+ coordinator -> coordinator.writeState(context, new
WriteShareGroupStateRequestData()
+ .setGroupId(groupId)
+ .setTopics(Collections.singletonList(new
WriteShareGroupStateRequestData.WriteStateData()
+ .setTopicId(topicData.topicId())
+
.setPartitions(Collections.singletonList(new
WriteShareGroupStateRequestData.PartitionData()
+
.setPartition(partitionData.partition())
+
.setStartOffset(partitionData.startOffset())
+
.setLeaderEpoch(partitionData.leaderEpoch())
+
.setStateEpoch(partitionData.stateEpoch())
+
.setStateBatches(partitionData.stateBatches())))))))
+ .exceptionally(exception -> handleOperationException(
+ "write-share-group-state",
+ request,
+ exception,
+ (error, message) ->
WriteShareGroupStateResponse.toErrorResponseData(
+ topicData.topicId(),
+ partitionData.partition(),
+ error,
+ "Unable to write share group state: " +
exception.getMessage()
+ ),
+ log
+ ));
+ partitionFut.put(partitionData.partition(), future);
+ });
+ });
+
+ // Combine all futures into a single CompletableFuture<Void>
+ CompletableFuture<Void> combinedFuture =
CompletableFuture.allOf(futureMap.values().stream()
+ .flatMap(partMap ->
partMap.values().stream()).toArray(CompletableFuture[]::new));
+
+ // topicId -> {partitionId -> responseFuture}
+ return combinedFuture.thenApply(v -> {
+ long endTime = time.hiResClockMs(); // all futures complete
+ List<WriteShareGroupStateResponseData.WriteStateResult>
writeStateResults = new ArrayList<>(futureMap.size());
+ futureMap.forEach(
+ (topicId, topicEntry) -> {
+ List<WriteShareGroupStateResponseData.PartitionResult>
partitionResults = new ArrayList<>(topicEntry.size());
+ topicEntry.forEach(
+ // map of partition id -> responses from api
+ (partitionId, responseFut) -> {
+ long timeTaken = endTime - startTime;
+ // This is the future returned by
runtime.scheduleWriteOperation which returns when the
+ // operation has completed including error
information. When this line executes, the future
+ // should be complete as we used
CompletableFuture::allOf to get a combined future from
+ // all futures in the map.
+ WriteShareGroupStateResponseData partitionData =
responseFut.getNow(null);
+
shareCoordinatorMetrics.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_LATENCY_SENSOR_NAME,
timeTaken);
Review Comment:
`timeTaken` is the same for all topic partitions in the request. It doesn't
seem right to record the same value per partition?
##########
share-coordinator/src/main/java/org/apache/kafka/coordinator/share/ShareCoordinatorShard.java:
##########
@@ -17,58 +17,552 @@
package org.apache.kafka.coordinator.share;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.message.ReadShareGroupStateRequestData;
import org.apache.kafka.common.message.ReadShareGroupStateResponseData;
import org.apache.kafka.common.message.WriteShareGroupStateRequestData;
import org.apache.kafka.common.message.WriteShareGroupStateResponseData;
+import org.apache.kafka.common.protocol.ApiMessage;
+import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.requests.ReadShareGroupStateResponse;
import org.apache.kafka.common.requests.RequestContext;
import org.apache.kafka.common.requests.TransactionResult;
+import org.apache.kafka.common.requests.WriteShareGroupStateResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetrics;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorMetricsShard;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.CoordinatorResult;
import org.apache.kafka.coordinator.common.runtime.CoordinatorShard;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorShardBuilder;
+import org.apache.kafka.coordinator.common.runtime.CoordinatorTimer;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
+import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateKey;
+import org.apache.kafka.coordinator.share.generated.ShareUpdateValue;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetrics;
+import org.apache.kafka.coordinator.share.metrics.ShareCoordinatorMetricsShard;
import org.apache.kafka.image.MetadataDelta;
import org.apache.kafka.image.MetadataImage;
+import org.apache.kafka.server.common.ApiMessageAndVersion;
+import org.apache.kafka.server.config.ShareCoordinatorConfig;
+import org.apache.kafka.server.group.share.PartitionFactory;
+import org.apache.kafka.server.group.share.SharePartitionKey;
+import org.apache.kafka.timeline.SnapshotRegistry;
+import org.apache.kafka.timeline.TimelineHashMap;
+
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
-/**
- * The share coordinator shard is a replicated state machine that manages the
metadata of all
- * share partitions. It holds the hard and the soft state of the share
partitions. This class
- * has two kinds of methods:
- * 1) The request handlers which handle the requests and generate a response
and records to
- * mutate the hard state. Those records will be written by the runtime and
applied to the
- * hard state via the replay methods.
- * 2) The replay methods which apply records to the hard state. Those are used
in the request
- * handling as well as during the initial loading of the records from the
partitions.
- */
public class ShareCoordinatorShard implements
CoordinatorShard<CoordinatorRecord> {
+ private final Logger log;
+ private final Time time;
+ private final CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private final ShareCoordinatorConfig config;
+ private final CoordinatorMetrics coordinatorMetrics;
+ private final CoordinatorMetricsShard metricsShard;
+ private final TimelineHashMap<SharePartitionKey, ShareGroupOffset>
shareStateMap; // coord key -> ShareGroupOffset
+ private final TimelineHashMap<SharePartitionKey, Integer> leaderEpochMap;
+ private final TimelineHashMap<SharePartitionKey, Integer>
snapshotUpdateCount;
+ private final TimelineHashMap<SharePartitionKey, Integer> stateEpochMap;
+ private MetadataImage metadataImage;
+ private final int snapshotUpdateRecordsPerSnapshot;
+
+ public static final Exception NULL_TOPIC_ID = new Exception("The topic id
cannot be null.");
+ public static final Exception NEGATIVE_PARTITION_ID = new Exception("The
partition id cannot be a negative number.");
+
+ public static class Builder implements
CoordinatorShardBuilder<ShareCoordinatorShard, CoordinatorRecord> {
+ private ShareCoordinatorConfig config;
+ private LogContext logContext;
+ private SnapshotRegistry snapshotRegistry;
+ private Time time;
+ private CoordinatorTimer<Void, CoordinatorRecord> timer;
+ private CoordinatorMetrics coordinatorMetrics;
+ private TopicPartition topicPartition;
+
+ public Builder(ShareCoordinatorConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withSnapshotRegistry(SnapshotRegistry snapshotRegistry) {
+ this.snapshotRegistry = snapshotRegistry;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withLogContext(LogContext logContext) {
+ this.logContext = logContext;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTime(Time time) {
+ this.time = time;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTimer(CoordinatorTimer<Void, CoordinatorRecord> timer) {
+ this.timer = timer;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withCoordinatorMetrics(CoordinatorMetrics
coordinatorMetrics) {
+ this.coordinatorMetrics = coordinatorMetrics;
+ return this;
+ }
+
+ @Override
+ public CoordinatorShardBuilder<ShareCoordinatorShard,
CoordinatorRecord> withTopicPartition(TopicPartition topicPartition) {
+ this.topicPartition = topicPartition;
+ return this;
+ }
+
+ @Override
+ @SuppressWarnings("NPathComplexity")
+ public ShareCoordinatorShard build() {
+ if (logContext == null) logContext = new LogContext();
+ if (config == null)
+ throw new IllegalArgumentException("Config must be set.");
+ if (snapshotRegistry == null)
+ throw new IllegalArgumentException("SnapshotRegistry must be
set.");
+ if (time == null)
+ throw new IllegalArgumentException("Time must be set.");
+ if (timer == null)
+ throw new IllegalArgumentException("Timer must be set.");
+ if (coordinatorMetrics == null || !(coordinatorMetrics instanceof
ShareCoordinatorMetrics))
+ throw new IllegalArgumentException("CoordinatorMetrics must be
set and be of type ShareCoordinatorMetrics.");
+ if (topicPartition == null)
+ throw new IllegalArgumentException("TopicPartition must be
set.");
+
+ ShareCoordinatorMetricsShard metricsShard =
((ShareCoordinatorMetrics) coordinatorMetrics)
+ .newMetricsShard(snapshotRegistry, topicPartition);
+
+ return new ShareCoordinatorShard(
+ logContext,
+ time,
+ timer,
+ config,
+ coordinatorMetrics,
+ metricsShard,
+ snapshotRegistry
+ );
+ }
+ }
+
+ ShareCoordinatorShard(
+ LogContext logContext,
+ Time time,
+ CoordinatorTimer<Void, CoordinatorRecord> timer,
+ ShareCoordinatorConfig config,
+ CoordinatorMetrics coordinatorMetrics,
+ CoordinatorMetricsShard metricsShard,
+ SnapshotRegistry snapshotRegistry
+ ) {
+ this.log = logContext.logger(ShareCoordinatorShard.class);
+ this.time = time;
+ this.timer = timer;
+ this.config = config;
+ this.coordinatorMetrics = coordinatorMetrics;
+ this.metricsShard = metricsShard;
+ this.shareStateMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.leaderEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateCount = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.stateEpochMap = new TimelineHashMap<>(snapshotRegistry, 0);
+ this.snapshotUpdateRecordsPerSnapshot =
config.shareCoordinatorSnapshotUpdateRecordsPerSnapshot();
+ }
+
@Override
public void onLoaded(MetadataImage newImage) {
- CoordinatorShard.super.onLoaded(newImage);
+ coordinatorMetrics.activateMetricsShard(metricsShard);
}
@Override
public void onNewMetadataImage(MetadataImage newImage, MetadataDelta
delta) {
- CoordinatorShard.super.onNewMetadataImage(newImage, delta);
+ this.metadataImage = newImage;
}
@Override
public void onUnloaded() {
- CoordinatorShard.super.onUnloaded();
+ coordinatorMetrics.deactivateMetricsShard(metricsShard);
}
@Override
public void replay(long offset, long producerId, short producerEpoch,
CoordinatorRecord record) throws RuntimeException {
+ ApiMessageAndVersion key = record.key();
+ ApiMessageAndVersion value = record.value();
+
+ switch (key.version()) {
+ case ShareCoordinator.SHARE_SNAPSHOT_RECORD_KEY_VERSION: //
ShareSnapshot
+ handleShareSnapshot((ShareSnapshotKey) key.message(),
(ShareSnapshotValue) messageOrNull(value));
+ break;
+ case ShareCoordinator.SHARE_UPDATE_RECORD_KEY_VERSION: //
ShareUpdate
+ handleShareUpdate((ShareUpdateKey) key.message(),
(ShareUpdateValue) messageOrNull(value));
+ break;
+ default:
+ // noop
+ }
+ }
+
+ private void handleShareSnapshot(ShareSnapshotKey key, ShareSnapshotValue
value) {
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+ maybeUpdateStateEpochMap(mapKey, value.stateEpoch());
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this record is the complete snapshot
+ shareStateMap.put(mapKey, offsetRecord);
+ // if number of share updates is exceeded, then reset it
+ if (snapshotUpdateCount.containsKey(mapKey)) {
+ if (snapshotUpdateCount.get(mapKey) >=
snapshotUpdateRecordsPerSnapshot) {
+ snapshotUpdateCount.put(mapKey, 0);
+ }
+ }
+ }
+
+ private void handleShareUpdate(ShareUpdateKey key, ShareUpdateValue value)
{
+ SharePartitionKey mapKey =
SharePartitionKey.getInstance(key.groupId(), key.topicId(), key.partition());
+ maybeUpdateLeaderEpochMap(mapKey, value.leaderEpoch());
+
+ // share update does not hold state epoch information.
+
+ ShareGroupOffset offsetRecord = ShareGroupOffset.fromRecord(value);
+ // this is an incremental snapshot
+ // so, we need to apply it to our current soft state
+ shareStateMap.compute(mapKey, (k, v) -> v == null ? offsetRecord :
merge(v, value));
+ snapshotUpdateCount.compute(mapKey, (k, v) -> v == null ? 0 : v + 1);
+ }
+
+ private void maybeUpdateLeaderEpochMap(SharePartitionKey mapKey, int
leaderEpoch) {
+ leaderEpochMap.putIfAbsent(mapKey, leaderEpoch);
+ if (leaderEpochMap.get(mapKey) < leaderEpoch) {
+ leaderEpochMap.put(mapKey, leaderEpoch);
+ }
+ }
+
+ private void maybeUpdateStateEpochMap(SharePartitionKey mapKey, int
stateEpoch) {
+ stateEpochMap.putIfAbsent(mapKey, stateEpoch);
+ if (stateEpochMap.get(mapKey) < stateEpoch) {
+ stateEpochMap.put(mapKey, stateEpoch);
+ }
}
    /**
     * Applies an end-transaction marker during replay.
     * <p>
     * Delegates to the {@link CoordinatorShard} default implementation.
     * NOTE(review): the share coordinator does not appear to use transactional
     * records; presumably the default is a no-op or rejects the marker —
     * confirm against CoordinatorShard before relying on this.
     *
     * @param producerId    producer id of the transaction
     * @param producerEpoch producer epoch of the transaction
     * @param result        whether the transaction was committed or aborted
     * @throws RuntimeException if the default implementation rejects the marker
     */
    @Override
    public void replayEndTransactionMarker(long producerId, short producerEpoch, TransactionResult result) throws RuntimeException {
        CoordinatorShard.super.replayEndTransactionMarker(producerId, producerEpoch, result);
    }
- public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(RequestContext context,
WriteShareGroupStateRequestData request) {
- throw new RuntimeException("Not implemented");
+ /**
+ * This method generates the ShareSnapshotValue record corresponding to
the requested topic partition information.
+ * The generated record is then written to the __share_group_state topic
and replayed to the in-memory state
+ * of the coordinator shard, shareStateMap, by CoordinatorRuntime.
+ * <p>
+ * This method as called by the ShareCoordinatorService will be provided
with
+ * the request data which covers only a single key i.e.
group1:topic1:partition1. The implementation
+ * below was done keeping this in mind.
+ *
+ * @param context - RequestContext
+ * @param request - WriteShareGroupStateRequestData for a single key
+ * @return CoordinatorResult(records, response)
+ */
+ @SuppressWarnings("NPathComplexity")
+ public CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord> writeState(
+ RequestContext context,
+ WriteShareGroupStateRequestData request
+ ) {
+ // records to write (with both key and value of snapshot type),
response to caller
+ // only one key will be there in the request by design
+
+
metricsShard.record(ShareCoordinatorMetrics.SHARE_COORDINATOR_WRITE_SENSOR_NAME);
+ Optional<CoordinatorResult<WriteShareGroupStateResponseData,
CoordinatorRecord>> error = maybeGetWriteStateError(request);
+ if (error.isPresent()) {
+ return error.get();
+ }
+
+ String groupId = request.groupId();
+ WriteShareGroupStateRequestData.WriteStateData topicData =
request.topics().get(0);
+ WriteShareGroupStateRequestData.PartitionData partitionData =
topicData.partitions().get(0);
+
+ SharePartitionKey key = SharePartitionKey.getInstance(groupId,
topicData.topicId(), partitionData.partition());
+ List<CoordinatorRecord> recordList;
+
+ if (!shareStateMap.containsKey(key)) {
+ // since this is the first time we are getting a write request, we
should be creating a share snapshot record
+ recordList =
Collections.singletonList(ShareCoordinatorRecordHelpers.newShareSnapshotRecord(
+ groupId, topicData.topicId(), partitionData.partition(),
ShareGroupOffset.fromRequest(partitionData)
+ ));
+ } else if (snapshotUpdateCount.getOrDefault(key, 0) >=
snapshotUpdateRecordsPerSnapshot) {
+ // Since the number of update records for this share part key
exceeds snapshotUpdateRecordsPerSnapshot,
+ // we should be creating a share snapshot record.
+ List<PersisterOffsetsStateBatch> batchesToAdd;
+ if (partitionData.startOffset() == -1) {
+ batchesToAdd = combineStateBatches(
+ shareStateMap.get(key).stateBatchAsSet(),
+ partitionData.stateBatches().stream()
+ .map(PersisterOffsetsStateBatch::from)
+ .collect(Collectors.toCollection(LinkedHashSet::new)));
+ } else {
+ // start offset is being updated - we should only
+ // consider new updates to batches
+ batchesToAdd = partitionData.stateBatches().stream()
Review Comment:
Hmm, why are we only considering the new updates when the start offset changes?
Consider the following example.
State in ShareCoordinator:
startOffset: 100
Batch1 {
firstOffset: 100
lastOffset: 109
deliverState: Acquired
deliverCount: 1
}
Batch2 {
firstOffset: 110
lastOffset: 119
deliverState: Acquired
deliverCount: 2
}
Batch3 {
firstOffset: 120
lastOffset: 129
deliverState: Acquired
deliverCount: 0
}
1. Share leader acks batch 1 and sends the state of batch 1 to Share
Coordinator.
2. Share leader advances startOffset to 110.
3. Share leader acks batch 3 and sends the new startOffset and the state of
batch 3 to share coordinator.
4. Share coordinator writes the snapshot with startOffset 110 and batch 3.
Now the deliver count for batch 2 is lost.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]