ibessonov commented on code in PR #1226:
URL: https://github.com/apache/ignite-3/pull/1226#discussion_r1003201354

##########
modules/core/src/main/java/org/apache/ignite/internal/lock/ReusableLockLockup.java:
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.lock;
+
+import java.util.concurrent.locks.Lock;
+
+/**
+ * {@link AutoLockup} around a {@link Lock} that is reused across lock acquire events. This means that each acquire
+ * returns this instance, so the callers (that acquire and then own the underlying lock for some time) invoke {@link AutoLockup#close()}.
+ * on same instance.
+ *
+ * <p>To acquire the lock, {@link #acquireLock()} needs to be called.
+ */
+public class ReusableLockLockup implements AutoLockup {

Review Comment:
   I look at this class and I don't understand it. The trick is that `acquireLock()` returns `this`, and this way we what, save allocations?
   Why does this particular class have to inherit `AutoLockup`? It shouldn't, but we do this for the `return this` trick. Private inheritance would fit perfectly, but it doesn't exist in Java...

   The more I think about it, the more broken this approach appears.
   First of all, what's closed cannot be reopened, according to the way Closeables are used. This part of the contract is violated.
   Technically, you should change the way you use closeables in the context of locks. The current implementation is kinda messy.
   Second, close must be idempotent. If someone calls it twice, it shouldn't throw an exception that _"you bad boy don't hold this lock, get out"_, it should just ignore the action. This part of the contract is also violated.

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionKey.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.Objects;
+import java.util.UUID;
+import org.apache.ignite.internal.tostring.S;
+
+/**
+ * Uniquely identifies a partition. This is a pair of internal table ID and partition number (aka partition ID).
+ */
+public class PartitionKey {
+    private final UUID tableId;

Review Comment:
   I see no getter for the table id, why is it absent?

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/PartitionAccessImpl.java:
##########
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.raft.snapshot;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.ReadResult;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.util.Cursor;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * {@link PartitionAccess} that adapts an {@link MvPartitionStorage}.
+ */
+public class PartitionAccessImpl implements PartitionAccess {
+    private final PartitionKey partitionKey;
+    private final MvPartitionStorage partitionStorage;
+
+    public PartitionAccessImpl(PartitionKey partitionKey, MvPartitionStorage partitionStorage) {
+        this.partitionKey = partitionKey;
+        this.partitionStorage = partitionStorage;
+    }
+
+    @Override
+    public PartitionKey key() {
+        return partitionKey;
+    }
+
+    @Override
+    public long persistedIndex() {
+        return partitionStorage.persistedIndex();
+    }
+
+    @Override
+    public @Nullable RowId closestRowId(RowId lowerBound) {
+        return partitionStorage.closestRowId(lowerBound);
+    }
+
+    @Override
+    public List<ReadResult> rowVersions(RowId rowId) {

Review Comment:
   I wonder why you don't just return a cursor here and convert it to a list later.

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -141,8 +201,48 @@ private CompletableFuture<Void> respond(
             NetworkAddress sender,
             Long correlationId
     ) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         // Handle offline sender and stopped manager.
         return messagingService.respond(sender, response, correlationId);
     }
+
+    @Override
+    public PartitionSnapshots partitionSnapshots(PartitionKey partitionKey) {
+        return getPartitionSnapshots(partitionKey);
+    }
+
+    private static class PartitionSnapshotsImpl implements PartitionSnapshots {
+        private final List<OutgoingSnapshot> snapshots = new ArrayList<>();
+
+        private final ReadWriteLock lock = new ReentrantReadWriteLock();
+        private final ReusableLockLockup readLockLockup = new ReusableLockLockup(lock.readLock());
+
+        private void addUnderLock(OutgoingSnapshot snapshot) {
+            lock.writeLock().lock();
+
+            try {
+                snapshots.add(snapshot);
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        private void removeUnderLock(OutgoingSnapshot snapshot) {
+            try {

Review Comment:
   Where is `lock.writeLock().lock();`? I don't see it. I guess you don't test this one.

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -65,41 +91,69 @@ public MessagingService messagingService() {
     @Override
     public void start() {
-        messagingService.addMessageHandler(TableMessageGroup.class, this::messageHandler);
+        executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("outgoing-snapshots", LOG));

Review Comment:
   You could have instantiated it in the constructor and made it final. Anyway, why `4`? And if you leave it here, why volatile?

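A minimal sketch of the constructor-initialized alternative raised in the comment above, using only JDK classes. `ManagerSketch`, its `threads` parameter, and the `submit`/`stop` methods are hypothetical stand-ins rather than the PR's code, and a plain lambda thread factory is assumed in place of Ignite's `NamedThreadFactory`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for the manager; not the class from the PR. */
class ManagerSketch {
    // A final field is visible to every thread that obtains the instance after
    // construction (as long as `this` does not escape the constructor),
    // so no volatile is needed.
    private final ExecutorService executor;

    /** The pool size is a parameter here because the right value is exactly what the comment questions. */
    ManagerSketch(int threads) {
        AtomicInteger counter = new AtomicInteger();

        // newFixedThreadPool creates its threads lazily, on first task submission,
        // so building the pool in the constructor does not start any threads yet.
        this.executor = Executors.newFixedThreadPool(threads, task -> {
            Thread thread = new Thread(task, "outgoing-snapshots-" + counter.getAndIncrement());
            thread.setDaemon(true);
            return thread;
        });
    }

    void submit(Runnable task) {
        executor.execute(task);
    }

    /** Returns true if all submitted tasks finished within the timeout. */
    boolean stop() {
        executor.shutdown();

        try {
            return executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

With this shape, `start()` only has to register the message handler, and the field needs neither reassignment nor `volatile`.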
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -65,41 +91,69 @@ public MessagingService messagingService() {
     @Override
     public void start() {
-        messagingService.addMessageHandler(TableMessageGroup.class, this::messageHandler);
+        executor = Executors.newFixedThreadPool(4, new NamedThreadFactory("outgoing-snapshots", LOG));
+
+        messagingService.addMessageHandler(TableMessageGroup.class, this::handleMessage);
     }
     @Override
     public void stop() throws Exception {
+        IgniteUtils.shutdownAndAwaitTermination(executor, 10, TimeUnit.SECONDS);
     }
     /**
      * Registers an outgoing snapshot in the manager.
      *
-     * @param snapshotId Snapshot id.
+     * @param snapshotId Snapshot id.
      * @param outgoingSnapshot Outgoing snapshot.
      */
-    void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot outgoingSnapshot) {
-        outgoingSnapshots.put(snapshotId, outgoingSnapshot);
+    @Override
+    public void registerOutgoingSnapshot(UUID snapshotId, OutgoingSnapshot outgoingSnapshot) {
+        synchronized (snapshotsLock) {

Review Comment:
   Why do you need this lock? There won't be two snapshots with the same id, and you don't need atomicity between updating two collections.

##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/outgoing/OutgoingSnapshotsManager.java:
##########
@@ -141,8 +201,48 @@ private CompletableFuture<Void> respond(
             NetworkAddress sender,
             Long correlationId
     ) {
-        //TODO https://issues.apache.org/jira/browse/IGNITE-17262
+        //TODO https://issues.apache.org/jira/browse/IGNITE-17935
         // Handle offline sender and stopped manager.
         return messagingService.respond(sender, response, correlationId);
     }
+
+    @Override
+    public PartitionSnapshots partitionSnapshots(PartitionKey partitionKey) {

Review Comment:
   It is public and it has a lazy-init. Is there a chance of polluting memory with empty snapshot sets? Can't we just "get"?
   Or it shouldn't be nullable in your design? Who cleans it then? I feel a memory leak
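
One way to address the lazy-init concern about `partitionSnapshots` can be sketched as follows: the read path is a plain "get" that never inserts, and an entry is dropped as soon as its last snapshot is unregistered. Everything here is hypothetical and for illustration only: `SnapshotRegistrySketch` is not a class from the PR, and `String` values stand in for `PartitionKey` and `OutgoingSnapshot`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical registry; String keys and values stand in for PartitionKey and OutgoingSnapshot. */
class SnapshotRegistrySketch {
    private final Map<String, List<String>> byPartition = new ConcurrentHashMap<>();

    /** Creates the per-partition entry only when there is something to store. */
    void register(String partitionKey, String snapshot) {
        byPartition.compute(partitionKey, (key, current) -> {
            List<String> updated = current == null ? new ArrayList<>() : new ArrayList<>(current);
            updated.add(snapshot);
            return List.copyOf(updated);
        });
    }

    /** Removes the entry once its last snapshot is gone, so empty lists never accumulate. */
    void unregister(String partitionKey, String snapshot) {
        byPartition.computeIfPresent(partitionKey, (key, current) -> {
            List<String> updated = new ArrayList<>(current);
            updated.remove(snapshot);
            // Returning null from computeIfPresent removes the mapping.
            return updated.isEmpty() ? null : List.copyOf(updated);
        });
    }

    /** Plain read: never inserts, and returns an empty list for unknown partitions. */
    List<String> snapshots(String partitionKey) {
        return byPartition.getOrDefault(partitionKey, List.of());
    }

    int trackedPartitions() {
        return byPartition.size();
    }
}
```

Storing immutable copies keeps readers lock-free at the cost of copying on every update, which assumes snapshot registration is rare compared to lookups.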
