vldpyatkov commented on code in PR #4540: URL: https://github.com/apache/ignite-3/pull/4540#discussion_r1806686767
##########
modules/core/src/main/java/org/apache/ignite/internal/util/StripedCompositeReadWriteLock.java:
##########
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * ReadWriteLock with striping mechanics. Compared to {@link ReentrantReadWriteLock} it has slightly improved performance of
+ * {@link ReadWriteLock#readLock()} operations at the cost of {@link ReadWriteLock#writeLock()} operations and memory consumption. It also
+ * supports reentrancy semantics like {@link ReentrantReadWriteLock}.
+ */
+public class StripedCompositeReadWriteLock implements ReadWriteLock {
+    /** Index generator. */
+    private static final AtomicInteger IDX_GEN = new AtomicInteger();
+
+    /** Index. */
+    private static final ThreadLocal<Integer> IDX = ThreadLocal.withInitial(() -> IDX_GEN.incrementAndGet());
+
+    /** Locks. */
+    private final ReentrantReadWriteLock[] locks;
+
+    /** Composite write lock. */
+    private final WriteLock writeLock;
+
+    /**
+     * Creates a new instance with given concurrency level.
+     *
+     * @param concurrencyLvl Number of internal read locks.
+     */
+    public StripedCompositeReadWriteLock(int concurrencyLvl) {
+        locks = new ReadLock[concurrencyLvl];
+
+        for (int i = 0; i < concurrencyLvl; i++) {
+            locks[i] = new ReadLock();
+        }
+
+        writeLock = new WriteLock();
+    }
+
+    /**
+     * Gets current index.
+     *
+     * @return Index of current thread stripe.
+     */
+    private int curIdx() {
+        int idx = IDX.get();
+
+        return idx % locks.length;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock readLock() {
+        return locks[curIdx()].readLock();
+    }
+
+    /**
+     * Get a lock by stripe.
+     *
+     * @param idx Stripe index.
+     * @return The lock.
+     */
+    public Lock readLock(int idx) {
+        return locks[idx].readLock();
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public Lock writeLock() {
+        return writeLock;
+    }
+
+    /**
+     * Queries if the write lock is held by the current thread.
+     *
+     * @return {@code true} if the current thread holds the write lock and {@code false} otherwise
+     */
+    public boolean isWriteLockedByCurrentThread() {
+        return locks[locks.length - 1].isWriteLockedByCurrentThread();
+    }
+
+    /**
+     * Queries the number of reentrant read holds on this lock by the current thread. A reader thread has a hold on a lock for each lock
+     * action that is not matched by an unlock action.
+     *
+     * @return the number of holds on the read lock by the current thread, or zero if the read lock is not held by the current thread
+     */
+    public int getReadHoldCount() {
+        return locks[curIdx()].getReadHoldCount();
+    }
+
+    /**
+     * Read lock.
+     */
+    @SuppressWarnings("unused")
+    private static class ReadLock extends ReentrantReadWriteLock {
+        private long p0;

Review Comment:
What are these fields needed for?

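For readers following this thread, here is a minimal sketch of the striping idea that the class Javadoc describes. It is not the code under review. The class name `StripedLockSketch` and its method names are hypothetical, and it assumes that the composite write lock acquires the write lock of every stripe in index order, which the quoted excerpt does not show (the excerpt only shows that `isWriteLockedByCurrentThread()` checks the last stripe). Fields such as `p0` may be cache-line padding against false sharing, but the excerpt does not confirm that, so the sketch leaves them out.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Hypothetical illustration of a striped read-write lock; not the PR's implementation. */
final class StripedLockSketch {
    /** Hands out a distinct index to each thread, mirroring IDX_GEN/IDX in the excerpt. */
    private static final AtomicInteger IDX_GEN = new AtomicInteger();

    private static final ThreadLocal<Integer> IDX = ThreadLocal.withInitial(IDX_GEN::incrementAndGet);

    private final ReentrantReadWriteLock[] stripes;

    StripedLockSketch(int concurrencyLvl) {
        stripes = new ReentrantReadWriteLock[concurrencyLvl];

        for (int i = 0; i < concurrencyLvl; i++) {
            stripes[i] = new ReentrantReadWriteLock();
        }
    }

    /** Readers touch only the stripe assigned to their thread, so they rarely contend with each other. */
    private ReentrantReadWriteLock currentStripe() {
        return stripes[IDX.get() % stripes.length];
    }

    void lockRead() {
        currentStripe().readLock().lock();
    }

    void unlockRead() {
        currentStripe().readLock().unlock();
    }

    /** Assumption: a writer must take every stripe, in index order, to exclude all readers. */
    void lockWrite() {
        for (ReentrantReadWriteLock stripe : stripes) {
            stripe.writeLock().lock();
        }
    }

    /** Releases the stripes in reverse acquisition order. */
    void unlockWrite() {
        for (int i = stripes.length - 1; i >= 0; i--) {
            stripes[i].writeLock().unlock();
        }
    }

    /** The last stripe is acquired last, so holding it implies the whole write lock is held. */
    boolean isWriteLockedByCurrentThread() {
        return stripes[stripes.length - 1].isWriteLockedByCurrentThread();
    }

    int readHoldCount() {
        return currentStripe().getReadHoldCount();
    }
}
```

Under these assumptions the trade-off in the Javadoc follows directly: a read costs one uncontended stripe lock, while a write costs `concurrencyLvl` lock acquisitions and the structure holds `concurrencyLvl` lock objects in memory.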
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/PartitionListener.java:
##########
@@ -282,6 +279,10 @@ private void onWriteBusy(Iterator<CommandClosure<WriteCommand>> iterator) {
                 updateTrackerIgnoringTrackerClosedException(safeTime, safeTimePropagatingCommand.safeTime());
             }
 
+            // Completing the closure out of the partition snapshots lock to reduce possibility of deadlocks as it might
+            // trigger other actions taking same locks.
+            clo.result(result);

Review Comment:
I am not sure about this, because it is not clear which lock the comment refers to. @rpuch, what do you think?

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -3335,7 +3323,7 @@ private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForUp
      * @return Future completes with tuple {@link RowId} and collection of {@link Lock}.
      */
     private CompletableFuture<IgniteBiTuple<RowId, Collection<Lock>>> takeLocksForInsert(BinaryRow binaryRow, RowId rowId, UUID txId) {
-        return lockManager.acquire(txId, new LockKey(tableId()), LockMode.IX) // IX lock on table
+        return lockManager.acquire(txId, new LockKey(replicationGroupId), LockMode.IX)

Review Comment:
Why did we change tableId to replicationGroupId everywhere the lock key is created with a single parameter, but not for the two-parameter constructors?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
