tkalkirill commented on code in PR #1666: URL: https://github.com/apache/ignite-3/pull/1666#discussion_r1115450433
########## modules/storage-api/src/main/java/org/apache/ignite/internal/storage/util/MvPartitionStorages.java: ########## @@ -0,0 +1,495 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage.util; + +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.function.Function.identity; +import static java.util.stream.Collectors.toList; + +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicReferenceArray; +import java.util.function.Function; +import java.util.function.IntFunction; +import java.util.stream.IntStream; +import org.apache.ignite.internal.schema.configuration.TableView; +import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.StorageException; +import org.apache.ignite.internal.storage.StorageRebalanceException; +import org.apache.ignite.internal.storage.util.StorageOperation.AbortRebalanceStorageOperation; +import org.apache.ignite.internal.storage.util.StorageOperation.CleanupStorageOperation; +import org.apache.ignite.internal.storage.util.StorageOperation.CreateStorageOperation; +import org.apache.ignite.internal.storage.util.StorageOperation.DestroyStorageOperation; +import org.apache.ignite.internal.storage.util.StorageOperation.FinishRebalanceStorageOperation; +import org.apache.ignite.internal.storage.util.StorageOperation.StartRebalanceStorageOperation; +import org.apache.ignite.lang.IgniteStringFormatter; +import org.jetbrains.annotations.Nullable; + +/** + * Class for storing stores and performing operations on them. + */ +public class MvPartitionStorages<T extends MvPartitionStorage> { + private final TableView tableView; + + private final AtomicReferenceArray<T> storageByPartitionId; + + private final ConcurrentMap<Integer, StorageOperation> operationByPartitionId = new ConcurrentHashMap<>(); + + private final ConcurrentMap<Integer, CompletableFuture<Void>> rebalaceFutureByPartitionId = new ConcurrentHashMap<>(); + + /** + * Constructor. + * + * @param tableView Table configuration. + */ + public MvPartitionStorages(TableView tableView) { + this.tableView = tableView; + + storageByPartitionId = new AtomicReferenceArray<>(tableView.partitions()); + } + + /** + * Returns the multi-versioned partition storage, {@code null} if the storage does not exist (not created or destroyed). + * + * @throws IllegalArgumentException If partition ID is out of configured bounds. + */ + public @Nullable T get(int partitionId) { + checkPartitionId(partitionId); + + return storageByPartitionId.get(partitionId); + } + + /** + * Creates and adds a new multi-versioned partition storage, if the storage is in the process of being destroyed, it will be recreated + * after the destruction. + * + * @param partitionId Partition ID. + * @param createStorageFunction Storage creation function, the argument is the partition ID. + * @return Future of creating a multi-versioned partition storage. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageException If the storage already exists or another operation is already in progress. + * @throws StorageException If the creation of the storage after its destruction is already planned. + */ + public CompletableFuture<MvPartitionStorage> create(int partitionId, IntFunction<T> createStorageFunction) { + StorageOperation storageOperation = operationByPartitionId.compute(partitionId, (partId, operation) -> { + if (operation instanceof DestroyStorageOperation) { + if (!((DestroyStorageOperation) operation).setCreationOperation(new CreateStorageOperation())) { + throw new StorageException( + "Creation of the storage after its destruction is already planned: [" + createStorageInfo(partitionId) + ']' + ); + } + + return operation; + } + + if (get(partitionId) != null) { + throw new StorageException("Storage already exists: [" + createStorageInfo(partitionId) + ']'); + } + + throwExceptionDependingOnOperation(operation, partitionId); + + return new CreateStorageOperation(); + }); + + CompletableFuture<Void> destroyStorageFuture = storageOperation instanceof DestroyStorageOperation + ? ((DestroyStorageOperation) storageOperation).getDestroyFuture() + : completedFuture(null); + + return destroyStorageFuture.thenApply(unused -> { + T newStorage = createStorageFunction.apply(partitionId); + + boolean set = storageByPartitionId.compareAndSet(partitionId, null, newStorage); + + assert set : createStorageInfo(partitionId); + + return (MvPartitionStorage) newStorage; + }).whenComplete((storage, throwable) -> operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof CreateStorageOperation : createStorageInfo(partitionId) + ", op=" + operation; + + return null; + })); + } + + /** + * Destroys a multi-versioned partition storage. + * + * @param partitionId Partition ID. + * @param destroyStorageFunction Partition destruction function. + * @return Future of multi-versioned partition storage destruction. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageException If the storage does not exist or another operation is already in progress. + * @throws StorageRebalanceException If the storage is in the process of rebalancing. + */ + public CompletableFuture<Void> destroy(int partitionId, Function<T, CompletableFuture<Void>> destroyStorageFunction) { + DestroyStorageOperation destroyOp = (DestroyStorageOperation) operationByPartitionId.compute(partitionId, (partId, operation) -> { + checkStorageExists(partitionId); + + throwExceptionDependingOnOperation(operation, partitionId); + + return new DestroyStorageOperation(); + }); + + return completedFuture(null) + .thenCompose(unused -> destroyStorageFunction.apply(storageByPartitionId.getAndSet(partitionId, null))) + .whenComplete((unused, throwable) -> { + operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof DestroyStorageOperation : createStorageInfo(partitionId) + ", op=" + operation; + + DestroyStorageOperation destroyStorageOperation = (DestroyStorageOperation) operation; + + return destroyStorageOperation.getCreateStorageOperation(); + }); + + if (throwable == null) { + destroyOp.getDestroyFuture().complete(null); + } else { + destroyOp.getDestroyFuture().completeExceptionally(throwable); + } + }); + } + + /** + * Clears a multi-versioned partition storage. + * + * @param partitionId Partition ID. + * @param clearStorageFunction Partition clean up function. + * @return Future of cleaning a multi-versioned partition storage. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageException If the storage does not exist or another operation is already in progress. + * @throws StorageRebalanceException If the storage is in the process of rebalancing. + */ + public CompletableFuture<Void> clear(int partitionId, Function<T, CompletableFuture<Void>> clearStorageFunction) { + operationByPartitionId.compute(partitionId, (partId, operation) -> { + checkStorageExists(partitionId); + + throwExceptionDependingOnOperation(operation, partitionId); + + return new CleanupStorageOperation(); + }); + + return completedFuture(null) + .thenCompose(unused -> clearStorageFunction.apply(get(partitionId))) + .whenComplete((unused, throwable) -> + operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof CleanupStorageOperation : createStorageInfo(partitionId) + ", op=" + operation; + + return null; + }) + ); + } + + /** + * Starts a multi-versioned partition storage rebalance. + * + * @param partitionId Partition ID. + * @param startRebalanceStorageFunction Partition start rebalance function. + * @return Future of starting rebalance a multi-versioned partition storage. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageRebalanceException If the storage does not exist or another operation is already in progress. + * @throws StorageRebalanceException If rebalancing is already in progress. + */ + public CompletableFuture<Void> startRebalace(int partitionId, Function<T, CompletableFuture<Void>> startRebalanceStorageFunction) { + operationByPartitionId.compute(partitionId, (partId, operation) -> { + checkStorageExistsForRebalance(partitionId); + + throwExceptionDependingOnOperationForRebalance(operation, partitionId); + + if (rebalaceFutureByPartitionId.containsKey(partitionId)) { + throw new StorageRebalanceException(createStorageInProgressOfRebalanceErrorMessage(partitionId)); + } + + return new StartRebalanceStorageOperation(); + }); + + return completedFuture(null) + .thenCompose(unused -> { + CompletableFuture<Void> startRebalanceFuture = startRebalanceStorageFunction.apply(get(partitionId)); + + CompletableFuture<Void> old = rebalaceFutureByPartitionId.put(partitionId, startRebalanceFuture); + + assert old == null : createStorageInfo(partitionId); + + return startRebalanceFuture; + }).whenComplete((unused, throwable) -> + operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof StartRebalanceStorageOperation : + createStorageInfo(partitionId) + ", op=" + operation; + + return null; + }) + ); + } + + /** + * Aborts a multi-versioned partition storage rebalance if started (successful or not). + * + * @param partitionId Partition ID. + * @param abortRebalanceStorageFunction Partition abort rebalance function. + * @return Future of aborting rebalance a multi-versioned partition storage. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageRebalanceException If the storage does not exist or another operation is already in progress. + */ + public CompletableFuture<Void> abortRebalance(int partitionId, Function<T, CompletableFuture<Void>> abortRebalanceStorageFunction) { + operationByPartitionId.compute(partitionId, (partId, operation) -> { + checkStorageExistsForRebalance(partitionId); + + throwExceptionDependingOnOperationForRebalance(operation, partitionId); + + return new AbortRebalanceStorageOperation(); + }); + + return completedFuture(null) + .thenCompose(unused -> { + CompletableFuture<Void> rebalanceFuture = rebalaceFutureByPartitionId.remove(partitionId); + + if (rebalanceFuture == null) { + return completedFuture(null); + } + + return rebalanceFuture + .handle((unused1, throwable) -> abortRebalanceStorageFunction.apply(get(partitionId))) + .thenCompose(identity()); + }).whenComplete((unused, throwable) -> + operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof AbortRebalanceStorageOperation : + createStorageInfo(partitionId) + ", op=" + operation; + + return null; + }) + ); + } + + /** + * Finishes a successful started multi-versioned partition storage rebalance. + * + * @param partitionId Partition ID. + * @param finishRebalanceStorageFunction Partition finish rebalance function, the argument is the partition ID. + * @return Future of aborting rebalance a multi-versioned partition storage. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageRebalanceException If the storage does not exist or another operation is already in progress. + * @throws StorageRebalanceException If storage rebalancing has not started. + */ + public CompletableFuture<Void> finishRebalance(int partitionId, Function<T, CompletableFuture<Void>> finishRebalanceStorageFunction) { + operationByPartitionId.compute(partitionId, (partId, operation) -> { + checkStorageExistsForRebalance(partitionId); + + throwExceptionDependingOnOperationForRebalance(operation, partitionId); + + if (!rebalaceFutureByPartitionId.containsKey(partitionId)) { + throw new StorageRebalanceException("Storage rebalancing did not start: [" + createStorageInfo(partitionId) + ']'); + } + + return new FinishRebalanceStorageOperation(); + }); + + return completedFuture(null) + .thenCompose(unused -> { + CompletableFuture<Void> rebalanceFuture = rebalaceFutureByPartitionId.remove(partitionId); + + assert rebalanceFuture != null : createStorageInfo(partitionId); + + return rebalanceFuture.thenCompose(unused1 -> finishRebalanceStorageFunction.apply(get(partitionId))); + }).whenComplete((unused, throwable) -> + operationByPartitionId.compute(partitionId, (partId, operation) -> { + assert operation instanceof FinishRebalanceStorageOperation : + createStorageInfo(partitionId) + ", op=" + operation; + + return null; + }) + ); + } + + /** + * Collects all multi-versioned partition storages to close. + */ + // TODO: IGNITE-18529 We need to wait for all current operations and disable new ones + public List<T> getAllForClose() { + return IntStream.range(0, storageByPartitionId.length()) + .mapToObj(partitionId -> storageByPartitionId.getAndSet(partitionId, null)) + .filter(Objects::nonNull) + .collect(toList()); + } + + /** + * Destroys all created multi-versioned partition storages. + * + * @param destroyStorageFunction Partition destruction function. + * @return Future destruction of all created multi-versioned partition storages. + */ + // TODO: IGNITE-18529 We need to deal with parallel operations + public CompletableFuture<Void> destroyAll(Function<T, CompletableFuture<Void>> destroyStorageFunction) { + List<CompletableFuture<Void>> destroyFutures = new ArrayList<>(); + + for (int partitionId = 0; partitionId < storageByPartitionId.length(); partitionId++) { + StorageOperation storageOperation = operationByPartitionId.get(partitionId); + + if (storageOperation instanceof DestroyStorageOperation) { + destroyFutures.add(((DestroyStorageOperation) storageOperation).getDestroyFuture()); + } else { + T storage = storageByPartitionId.getAndSet(partitionId, null); + + if (storage != null) { + destroyFutures.add(destroyStorageFunction.apply(storage)); + } + } + } + + return CompletableFuture.allOf(destroyFutures.toArray(CompletableFuture[]::new)); + } + + /** + * Returns table name. + */ + public String getTableName() { + return tableView.name(); + } + + /** + * Creates a short info of the multi-versioned partition storage in the format "table=user, partitionId=1". + * + * @param partitionId Partition ID. + */ + public String createStorageInfo(int partitionId) { + return IgniteStringFormatter.format("table={}, partitionId={}", getTableName(), partitionId); + } + + /** + * Checks that the partition ID is within the scope of the configuration. + * + * @param partitionId Partition ID. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + */ + private void checkPartitionId(int partitionId) { + int partitions = storageByPartitionId.length(); + + if (partitionId < 0 || partitionId >= partitions) { + throw new IllegalArgumentException(IgniteStringFormatter.format( + "Unable to access partition with id outside of configured range: [table={}, partitionId={}, partitions={}]", + getTableName(), + partitionId, + partitions + )); + } + } + + /** + * Checks if the storage exists. + * + * @param partitionId Partition ID. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageException If the storage does not exist. + */ + private void checkStorageExists(int partitionId) { + if (get(partitionId) == null) { + throw new StorageException(createStorageDoesNotExistErrorMessage(partitionId)); + } + } + + /** + * Checks if the storage exists. + * + * @param partitionId Partition ID. + * @throws IllegalArgumentException If partition ID is out of configured bounds. + * @throws StorageRebalanceException If the storage does not exist. + */ + private void checkStorageExistsForRebalance(int partitionId) { + if (get(partitionId) == null) { + throw new StorageRebalanceException(createStorageDoesNotExistErrorMessage(partitionId)); + } + } + + private void throwExceptionDependingOnOperation(StorageOperation operation, int partitionId) { + if (operation == null) { Review Comment: I agree, I'll fix it. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
