tkalkirill commented on code in PR #1666:
URL: https://github.com/apache/ignite-3/pull/1666#discussion_r1115447666


##########
modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java:
##########
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.util;
+
+import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.function.Function.identity;
+import static java.util.stream.Collectors.toList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.atomic.AtomicReferenceArray;
+import java.util.function.Function;
+import java.util.function.IntFunction;
+import java.util.stream.IntStream;
+import org.apache.ignite.internal.schema.configuration.TableView;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.StorageRebalanceException;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.AbortRebalanceStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.CleanupStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.CreateStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.DestroyStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.FinishRebalanceStorageOperation;
+import 
org.apache.ignite.internal.storage.util.StorageOperation.StartRebalanceStorageOperation;
+import org.apache.ignite.lang.IgniteStringFormatter;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Class for storing stores and performing operations on them.
+ */
+public class MvPartitionStorages<T extends MvPartitionStorage> {
+    private final TableView tableView;
+
+    private final AtomicReferenceArray<T> storageByPartitionId;
+
+    private final ConcurrentMap<Integer, StorageOperation> 
operationByPartitionId = new ConcurrentHashMap<>();
+
+    private final ConcurrentMap<Integer, CompletableFuture<Void>> 
rebalaceFutureByPartitionId = new ConcurrentHashMap<>();
+
+    /**
+     * Constructor.
+     *
+     * @param tableView Table configuration.
+     */
+    public MvPartitionStorages(TableView tableView) {
+        this.tableView = tableView;
+
+        storageByPartitionId = new 
AtomicReferenceArray<>(tableView.partitions());
+    }
+
+    /**
+     * Returns the multi-versioned partition storage, {@code null} if the 
storage does not exist (not created or destroyed).
+     *
+     * @throws IllegalArgumentException If partition ID is out of configured 
bounds.
+     */
+    public @Nullable T get(int partitionId) {
+        checkPartitionId(partitionId);
+
+        return storageByPartitionId.get(partitionId);
+    }
+
+    /**
+     * Creates and adds a new multi-versioned partition storage, if the 
storage is in the process of being destroyed, it will be recreated
+     * after the destruction.
+     *
+     * @param partitionId Partition ID.
+     * @param createStorageFunction Storage creation function, the argument is 
the partition ID.
+     * @return Future of creating a multi-versioned partition storage.
+     * @throws IllegalArgumentException If partition ID is out of configured 
bounds.
+     * @throws StorageException If the storage already exists or another 
operation is already in progress.
+     * @throws StorageException If the creation of the storage after its 
destruction is already planned.
+     */
+    public CompletableFuture<MvPartitionStorage> create(int partitionId, 
IntFunction<T> createStorageFunction) {
+        StorageOperation storageOperation = 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
+            if (operation instanceof DestroyStorageOperation) {
+                if (!((DestroyStorageOperation) 
operation).setCreationOperation(new CreateStorageOperation())) {
+                    throw new StorageException(
+                            "Creation of the storage after its destruction is 
already planned: [" + createStorageInfo(partitionId) + ']'
+                    );
+                }
+
+                return operation;
+            }
+
+            if (get(partitionId) != null) {
+                throw new StorageException("Storage already exists: [" + 
createStorageInfo(partitionId) + ']');
+            }
+
+            throwExceptionDependingOnOperation(operation, partitionId);
+
+            return new CreateStorageOperation();
+        });
+
+        CompletableFuture<Void> destroyStorageFuture = storageOperation 
instanceof DestroyStorageOperation
+                ? ((DestroyStorageOperation) 
storageOperation).getDestroyFuture()
+                : completedFuture(null);
+
+        return destroyStorageFuture.thenApply(unused -> {
+            T newStorage = createStorageFunction.apply(partitionId);
+
+            boolean set = storageByPartitionId.compareAndSet(partitionId, 
null, newStorage);
+
+            assert set : createStorageInfo(partitionId);
+
+            return (MvPartitionStorage) newStorage;
+        }).whenComplete((storage, throwable) -> 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
+            assert operation instanceof CreateStorageOperation : 
createStorageInfo(partitionId) + ", op=" + operation;
+
+            return null;
+        }));
+    }
+
+    /**
+     * Destroys a multi-versioned partition storage.
+     *
+     * @param partitionId Partition ID.
+     * @param destroyStorageFunction Partition destruction function.
+     * @return Future of multi-versioned partition storage destruction.
+     * @throws IllegalArgumentException If partition ID is out of configured 
bounds.
+     * @throws StorageException If the storage does not exist or another 
operation is already in progress.
+     * @throws StorageRebalanceException If the storage is in the process of 
rebalancing.
+     */
+    public CompletableFuture<Void> destroy(int partitionId, Function<T, 
CompletableFuture<Void>> destroyStorageFunction) {
+        DestroyStorageOperation destroyOp = (DestroyStorageOperation) 
operationByPartitionId.compute(partitionId, (partId, operation) -> {
+            checkStorageExists(partitionId);
+
+            throwExceptionDependingOnOperation(operation, partitionId);
+
+            return new DestroyStorageOperation();
+        });
+
+        return completedFuture(null)

Review Comment:
   This is a trick to make sure the operation gets rid of if it succeeds or 
fails.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to