denis-chudov commented on code in PR #1726:
URL: https://github.com/apache/ignite-3/pull/1726#discussion_r1122763055
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -394,6 +383,21 @@ public class IgniteImpl implements Ignite {
         ConfigurationRegistry clusterConfigRegistry = clusterCfgMgr.configurationRegistry();
+        TablesConfiguration tablesConfiguration = clusterConfigRegistry.getConfiguration(TablesConfiguration.KEY);
+
+        // placementDriverMgr = new PlacementDriverManager(
+        //         metaStorageMgr,
+        //         vaultMgr,
+        //         MetastorageGroupId.INSTANCE,
+        //         clusterSvc,
+        //         raftConfiguration,
+        //         cmgMgr::metaStorageNodes,
+        //         logicalTopologyService,
+        //         raftExecutorService,
+        //         tablesConfiguration,
+        //         clock
+        // );
+
Review Comment:
Please add a TODO with the ticket number.
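
For example, something like this directly above the commented-out block (the ticket number below is only a placeholder, I don't know which issue tracks re-enabling the placement driver):

```java
// TODO: IGNITE-XXXXX Re-enable PlacementDriverManager instantiation (replace XXXXX with the actual ticket number).
```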
##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -199,8 +197,8 @@ public class IgniteImpl implements Ignite {
     /** Meta storage manager. */
     private final MetaStorageManager metaStorageMgr;
 
-    /** Placement driver manager. */
-    private final PlacementDriverManager placementDriverMgr;
+    // /** Placement driver manager. */
+    //private final PlacementDriverManager placementDriverMgr;
Review Comment:
Please add a TODO with the ticket number here as well.
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -187,10 +335,264 @@ private void takeOverActiveActor() {
      */
     private void stepDownActiveActor() {
         isActiveActor = false;
+
+        stopUpdater();
     }
 
     @TestOnly
     boolean isActiveActor() {
         return isActiveActor;
     }
+
+    /**
+     * Starts a dedicated thread to renew or assign leases.
+     */
+    private void startUpdater() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updater = new Thread(NamedThreadFactory.threadPrefix(clusterService.topologyService().localMember().name(), "lease-updater")) {
+            @Override
+            public void run() {
+                while (isActiveActor) {
+                    for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : groupAssignments.entrySet()) {
+                        ReplicationGroupId grpId = entry.getKey();
+
+                        ReplicationGroupLease lease = replicationGroups.getOrDefault(grpId, new ReplicationGroupLease());
+
+                        HybridTimestamp now = clock.now();
+
+                        if (lease.stopLeas == null || now.getPhysical() > (lease.stopLeas.getPhysical() - LEASE_PERIOD / 2)) {
+                            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                            var newTs = new HybridTimestamp(now.getPhysical() + LEASE_PERIOD, 0);
+
+                            ClusterNode holder = nextLeaseHolder(entry.getValue());
+
+                            if (lease.leaseholder == null && holder != null
+                                    || lease.leaseholder != null && lease.leaseholder.equals(holder)
+                                    || holder != null && (lease.stopLeas == null || now.getPhysical() > lease.stopLeas.getPhysical())) {
Review Comment:
Does this mean the meta storage invoke happens only if `now` is greater than the lease expiration time? How does that combine with the condition above, `lease.stopLeas.getPhysical() - LEASE_PERIOD / 2`?
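
If the intent is that renewal is attempted once half of `LEASE_PERIOD` remains, while switching to a different holder is allowed only after full expiration, extracting the two checks into named helpers would make that explicit. A rough sketch of what I mean (these helpers do not exist in the PR, the names are only suggestions):

```java
// Sketch only: hypothetical helpers that separate the two time checks.
private boolean renewalWindowReached(ReplicationGroupLease lease, HybridTimestamp now) {
    // Renewal of the current lease starts once half of LEASE_PERIOD has elapsed.
    return lease.stopLeas == null
            || now.getPhysical() > lease.stopLeas.getPhysical() - LEASE_PERIOD / 2;
}

private boolean leaseExpired(ReplicationGroupLease lease, HybridTimestamp now) {
    // A different leaseholder is accepted only after the lease has fully expired.
    return lease.stopLeas == null || now.getPhysical() > lease.stopLeas.getPhysical();
}
```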
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -187,10 +335,264 @@ private void takeOverActiveActor() {
      */
     private void stepDownActiveActor() {
         isActiveActor = false;
+
+        stopUpdater();
     }
 
     @TestOnly
     boolean isActiveActor() {
         return isActiveActor;
     }
+
+    /**
+     * Starts a dedicated thread to renew or assign leases.
+     */
+    private void startUpdater() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updater = new Thread(NamedThreadFactory.threadPrefix(clusterService.topologyService().localMember().name(), "lease-updater")) {
+            @Override
+            public void run() {
+                while (isActiveActor) {
+                    for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : groupAssignments.entrySet()) {
+                        ReplicationGroupId grpId = entry.getKey();
+
+                        ReplicationGroupLease lease = replicationGroups.getOrDefault(grpId, new ReplicationGroupLease());
+
+                        HybridTimestamp now = clock.now();
+
+                        if (lease.stopLeas == null || now.getPhysical() > (lease.stopLeas.getPhysical() - LEASE_PERIOD / 2)) {
+                            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                            var newTs = new HybridTimestamp(now.getPhysical() + LEASE_PERIOD, 0);
+
+                            ClusterNode holder = nextLeaseHolder(entry.getValue());
Review Comment:
Why is a new leaseholder selected each time the lease is about to expire?
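
If the goal is to keep the lease with the current holder for as long as it stays suitable, something like this might express it more directly (a sketch only; `resolveLeaseHolder` is a hypothetical helper, not code from this PR):

```java
// Sketch: prefer the current leaseholder and fall back to a new candidate only
// when it is no longer in the assignments or the logical topology.
private ClusterNode resolveLeaseHolder(ReplicationGroupLease lease, Set<Assignment> assignments) {
    ClusterNode current = lease.leaseholder;

    if (current != null
            && assignments.stream().anyMatch(a -> a.consistentId().equals(current.name()))
            && hasNodeInLogicalTopology(current)) {
        return current;
    }

    return nextLeaseHolder(assignments);
}
```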
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -187,10 +335,264 @@ private void takeOverActiveActor() {
      */
     private void stepDownActiveActor() {
         isActiveActor = false;
+
+        stopUpdater();
     }
 
     @TestOnly
     boolean isActiveActor() {
         return isActiveActor;
     }
+
+    /**
+     * Starts a dedicated thread to renew or assign leases.
+     */
+    private void startUpdater() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updater = new Thread(NamedThreadFactory.threadPrefix(clusterService.topologyService().localMember().name(), "lease-updater")) {
+            @Override
+            public void run() {
+                while (isActiveActor) {
+                    for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : groupAssignments.entrySet()) {
+                        ReplicationGroupId grpId = entry.getKey();
+
+                        ReplicationGroupLease lease = replicationGroups.getOrDefault(grpId, new ReplicationGroupLease());
+
+                        HybridTimestamp now = clock.now();
+
+                        if (lease.stopLeas == null || now.getPhysical() > (lease.stopLeas.getPhysical() - LEASE_PERIOD / 2)) {
+                            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                            var newTs = new HybridTimestamp(now.getPhysical() + LEASE_PERIOD, 0);
+
+                            ClusterNode holder = nextLeaseHolder(entry.getValue());
+
+                            if (lease.leaseholder == null && holder != null
+                                    || lease.leaseholder != null && lease.leaseholder.equals(holder)
+                                    || holder != null && (lease.stopLeas == null || now.getPhysical() > lease.stopLeas.getPhysical())) {
+                                byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+                                ReplicationGroupLease renewedLease = new ReplicationGroupLease();
+                                renewedLease.stopLeas = newTs;
+                                renewedLease.leaseholder = holder;
+
+                                metaStorageManager.invoke(
+                                        or(notExists(leaseKey), value(leaseKey).eq(leaseRaw)),
+                                        put(leaseKey, ByteUtils.toBytes(renewedLease)),
+                                        noop()
+                                );
+                            }
+                        }
+                    }
+
+                    try {
+                        Thread.sleep(UPDATE_LEASE_MS);
+                    } catch (InterruptedException e) {
+                        LOG.error("Lease updater is interrupted");
+                    }
+                }
+            }
+        };
+
+        updater.start();
+    }
+
+    /**
+     * Finds a node that can be the leaseholder.
+     *
+     * @param assignments Replication group assignment.
+     * @return Cluster node, or {@code null} if no node in assignments can be the leaseholder.
+     */
+    private ClusterNode nextLeaseHolder(Set<Assignment> assignments) {
+        //TODO: IGNITE-18879 Implement more intellectual algorithm to choose a node.
+        for (Assignment assignment : assignments) {
+            ClusterNode candidate = clusterService.topologyService().getByConsistentId(assignment.consistentId());
+
+            if (candidate != null && hasNodeInLogicalTopology(candidate)) {
+                return candidate;
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Checks a node to contains in the logical topology.
+     *
+     * @param node Node to check.
+     * @return True if node is in the logical topology.
+     */
+    private boolean hasNodeInLogicalTopology(ClusterNode node) {
+        LogicalTopologySnapshot logicalTopologySnap0 = logicalTopologySnap.get();
+
+        return logicalTopologySnap0 != null && logicalTopologySnap0.nodes().stream().anyMatch(n -> n.name().equals(node.name()));
+    }
+
+    /**
+     * Stops a dedicated thread to renew or assign leases.
+     */
+    private void stopUpdater() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        if (updater != null) {
+            updater.interrupt();
+
+            updater = null;
+        }
+    }
+
+    /**
+     * Triggers to renew leases forcibly.
+     */
+    private void triggerToRenewLeases() {
+        if (!isActiveActor) {
+            return;
+        }
+
+        //TODO: IGNITE-18879 Implement lease maintenance.
+    }
+
+    /**
+     * Listen lease holder updates.
+     */
+    private class UpdateLeaseHolderWatchListener implements WatchListener {
+        @Override
+        public void onUpdate(WatchEvent event) {
+            for (EntryEvent entry : event.entryEvents()) {
+                Entry msEntry = entry.newEntry();
+
+                String key = new ByteArray(msEntry.key()).toString();
+
+                key = key.replace(PLACEMENTDRIVER_PREFIX, "");
+
+                TablePartitionId grpId = TablePartitionId.fromString(key);
+
+                if (msEntry.empty()) {
+                    replicationGroups.remove(grpId);
+                } else {
+                    ReplicationGroupLease lease = ByteUtils.fromBytes(msEntry.value());
+
+                    replicationGroups.put(grpId, lease);
+                }
+
+                if (msEntry.empty() || entry.oldEntry().empty()) {
+                    triggerToRenewLeases();
+                }
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+        }
+    }
+
+    /**
+     * Configuration assignments listener.
+     */
+    private class AssignmentCfgListener implements ConfigurationListener<byte[]> {
+        @Override
+        public CompletableFuture<?> onUpdate(ConfigurationNotificationEvent<byte[]> assignmentsCtx) {
+            ExtendedTableConfiguration tblCfg = assignmentsCtx.config(ExtendedTableConfiguration.class);
+
+            UUID tblId = tblCfg.id().value();
+
+            LOG.debug("Table assignments configuration update for placement driver [revision={}, tblId={}]",
+                    assignmentsCtx.storageRevision(), tblId);
+
+            List<Set<Assignment>> tableAssignments =
+                    assignmentsCtx.newValue() == null ? null : ByteUtils.fromBytes(assignmentsCtx.newValue());
+
+            for (int part = 0; part < tblCfg.partitions().value(); part++) {
+                var replicationGrpId = new TablePartitionId(tblId, part);
+
+                if (tableAssignments == null) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    groupAssignments.put(replicationGrpId, tableAssignments.get(part));
+                }
+            }
+
+            return completedFuture(null);
+        }
+    }
+
+    /**
+     * Metastorage topology watch.
+     */
+    private class PlacementDriverLogicalTopologyEventListener implements LogicalTopologyEventListener {
+        @Override
+        public void onNodeJoined(ClusterNode joinedNode, LogicalTopologySnapshot newTopology) {
+            onUpdate(newTopology);
+        }
+
+        @Override
+        public void onNodeLeft(ClusterNode leftNode, LogicalTopologySnapshot newTopology) {
+            onUpdate(newTopology);
+        }
+
+        /**
+         * Updates local topology cache.
+         *
+         * @param topologySnap Topology snasphot.
+         */
+        public void onUpdate(LogicalTopologySnapshot topologySnap) {
+            LogicalTopologySnapshot logicalTopologySnap0;
+
+            do {
+                logicalTopologySnap0 = logicalTopologySnap.get();
+
+                if (logicalTopologySnap0 != null && logicalTopologySnap0.version() >= topologySnap.version()) {
+                    break;
+                }
+            } while (!logicalTopologySnap.compareAndSet(logicalTopologySnap0, topologySnap));
+
+            LOG.debug("Logical topology updated in placement driver [topologySnap={}]", topologySnap);
+
+            triggerToRenewLeases();
+        }
+    }
+
+    /**
+     * Metastorage assignments watch.
+     */
+    private class AssignmentWatchListener implements WatchListener {
+        @Override
+        public void onUpdate(WatchEvent event) {
+            assert !event.entryEvent().newEntry().empty() : "New assignments are empty";
+
+            LOG.debug("Assignment update [revision={}, key={}]", event.revision(),
+                    new ByteArray(event.entryEvent().newEntry().key()));
+
+            for (EntryEvent evt : event.entryEvents()) {
+                var replicationGrpId = TablePartitionId.fromString(
+                        new String(evt.newEntry().key(), StandardCharsets.UTF_8).replace(STABLE_ASSIGNMENTS_PREFIX, ""));
+
+                if (evt.newEntry().empty()) {
+                    groupAssignments.remove(replicationGrpId);
+                } else {
+                    groupAssignments.put(replicationGrpId, ByteUtils.fromBytes(evt.newEntry().value()));
+                }
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+        }
+    }
+
+    /**
+     * Replica group holder.
+     */
+    public static class ReplicationGroupLease implements Serializable {
+        ClusterNode leaseholder;
+        HybridTimestamp stopLeas;
Review Comment:
`leaseExpirationTime` would be a much better name than `stopLeas`.
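
That is, roughly (a sketch; the javadoc wording is only a suggestion):

```java
/** Replication group lease. */
public static class ReplicationGroupLease implements Serializable {
    /** Node that currently holds the lease. */
    ClusterNode leaseholder;

    /** Timestamp after which the lease is no longer valid. */
    HybridTimestamp leaseExpirationTime;
}
```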
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -187,10 +335,264 @@ private void takeOverActiveActor() {
      */
     private void stepDownActiveActor() {
         isActiveActor = false;
+
+        stopUpdater();
     }
 
     @TestOnly
     boolean isActiveActor() {
         return isActiveActor;
     }
+
+    /**
+     * Starts a dedicated thread to renew or assign leases.
+     */
+    private void startUpdater() {
+        //TODO: IGNITE-18879 Implement lease maintenance.
+        updater = new Thread(NamedThreadFactory.threadPrefix(clusterService.topologyService().localMember().name(), "lease-updater")) {
+            @Override
+            public void run() {
+                while (isActiveActor) {
+                    for (Map.Entry<ReplicationGroupId, Set<Assignment>> entry : groupAssignments.entrySet()) {
+                        ReplicationGroupId grpId = entry.getKey();
+
+                        ReplicationGroupLease lease = replicationGroups.getOrDefault(grpId, new ReplicationGroupLease());
+
+                        HybridTimestamp now = clock.now();
+
+                        if (lease.stopLeas == null || now.getPhysical() > (lease.stopLeas.getPhysical() - LEASE_PERIOD / 2)) {
+                            var leaseKey = ByteArray.fromString(PLACEMENTDRIVER_PREFIX + grpId);
+
+                            var newTs = new HybridTimestamp(now.getPhysical() + LEASE_PERIOD, 0);
+
+                            ClusterNode holder = nextLeaseHolder(entry.getValue());
+
+                            if (lease.leaseholder == null && holder != null
+                                    || lease.leaseholder != null && lease.leaseholder.equals(holder)
+                                    || holder != null && (lease.stopLeas == null || now.getPhysical() > lease.stopLeas.getPhysical())) {
+                                byte[] leaseRaw = ByteUtils.toBytes(lease);
+
+                                ReplicationGroupLease renewedLease = new ReplicationGroupLease();
+                                renewedLease.stopLeas = newTs;
+                                renewedLease.leaseholder = holder;
Review Comment:
A proper constructor for `ReplicationGroupLease` is needed.
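
For example (a sketch only; it also assumes the `leaseExpirationTime` rename suggested above, and the empty lease currently created via `getOrDefault(grpId, new ReplicationGroupLease())` would need a separate representation):

```java
/** Replication group lease. */
public static class ReplicationGroupLease implements Serializable {
    /** Node that currently holds the lease. */
    private final ClusterNode leaseholder;

    /** Timestamp after which the lease is no longer valid. */
    private final HybridTimestamp leaseExpirationTime;

    ReplicationGroupLease(ClusterNode leaseholder, HybridTimestamp leaseExpirationTime) {
        this.leaseholder = leaseholder;
        this.leaseExpirationTime = leaseExpirationTime;
    }
}
```

The renewal code above then collapses to `ReplicationGroupLease renewedLease = new ReplicationGroupLease(holder, newTs);`.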
##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java:
##########
@@ -43,71 +79,114 @@
  * The another role of the manager is providing a node, which is leaseholder at the moment, for a particular replication group.
  */
 public class PlacementDriverManager implements IgniteComponent {
+    public static final String PLACEMENTDRIVER_PREFIX = "placementdriver.lease.";
+    /** Ignite logger. */
+    private static final IgniteLogger LOG = Loggers.forClass(PlacementDriverManager.class);
+    private static final long UPDATE_LEASE_MS = 200L;
+    public static final long LEASE_PERIOD = 10 * UPDATE_LEASE_MS;
     /** Busy lock to stop synchronously. */
     private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
-
     private final RaftMessagesFactory raftMessagesFactory = new RaftMessagesFactory();
-
     /** Prevents double stopping of the component. */
     private final AtomicBoolean isStopped = new AtomicBoolean();
-
-    private final ReplicationGroupId replicationGroupId;
-
-    private final ClusterService clusterService;
-
-    private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider;
-
+    /** Metastorage manager. */
+    private final MetaStorageManager metaStorageManager;
+    /**
+     * Vault manager.
+     */
+    private final VaultManager vaultManager;
     /**
-     * Raft client future. Can contain null, if this node is not in placement driver group.
+     * Configuration.
+     * TODO: The property is removed after assignments will moved to metastorage.
      */
+    private final TablesConfiguration tablesCfg;
+    private final HybridClock clock;
+    private final ClusterService clusterService;
+    private final ReplicationGroupId replicationGroupId;
+    private final Supplier<CompletableFuture<Set<String>>> placementDriverNodesNamesProvider;
+    /** Raft client future. Can contain null, if this node is not in placement driver group. */
Review Comment:
Don't we usually leave empty lines between field definitions? It seems the code style doesn't formally require it, but I've gotten very used to it :)
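
That is, something like this (just a formatting sketch of a few of the same fields; the added javadoc text is only illustrative):

```java
/** Metastorage manager. */
private final MetaStorageManager metaStorageManager;

/** Vault manager. */
private final VaultManager vaultManager;

/** Hybrid clock. */
private final HybridClock clock;
```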