rpuch commented on code in PR #5798:
URL: https://github.com/apache/ignite-3/pull/5798#discussion_r2086668500


##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -702,46 +702,50 @@ private CompletableFuture<Void> 
beforeZoneReplicaStartedImpl(LocalPartitionRepli
         return inBusyLockAsync(busyLock, () -> {
             ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
-            StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
+            NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(
+                    zonePartitionId.zoneId(),
+                    id -> new NaiveAsyncReadWriteLock());
 
-            long stamp = zoneLock.readLock();
+            CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
 
             try {
-                Set<TableImpl> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
-
-                int partitionIndex = zonePartitionId.partitionId();
-
-                PartitionSet singlePartitionIdSet = 
PartitionSet.of(partitionIndex);
-
-                CompletableFuture<?>[] futures = zoneTables.stream()
-                        .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                            return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
-                                    .thenRunAsync(() -> inBusyLock(busyLock, 
() -> {
-                                        localPartsByTableId.compute(
-                                                tbl.tableId(),
-                                                (tableId, oldPartitionSet) -> 
extendPartitionSet(oldPartitionSet, partitionIndex)
-                                        );
-
-                                        lowWatermark.getLowWatermarkSafe(lwm ->
-                                                registerIndexesToTable(
-                                                        tbl,
-                                                        catalogService,
-                                                        singlePartitionIdSet,
-                                                        tbl.schemaView(),
-                                                        lwm
-                                                )
-                                        );
-
-                                        
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, 
parameters.onRecovery());
-                                    }), ioExecutor)
-                                    // If the table is already closed, it's 
not a problem (probably the node is stopping).
-                                    
.exceptionally(ignoreTableClosedException());
-                        }))
-                        .toArray(CompletableFuture[]::new);
+                return readLockAcquisitionFuture.thenCompose(stamp -> {
+                    Set<TableImpl> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
+
+                    int partitionIndex = zonePartitionId.partitionId();
+
+                    PartitionSet singlePartitionIdSet = 
PartitionSet.of(partitionIndex);
+
+                    CompletableFuture<?>[] futures = zoneTables.stream()
+                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
+                                        .thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
+                                            localPartsByTableId.compute(
+                                                    tbl.tableId(),
+                                                    (tableId, oldPartitionSet) 
-> extendPartitionSet(oldPartitionSet, partitionIndex)
+                                            );
+
+                                            
lowWatermark.getLowWatermarkSafe(lwm ->
+                                                    registerIndexesToTable(
+                                                            tbl,
+                                                            catalogService,
+                                                            
singlePartitionIdSet,
+                                                            tbl.schemaView(),
+                                                            lwm
+                                                    )
+                                            );
+
+                                            
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, 
parameters.onRecovery());
+                                        }), ioExecutor)
+                                        // If the table is already closed, 
it's not a problem (probably the node is stopping).
+                                        
.exceptionally(ignoreTableClosedException());
+                            }))
+                            .toArray(CompletableFuture[]::new);
 
-                return allOf(futures).thenAccept(unused -> 
zoneLock.unlockRead(stamp));
+                    return allOf(futures).thenAccept(unused -> 
zoneLock.unlockRead(stamp));

Review Comment:
   If any of `futures` completes exceptionally, `thenAccept()` will not be 
called, so the read lock will never be released. Should we switch to 
`whenComplete()`?



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -764,22 +768,26 @@ private CompletableFuture<Boolean> 
onZoneReplicaStopped(LocalPartitionReplicaEve
 
         ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
-        StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                zonePartitionId.zoneId(),
+                id -> new NaiveAsyncReadWriteLock());
 
-        long stamp = zoneLock.readLock();
+        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
 
         try {
-            CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                    .map(this::stopTablePartitions)
-                    .toArray(CompletableFuture[]::new);
+            return readLockAcquisitionFuture.thenCompose(stamp -> {
+                CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                        .map(this::stopTablePartitions)
+                        .toArray(CompletableFuture[]::new);
 
-            return allOf(futures).thenApply(unused -> {
-                zoneLock.unlockRead(stamp);
+                return allOf(futures).thenApply(unused -> {

Review Comment:
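   Same issue with unlocking here: if any of `futures` completes 
exceptionally, `thenApply()` is skipped, so an unlock performed inside it 
never runs and the read lock is never released.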



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -792,28 +800,32 @@ private CompletableFuture<Boolean> 
onZoneReplicaDestroyed(LocalPartitionReplicaE
 
         ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
-        StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
+        NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(
+                zonePartitionId.zoneId(),
+                id -> new NaiveAsyncReadWriteLock());
 
-        long stamp = zoneLock.readLock();
+        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
 
         try {
-            return inBusyLockAsync(busyLock, () -> {
-                CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                        .map(table -> supplyAsync(
-                                () -> inBusyLockAsync(
-                                        busyLock,
-                                        () -> stopAndDestroyTablePartition(
-                                                new 
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
-                                                parameters.causalityToken()
-                                        )
-                                ),
-                                ioExecutor))
-                        .toArray(CompletableFuture[]::new);
+            return readLockAcquisitionFuture.thenCompose(stamp -> {
+                return inBusyLockAsync(busyLock, () -> {
+                    CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                            .map(table -> supplyAsync(
+                                    () -> inBusyLockAsync(
+                                            busyLock,
+                                            () -> stopAndDestroyTablePartition(
+                                                    new 
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
+                                                    parameters.causalityToken()
+                                            )
+                                    ),
+                                    ioExecutor))
+                            .toArray(CompletableFuture[]::new);
 
-                return allOf(futures);
-            }).thenApply((unused) -> false);
+                    return allOf(futures).thenAccept(unused -> 
zoneLock.unlockRead(stamp));

Review Comment:
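   Same issue here: if any of `futures` completes exceptionally, 
`thenAccept()` is skipped and the read lock is never released.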



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -3272,14 +3288,22 @@ public void 
setStreamerReceiverRunner(StreamerReceiverRunner runner) {
      * @return Set of tables.
      */
     public Set<TableImpl> zoneTables(int zoneId) {
-        StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id 
-> new StampedLock());
+        NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
 
-        long stamp = zoneLock.readLock();
+        CompletableFuture<Long> readLockAcquisitionFuture = 
zoneLock.readLock();
 
         try {
-            return Set.copyOf(zoneTablesRawSet(zoneId));
-        } finally {
-            zoneLock.unlockRead(stamp);
+            return readLockAcquisitionFuture.thenApply(stamp -> {
+                Set<TableImpl> res = Set.copyOf(zoneTablesRawSet(zoneId));
+
+                zoneLock.unlockRead(stamp);
+
+                return res;
+            }).join();

Review Comment:
   This might block the calling thread for a long time (while the write lock 
is held by someone). Why do we even need a lock here? Would a thread-safe map 
be enough (without any lock)? Calls of this method do not need to be 
linearized with the internal machinery.



##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java:
##########
@@ -3301,21 +3325,28 @@ private Set<TableImpl> zoneTablesRawSet(int zoneId) {
      * @param table Table to add.
      */
     private void addTableToZone(int zoneId, TableImpl table) {
-        StampedLock rwLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> 
new StampedLock());
+        NaiveAsyncReadWriteLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock());
+
+        CompletableFuture<Long> writeLockAcquisitionFuture = 
zoneLock.writeLock();
 
-        long stamp = rwLock.writeLock();
         try {
-            tablesPerZone.compute(zoneId, (id, tables) -> {
-                if (tables == null) {
-                    tables = new HashSet<>();
-                }
+            writeLockAcquisitionFuture.thenAccept(stamp -> {
+                tablesPerZone.compute(zoneId, (id, tables) -> {
+                    if (tables == null) {
+                        tables = new HashSet<>();
+                    }
 
-                tables.add(table);
+                    tables.add(table);
 
-                return tables;
-            });
-        } finally {
-            rwLock.unlockWrite(stamp);
+                    return tables;
+                });
+
+                zoneLock.unlockWrite(stamp);
+            }).join();

Review Comment:
   Same note about blocking the calling thread. Do we really need the lock 
here? If yes, this looks very suspicious: it smells like a possible deadlock.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to