rpuch commented on code in PR #5798: URL: https://github.com/apache/ignite-3/pull/5798#discussion_r2086668500
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -702,46 +702,50 @@ private CompletableFuture<Void> beforeZoneReplicaStartedImpl(LocalPartitionRepli return inBusyLockAsync(busyLock, () -> { ZonePartitionId zonePartitionId = parameters.zonePartitionId(); - StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new StampedLock()); + NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent( + zonePartitionId.zoneId(), + id -> new NaiveAsyncReadWriteLock()); - long stamp = zoneLock.readLock(); + CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock(); try { - Set<TableImpl> zoneTables = zoneTablesRawSet(zonePartitionId.zoneId()); - - int partitionIndex = zonePartitionId.partitionId(); - - PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex); - - CompletableFuture<?>[] futures = zoneTables.stream() - .map(tbl -> inBusyLockAsync(busyLock, () -> { - return getOrCreatePartitionStorages(tbl, singlePartitionIdSet) - .thenRunAsync(() -> inBusyLock(busyLock, () -> { - localPartsByTableId.compute( - tbl.tableId(), - (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex) - ); - - lowWatermark.getLowWatermarkSafe(lwm -> - registerIndexesToTable( - tbl, - catalogService, - singlePartitionIdSet, - tbl.schemaView(), - lwm - ) - ); - - preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, parameters.onRecovery()); - }), ioExecutor) - // If the table is already closed, it's not a problem (probably the node is stopping). 
- .exceptionally(ignoreTableClosedException()); - })) - .toArray(CompletableFuture[]::new); + return readLockAcquisitionFuture.thenCompose(stamp -> { + Set<TableImpl> zoneTables = zoneTablesRawSet(zonePartitionId.zoneId()); + + int partitionIndex = zonePartitionId.partitionId(); + + PartitionSet singlePartitionIdSet = PartitionSet.of(partitionIndex); + + CompletableFuture<?>[] futures = zoneTables.stream() + .map(tbl -> inBusyLockAsync(busyLock, () -> { + return getOrCreatePartitionStorages(tbl, singlePartitionIdSet) + .thenRunAsync(() -> inBusyLock(busyLock, () -> { + localPartsByTableId.compute( + tbl.tableId(), + (tableId, oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex) + ); + + lowWatermark.getLowWatermarkSafe(lwm -> + registerIndexesToTable( + tbl, + catalogService, + singlePartitionIdSet, + tbl.schemaView(), + lwm + ) + ); + + preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, parameters.onRecovery()); + }), ioExecutor) + // If the table is already closed, it's not a problem (probably the node is stopping). + .exceptionally(ignoreTableClosedException()); + })) + .toArray(CompletableFuture[]::new); - return allOf(futures).thenAccept(unused -> zoneLock.unlockRead(stamp)); + return allOf(futures).thenAccept(unused -> zoneLock.unlockRead(stamp)); Review Comment: If any of `futures` completes exceptionally, `thenAccept()` will not be called, so the lock will remain forever. Should we switch to `whenComplete()`? 
########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -764,22 +768,26 @@ private CompletableFuture<Boolean> onZoneReplicaStopped(LocalPartitionReplicaEve ZonePartitionId zonePartitionId = parameters.zonePartitionId(); - StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new StampedLock()); + NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent( + zonePartitionId.zoneId(), + id -> new NaiveAsyncReadWriteLock()); - long stamp = zoneLock.readLock(); + CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock(); try { - CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream() - .map(this::stopTablePartitions) - .toArray(CompletableFuture[]::new); + return readLockAcquisitionFuture.thenCompose(stamp -> { + CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream() + .map(this::stopTablePartitions) + .toArray(CompletableFuture[]::new); - return allOf(futures).thenApply(unused -> { - zoneLock.unlockRead(stamp); + return allOf(futures).thenApply(unused -> { Review Comment: Same thing about unlocking ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -792,28 +800,32 @@ private CompletableFuture<Boolean> onZoneReplicaDestroyed(LocalPartitionReplicaE ZonePartitionId zonePartitionId = parameters.zonePartitionId(); - StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new StampedLock()); + NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent( + zonePartitionId.zoneId(), + id -> new NaiveAsyncReadWriteLock()); - long stamp = zoneLock.readLock(); + CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock(); try { - return inBusyLockAsync(busyLock, () -> { - CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream() - .map(table -> 
supplyAsync( - () -> inBusyLockAsync( - busyLock, - () -> stopAndDestroyTablePartition( - new TablePartitionId(table.tableId(), zonePartitionId.partitionId()), - parameters.causalityToken() - ) - ), - ioExecutor)) - .toArray(CompletableFuture[]::new); + return readLockAcquisitionFuture.thenCompose(stamp -> { + return inBusyLockAsync(busyLock, () -> { + CompletableFuture<?>[] futures = zoneTablesRawSet(zonePartitionId.zoneId()).stream() + .map(table -> supplyAsync( + () -> inBusyLockAsync( + busyLock, + () -> stopAndDestroyTablePartition( + new TablePartitionId(table.tableId(), zonePartitionId.partitionId()), + parameters.causalityToken() + ) + ), + ioExecutor)) + .toArray(CompletableFuture[]::new); - return allOf(futures); - }).thenApply((unused) -> false); + return allOf(futures).thenAccept(unused -> zoneLock.unlockRead(stamp)); Review Comment: And here ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -3272,14 +3288,22 @@ public void setStreamerReceiverRunner(StreamerReceiverRunner runner) { * @return Set of tables. */ public Set<TableImpl> zoneTables(int zoneId) { - StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new StampedLock()); + NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock()); - long stamp = zoneLock.readLock(); + CompletableFuture<Long> readLockAcquisitionFuture = zoneLock.readLock(); try { - return Set.copyOf(zoneTablesRawSet(zoneId)); - } finally { - zoneLock.unlockRead(stamp); + return readLockAcquisitionFuture.thenApply(stamp -> { + Set<TableImpl> res = Set.copyOf(zoneTablesRawSet(zoneId)); + + zoneLock.unlockRead(stamp); + + return res; + }).join(); Review Comment: This might block the calling thread for a long time (while write lock is held by someone). Why do we even need a lock here? Would a thread-safe map be enough (without any lock)? 
Calls of this method do not need to be linearized with the internal machinery. ########## modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java: ########## @@ -3301,21 +3325,28 @@ private Set<TableImpl> zoneTablesRawSet(int zoneId) { * @param table Table to add. */ private void addTableToZone(int zoneId, TableImpl table) { - StampedLock rwLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new StampedLock()); + NaiveAsyncReadWriteLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> new NaiveAsyncReadWriteLock()); + + CompletableFuture<Long> writeLockAcquisitionFuture = zoneLock.writeLock(); - long stamp = rwLock.writeLock(); try { - tablesPerZone.compute(zoneId, (id, tables) -> { - if (tables == null) { - tables = new HashSet<>(); - } + writeLockAcquisitionFuture.thenAccept(stamp -> { + tablesPerZone.compute(zoneId, (id, tables) -> { + if (tables == null) { + tables = new HashSet<>(); + } - tables.add(table); + tables.add(table); - return tables; - }); - } finally { - rwLock.unlockWrite(stamp); + return tables; + }); + + zoneLock.unlockWrite(stamp); + }).join(); Review Comment: Same note about blocking the calling thread. Do we really need it? If so, this looks very suspicious: it smells like a possible deadlock. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: notifications-unsubscr...@ignite.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org