JAkutenshi commented on code in PR #3548:
URL: https://github.com/apache/ignite-3/pull/3548#discussion_r1558914229


##########
modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java:
##########
@@ -796,7 +797,7 @@ public class IgniteImpl implements Ignite {
                 catalogManager,
                 metaStorageMgr,
                 indexManager,
-                placementDriverMgr.placementDriver(),
+                new ReplicaAwareLeaseTracker(placementDriverMgr.placementDriver(), replicaSvc, clusterSvc.topologyService()),

Review Comment:
   Why should we use the delegate only for the index?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -141,12 +142,17 @@ private CompletableFuture<?> onIndexBuilding(StartBuildingIndexEventParameters p
 
             var startBuildIndexFutures = new ArrayList<CompletableFuture<?>>();
 
-            for (TablePartitionId primaryReplicaId : primaryReplicaIds) {
-                if (primaryReplicaId.tableId() == indexDescriptor.tableId()) {
-                    CompletableFuture<?> startBuildIndexFuture = getMvTableStorageFuture(parameters.causalityToken(), primaryReplicaId)
-                            .thenCompose(mvTableStorage -> awaitPrimaryReplica(primaryReplicaId, clockService.now())
+            for (ZonePartitionId zonePartitionId : primaryReplicaIds) {
+
+                int tableId = zonePartitionId.tableId();
+
+                TablePartitionId tablePartId = new TablePartitionId(tableId, zonePartitionId.partitionId());
+
+                if (tableId == indexDescriptor.tableId()) {
+                    CompletableFuture<?> startBuildIndexFuture = getMvTableStorageFuture(parameters.causalityToken(), tablePartId)

Review Comment:
   Shouldn't the second argument of `getMvTableStorageFuture` be changed to `ZonePartitionId` too?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -244,7 +258,7 @@ private void tryScheduleBuildIndex(
      * @param replicaId Replica ID.
      */
     private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) {

Review Comment:
   An additional question: isn't getting rid of `TablePartitionId` one of the subtasks?



##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexBuildController.java:
##########
@@ -244,7 +258,7 @@ private void tryScheduleBuildIndex(
      * @param replicaId Replica ID.
      */
     private void stopBuildingIndexesIfPrimaryExpired(TablePartitionId replicaId) {

Review Comment:
   Same here: shouldn't this take the new `ZonePartitionId`?



##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java:
##########
@@ -98,10 +108,11 @@ private CatalogHashIndexDescriptor(
             boolean unique,
             CatalogIndexStatus status,
             int txWaitCatalogVersion,
+            int zoneId,

Review Comment:
   Is `zoneId` missing from the Javadoc here, as above?



##########
modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/ReplicaAwareLeaseTracker.java:
##########
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver.leases;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.event.AbstractEventProducer;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.ReplicaService;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import org.apache.ignite.internal.replicator.message.WaitReplicaStateMessage;
+import org.apache.ignite.network.ClusterNode;
+import org.apache.ignite.network.ClusterNodeResolver;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link PlacementDriver} that is aware if {@link ReplicaService}.
+ * It delegates calls to the original {@link PlacementDriver} and after that sends {@link WaitReplicaStateMessage}
+ * which calls {@link org.apache.ignite.internal.replicator.Replica#waitForActualState(long)}.
+ */
+public class ReplicaAwareLeaseTracker extends AbstractEventProducer<PrimaryReplicaEvent, PrimaryReplicaEventParameters> implements
+        PlacementDriver {
+    /** Replicator network message factory. */
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new ReplicaMessagesFactory();
+
+    private final PlacementDriver delegate;
+    private final ReplicaService replicaService;
+
+    /** Resolver that resolves a node consistent ID to cluster node. */
+    private final ClusterNodeResolver clusterNodeResolver;
+
+
+    /**
+     * Constructor.
+     *
+     * @param delegate Delegate Placement Driver.
+     * @param replicaService Replica Service.
+     * @param clusterNodeResolver Cluster node resolver.
+     */
+    public ReplicaAwareLeaseTracker(PlacementDriver delegate, ReplicaService replicaService, ClusterNodeResolver clusterNodeResolver) {
+        this.delegate = delegate;
+        this.replicaService = replicaService;
+        this.clusterNodeResolver = clusterNodeResolver;
+    }
+
+    @Override
+    public void listen(PrimaryReplicaEvent evt, EventListener<? extends PrimaryReplicaEventParameters> listener) {
+        delegate.listen(evt, listener);
+    }
+
+    @Override
+    public void removeListener(PrimaryReplicaEvent evt, EventListener<? extends PrimaryReplicaEventParameters> listener) {
+        delegate.removeListener(evt, listener);
+    }
+
+    @Override
+    public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp, long timeout,
+            TimeUnit unit) {
+        return delegate.awaitPrimaryReplica(groupId, timestamp, timeout, unit);
+    }
+
+    @Override
+    public CompletableFuture<ReplicaMeta> awaitPrimaryReplicaForTable(
+            ZonePartitionId groupId,
+            HybridTimestamp timestamp,
+            long timeout,
+            TimeUnit unit
+    ) {
+        TablePartitionId tablePartitionId = new TablePartitionId(groupId.tableId(), groupId.partitionId());
+
+        return delegate.awaitPrimaryReplica(tablePartitionId, timestamp, timeout, unit)
+                .thenCompose(replicaMeta -> {
+                    ClusterNode leaseholderNode = clusterNodeResolver.getById(replicaMeta.getLeaseholderId());
+
+                    WaitReplicaStateMessage awaitReplicaReq = REPLICA_MESSAGES_FACTORY.waitReplicaStateMessage()
+                            .groupId(tablePartitionId)

Review Comment:
   Same here: the input `groupId` is a `ZonePartitionId`, but we still have to use a `TablePartitionId`.
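
   To make the mismatch concrete, here is a minimal sketch of the conversion that each call site currently repeats. `ZonePartIdStub`, `TablePartIdStub`, and `PartitionIds.toTablePartitionId` are hypothetical stand-ins for illustration only, not the real Ignite classes, and keeping the conversion in one helper is just an assumption about how it could be contained until `TablePartitionId` is removed.

```java
/** Hypothetical stand-in for ZonePartitionId with the argument order used in this PR. */
final class ZonePartIdStub {
    final int zoneId;
    final int partId;
    final int tableId;

    ZonePartIdStub(int zoneId, int partId, int tableId) {
        this.zoneId = zoneId;
        this.partId = partId;
        this.tableId = tableId;
    }
}

/** Hypothetical stand-in for TablePartitionId. */
final class TablePartIdStub {
    final int tableId;
    final int partId;

    TablePartIdStub(int tableId, int partId) {
        this.tableId = tableId;
        this.partId = partId;
    }
}

/** Hypothetical helper that keeps the zone-to-table conversion in a single place. */
final class PartitionIds {
    private PartitionIds() {
    }

    /** Mirrors the inline conversion from the diff: table ID and partition ID are carried over, zone ID is dropped. */
    static TablePartIdStub toTablePartitionId(ZonePartIdStub zonePartId) {
        return new TablePartIdStub(zonePartId.tableId, zonePartId.partId);
    }
}
```

   With something like this, the remaining `TablePartitionId` usages would at least be easy to find and remove later.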



##########
modules/core/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionId.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.replicator;
+
+import java.util.Objects;
+
+/**
+ * The class is used to identify a zone replication group id for a given partition.
+ */
+public class ZonePartitionId implements ReplicationGroupId {
+
+    /** Zone id. */
+    private final int zoneId;
+
+    /** Partition id. */
+    private final int partId;
+
+    private final int tableId;
+
+    /**
+     * The constructor.
+     *
+     * @param zoneId Zone id.
+     * @param partId Partition id.
+     */
+    public ZonePartitionId(int zoneId, int partId, int tableId) {

Review Comment:
   May I propose changing the argument order to `(zoneId, tableId, partId)` for call consistency? For example, the call at `ChangeIndexStatusTask:L238` would then read from more global to less global IDs.
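
   A minimal sketch of what the proposed ordering could look like. `ZonePartitionIdSketch` is a hypothetical, simplified stand-in rather than the class from this PR, and the accessor names are assumptions.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for ZonePartitionId with arguments ordered from most to least global. */
final class ZonePartitionIdSketch {
    private final int zoneId;
    private final int tableId;
    private final int partId;

    /**
     * The constructor.
     *
     * @param zoneId Zone id.
     * @param tableId Table id.
     * @param partId Partition id.
     */
    ZonePartitionIdSketch(int zoneId, int tableId, int partId) {
        this.zoneId = zoneId;
        this.tableId = tableId;
        this.partId = partId;
    }

    int zoneId() {
        return zoneId;
    }

    int tableId() {
        return tableId;
    }

    int partitionId() {
        return partId;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof ZonePartitionIdSketch)) {
            return false;
        }
        ZonePartitionIdSketch that = (ZonePartitionIdSketch) o;
        return zoneId == that.zoneId && tableId == that.tableId && partId == that.partId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(zoneId, tableId, partId);
    }
}
```

   A call site would then read `new ZonePartitionIdSketch(zoneId, tableId, partId)`, matching the zone, table, partition nesting.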


