tkalkirill commented on code in PR #3216:
URL: https://github.com/apache/ignite-3/pull/3216#discussion_r1490945015
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java:
##########
@@ -83,7 +83,7 @@ public CatalogHashIndexDescriptor(
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
* @param status Index status.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
Same here: the name of the parameter and its description are confusing to me.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskScheduler.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
+import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Component is responsible for starting and stopping {@link
ChangeIndexStatusTask}. */
+class ChangeIndexStatusTaskScheduler implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(ChangeIndexStatusTaskScheduler.class);
+
+ private final CatalogManager catalogManager;
+
+ private final ClusterService clusterService;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final HybridClock clock;
+
+ private final ClockWaiter clockWaiter;
+
+ private final PlacementDriver placementDriver;
+
+ private final Executor executor;
+
+ private final Map<ChangeIndexStatusTaskId, ChangeIndexStatusTask> taskById
= new ConcurrentHashMap<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ ChangeIndexStatusTaskScheduler(
+ CatalogManager catalogManager,
+ ClusterService clusterService,
+ LogicalTopologyService logicalTopologyService,
+ HybridClock clock,
+ ClockWaiter clockWaiter,
+ PlacementDriver placementDriver,
+ Executor executor
+ ) {
+ this.catalogManager = catalogManager;
+ this.clusterService = clusterService;
+ this.logicalTopologyService = logicalTopologyService;
+ this.clock = clock;
+ this.clockWaiter = clockWaiter;
+ this.placementDriver = placementDriver;
+ this.executor = executor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ stopAllTasks();
+ }
+
+ /**
+ * Schedules a task for that will transfer the given index to the {@link
CatalogIndexStatus#BUILDING} state.
+ */
+ void scheduleStartBuildingTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.REGISTERED;
+
+ LOG.info("Scheduling starting of index building. Index: {}",
indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, startBuildingIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask
startBuildingIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ return new ChangeIndexStatusTask(
+ indexDescriptor,
+ catalogManager,
+ placementDriver,
+ clusterService,
+ logicalTopologyService,
+ clock,
+ clockWaiter,
+ executor,
+ busyLock
+ ) {
+ @Override
+ CatalogCommand switchIndexStatusCommand() {
+ return
StartBuildingIndexCommand.builder().indexId(indexDescriptor.id()).build();
+ }
+ };
+ }
+
+ /**
+ * Schedules a task for that will remove a given index from the Catalog.
+ */
+ void scheduleRemoveIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.STOPPING;
+
+ LOG.info("Scheduling index removal. Index: {}", indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, removeIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask removeIndexTask(CatalogIndexDescriptor
indexDescriptor) {
+ return new ChangeIndexStatusTask(
+ indexDescriptor,
+ catalogManager,
+ placementDriver,
+ clusterService,
+ logicalTopologyService,
+ clock,
+ clockWaiter,
+ executor,
+ busyLock
+ ) {
+ @Override
+ CatalogCommand switchIndexStatusCommand() {
+ return
RemoveIndexCommand.builder().indexId(indexDescriptor.id()).build();
+ }
+ };
+ }
+
+ private void scheduleTask(ChangeIndexStatusTaskId taskId,
ChangeIndexStatusTask task) {
+ // Check if the task has already been added before.
+ if (taskById.putIfAbsent(taskId, task) == null) {
+ task.start().whenComplete((unused, throwable) ->
taskById.remove(taskId));
+ } else {
+ LOG.info("Skipping task scheduling, because a task with the same
ID is already running.");
Review Comment:
I would lower this message to the debug level.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java:
##########
@@ -62,9 +62,9 @@ public CatalogIndexStatus status() {
return status;
}
- /** Returns catalog version in which the index was created. */
- public int creationCatalogVersion() {
- return creationCatalogVersion;
+ /** Returns the Catalog version of the last index status change. */
Review Comment:
Same here: the name and its description are confusing to me.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table which indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final ChangeIndexStatusTaskScheduler
changeIndexStatusTaskScheduler;
+
+ /** Tables IDs for which the local node is the primary replica for the
partition with ID {@code 0}. */
+ private final Set<Integer> localNodeIsPrimaryReplicaForTableIds =
ConcurrentHashMap.newKeySet();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ IndexTaskManager(
+ CatalogManager catalogManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler
+ ) {
+ this.catalogService = catalogManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.changeIndexStatusTaskScheduler = changeIndexStatusTaskScheduler;
+
+ addListeners();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ changeIndexStatusTaskScheduler.close();
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_CREATE,
adaptToEventListener(this::onIndexCreated));
+
+ catalogService.listen(CatalogEvent.INDEX_STOPPING,
adaptToEventListener(this::onIndexDropped));
+
+ catalogService.listen(CatalogEvent.INDEX_REMOVED,
adaptToEventListener(this::onIndexRemoved));
+
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
adaptToEventListener(this::onPrimaryReplicaElected));
+ }
+
+ /** Wraps a given callback into an EventListener. */
+ private <T extends EventParameters> EventListener<T>
adaptToEventListener(Consumer<T> action) {
Review Comment:
Looks like a candidate for `IgniteUtils`.
##########
modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java:
##########
@@ -3795,7 +3795,7 @@ private CompletableFuture<Boolean>
onIndexBuilding(CatalogEventParameters parame
assert indexDescriptor != null : "indexId=" + indexId + ",
catalogVersion=" + parameters.catalogVersion();
if (indexDescriptor.tableId() == tableId()) {
-
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexDescriptor.creationCatalogVersion());
+
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexDescriptor.txWaitCatalogVersion());
Review Comment:
@rpuch Do you think it is necessary to update this on `INDEX_REMOVED`? I think
it is.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskScheduler.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
+import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Component is responsible for starting and stopping {@link
ChangeIndexStatusTask}. */
+class ChangeIndexStatusTaskScheduler implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(ChangeIndexStatusTaskScheduler.class);
+
+ private final CatalogManager catalogManager;
+
+ private final ClusterService clusterService;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final HybridClock clock;
+
+ private final ClockWaiter clockWaiter;
+
+ private final PlacementDriver placementDriver;
+
+ private final Executor executor;
+
+ private final Map<ChangeIndexStatusTaskId, ChangeIndexStatusTask> taskById
= new ConcurrentHashMap<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ ChangeIndexStatusTaskScheduler(
+ CatalogManager catalogManager,
+ ClusterService clusterService,
+ LogicalTopologyService logicalTopologyService,
+ HybridClock clock,
+ ClockWaiter clockWaiter,
+ PlacementDriver placementDriver,
+ Executor executor
+ ) {
+ this.catalogManager = catalogManager;
+ this.clusterService = clusterService;
+ this.logicalTopologyService = logicalTopologyService;
+ this.clock = clock;
+ this.clockWaiter = clockWaiter;
+ this.placementDriver = placementDriver;
+ this.executor = executor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ stopAllTasks();
+ }
+
+ /**
+ * Schedules a task for that will transfer the given index to the {@link
CatalogIndexStatus#BUILDING} state.
+ */
+ void scheduleStartBuildingTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.REGISTERED;
+
+ LOG.info("Scheduling starting of index building. Index: {}",
indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, startBuildingIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask
startBuildingIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ return new ChangeIndexStatusTask(
+ indexDescriptor,
+ catalogManager,
+ placementDriver,
+ clusterService,
+ logicalTopologyService,
+ clock,
+ clockWaiter,
+ executor,
+ busyLock
+ ) {
+ @Override
+ CatalogCommand switchIndexStatusCommand() {
+ return
StartBuildingIndexCommand.builder().indexId(indexDescriptor.id()).build();
+ }
+ };
+ }
+
+ /**
+ * Schedules a task for that will remove a given index from the Catalog.
+ */
+ void scheduleRemoveIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.STOPPING;
+
+ LOG.info("Scheduling index removal. Index: {}", indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, removeIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask removeIndexTask(CatalogIndexDescriptor
indexDescriptor) {
Review Comment:
I would add the suffix `Busy` to the method name.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTask.java:
##########
@@ -149,28 +158,33 @@ CompletableFuture<Void> start() {
}
try {
- return supplyAsync(() ->
awaitActivateForCatalogVersionOfIndexCreation()
- .thenCompose(unused ->
ensureThatLocalNodeStillPrimaryReplica())
- .thenCompose(unused ->
inBusyLocks(logicalTopologyService::logicalTopologyOnLeader))
-
.thenComposeAsync(this::awaitFinishRwTxsBeforeCatalogVersionOfIndexCreation,
executor)
- .thenComposeAsync(unused -> switchIndexToBuildingStatus(),
executor), executor)
- .thenCompose(Function.identity())
- .handle((unused, throwable) -> {
+ LOG.info("Starting task to change index status. Index: {}",
indexDescriptor);
Review Comment:
I suggest using the debug level here.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTask.java:
##########
@@ -149,28 +158,33 @@ CompletableFuture<Void> start() {
}
try {
- return supplyAsync(() ->
awaitActivateForCatalogVersionOfIndexCreation()
- .thenCompose(unused ->
ensureThatLocalNodeStillPrimaryReplica())
- .thenCompose(unused ->
inBusyLocks(logicalTopologyService::logicalTopologyOnLeader))
-
.thenComposeAsync(this::awaitFinishRwTxsBeforeCatalogVersionOfIndexCreation,
executor)
- .thenComposeAsync(unused -> switchIndexToBuildingStatus(),
executor), executor)
- .thenCompose(Function.identity())
- .handle((unused, throwable) -> {
+ LOG.info("Starting task to change index status. Index: {}",
indexDescriptor);
+
+ return awaitCatalogVersionActivation()
Review Comment:
Please bring back the use of `supplyAsync`, so that the thread calling this
code does not itself start, for example, going to the network and so on.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSortedIndexDescriptor.java:
##########
@@ -67,7 +67,7 @@ public CatalogSortedIndexDescriptor(
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
* @param status Index status.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
Same here: the name of the parameter and its description are confusing to me.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table which indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
Review Comment:
I think the name `IndexChangeStatusController` is more suitable for this class.
And `IndexBuildingManager` could then be renamed to `IndexChangeStatusManager`.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSortedIndexDescriptor.java:
##########
@@ -44,7 +44,7 @@ public class CatalogSortedIndexDescriptor extends
CatalogIndexDescriptor {
* @param name Name of the index.
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
The name of the variable and its description are confusing to me.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java:
##########
@@ -43,12 +43,12 @@ public class CatalogHashIndexDescriptor extends
CatalogIndexDescriptor {
* @param name Name of the index.
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
The name of the variable and its description are confusing to me.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogHashIndexDescriptor.java:
##########
@@ -59,7 +59,7 @@ public CatalogHashIndexDescriptor(int id, String name, int
tableId, boolean uniq
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
* @param status Index status.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
Same here: the name of the parameter and its description are confusing to me.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java:
##########
@@ -31,20 +31,20 @@ public abstract class CatalogIndexDescriptor extends
CatalogObjectDescriptor {
/** Index status. */
private final CatalogIndexStatus status;
- /** Catalog version in which the index was created. */
- private final int creationCatalogVersion;
+ /** Catalog version of the last index status change. */
+ private final int txWaitCatalogVersion;
Review Comment:
The name of the variable and its description are confusing to me.
##########
modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java:
##########
@@ -115,6 +121,37 @@ void testBuildIndexOnStableTopology(int replicas) throws
Exception {
.check();
}
+ @Test
+ void testDropIndexDuringTransaction() throws Exception {
+ int partitions = initialNodes();
+
+ int replicas = initialNodes();
+
+ createAndPopulateTable(replicas, partitions);
+
+ createIndex(INDEX_NAME);
+
+ checkIndexBuild(partitions, replicas, INDEX_NAME);
+
+ IgniteImpl node = CLUSTER.aliveNode();
+
+ // Start a transaction. We expect that the index will not be removed
until this transaction completes.
+ Transaction tx = node.transactions().begin(new
TransactionOptions().readOnly(false));
+
+ dropIndex(INDEX_NAME);
+
+ CatalogService catalog = node.catalogManager();
+
+ CatalogIndexDescriptor indexDescriptor = catalog.index(INDEX_NAME,
node.clock().nowLong());
+
+ assertThat(indexDescriptor, is(notNullValue()));
+ assertThat(indexDescriptor.status(), is(CatalogIndexStatus.STOPPING));
+
+ tx.commit();
+
+ assertTrue(waitForCondition(() -> catalog.index(INDEX_NAME,
node.clock().nowLong()) == null, 10_000));
Review Comment:
Can we wait for the `INDEX_REMOVED` event?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table which indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
Review Comment:
Isn't it necessary to stop the task for INDEX_BUILDING, similar to
INDEX_REMOVED?
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table whose indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final ChangeIndexStatusTaskScheduler
changeIndexStatusTaskScheduler;
+
+ /** Table IDs for which the local node is the primary replica for the
partition with ID {@code 0}. */
+ private final Set<Integer> localNodeIsPrimaryReplicaForTableIds =
ConcurrentHashMap.newKeySet();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ IndexTaskManager(
+ CatalogManager catalogManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler
+ ) {
+ this.catalogService = catalogManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.changeIndexStatusTaskScheduler = changeIndexStatusTaskScheduler;
+
+ addListeners();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ changeIndexStatusTaskScheduler.close();
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_CREATE,
adaptToEventListener(this::onIndexCreated));
+
+ catalogService.listen(CatalogEvent.INDEX_STOPPING,
adaptToEventListener(this::onIndexDropped));
+
+ catalogService.listen(CatalogEvent.INDEX_REMOVED,
adaptToEventListener(this::onIndexRemoved));
+
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
adaptToEventListener(this::onPrimaryReplicaElected));
+ }
+
+ /** Wraps a given callback into an EventListener. */
+ private <T extends EventParameters> EventListener<T>
adaptToEventListener(Consumer<T> action) {
+ return (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ inBusyLock(busyLock, () -> action.accept(parameters));
+
+ return falseCompletedFuture();
+ };
+ }
+
+ private void onIndexCreated(CreateIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor = parameters.indexDescriptor();
+
+ if
(localNodeIsPrimaryReplicaForTableIds.contains(indexDescriptor.tableId())) {
+ // Schedule building the index only if the local node is the
primary replica for the 0 partition of the table for which the
+ // index was created.
+
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(parameters.indexDescriptor());
+ }
+ }
+
+ private void onIndexDropped(StoppingIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor =
catalogService.index(parameters.indexId(), parameters.catalogVersion());
+
+ assert indexDescriptor != null : parameters.indexId();
+
+ if
(localNodeIsPrimaryReplicaForTableIds.contains(indexDescriptor.tableId())) {
+ // Schedule index removal only if the local node is the primary
replica for the 0 partition of the table for which the
+ // index was dropped.
+
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
+ }
+ }
+
+ private void onIndexRemoved(RemoveIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor =
catalogService.index(parameters.indexId(), parameters.catalogVersion() - 1);
+
+ assert indexDescriptor != null : parameters.indexId();
+
+ changeIndexStatusTaskScheduler.stopTask(indexDescriptor);
+ }
+
+ private void onPrimaryReplicaElected(PrimaryReplicaEventParameters
parameters) {
Review Comment:
I would add a `Busy` suffix to the method name, i.e. `onPrimaryReplicaElectedBusy`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table whose indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final ChangeIndexStatusTaskScheduler
changeIndexStatusTaskScheduler;
+
+ /** Table IDs for which the local node is the primary replica for the
partition with ID {@code 0}. */
+ private final Set<Integer> localNodeIsPrimaryReplicaForTableIds =
ConcurrentHashMap.newKeySet();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ IndexTaskManager(
+ CatalogManager catalogManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler
+ ) {
+ this.catalogService = catalogManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.changeIndexStatusTaskScheduler = changeIndexStatusTaskScheduler;
+
+ addListeners();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ changeIndexStatusTaskScheduler.close();
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_CREATE,
adaptToEventListener(this::onIndexCreated));
+
+ catalogService.listen(CatalogEvent.INDEX_STOPPING,
adaptToEventListener(this::onIndexDropped));
+
+ catalogService.listen(CatalogEvent.INDEX_REMOVED,
adaptToEventListener(this::onIndexRemoved));
+
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
adaptToEventListener(this::onPrimaryReplicaElected));
+ }
+
+ /** Wraps a given callback into an EventListener. */
+ private <T extends EventParameters> EventListener<T>
adaptToEventListener(Consumer<T> action) {
+ return (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ inBusyLock(busyLock, () -> action.accept(parameters));
+
+ return falseCompletedFuture();
+ };
+ }
+
+ private void onIndexCreated(CreateIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor = parameters.indexDescriptor();
+
+ if
(localNodeIsPrimaryReplicaForTableIds.contains(indexDescriptor.tableId())) {
+ // Schedule building the index only if the local node is the
primary replica for the 0 partition of the table for which the
+ // index was created.
+
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(parameters.indexDescriptor());
+ }
+ }
+
+ private void onIndexDropped(StoppingIndexEventParameters parameters) {
Review Comment:
I would add a `Busy` suffix to the method name, i.e. `onIndexDroppedBusy`.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSortedIndexDescriptor.java:
##########
@@ -91,7 +91,7 @@ public CatalogSortedIndexDescriptor(
* @param tableId Id of the table index belongs to.
* @param unique Unique flag.
* @param status Index status.
- * @param creationCatalogVersion Catalog version in which the index was
created.
+ * @param txWaitCatalogVersion Catalog version of the last index status
change.
Review Comment:
Same
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table whose indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final ChangeIndexStatusTaskScheduler
changeIndexStatusTaskScheduler;
+
+ /** Table IDs for which the local node is the primary replica for the
partition with ID {@code 0}. */
+ private final Set<Integer> localNodeIsPrimaryReplicaForTableIds =
ConcurrentHashMap.newKeySet();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ IndexTaskManager(
+ CatalogManager catalogManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler
+ ) {
+ this.catalogService = catalogManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.changeIndexStatusTaskScheduler = changeIndexStatusTaskScheduler;
+
+ addListeners();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ changeIndexStatusTaskScheduler.close();
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_CREATE,
adaptToEventListener(this::onIndexCreated));
+
+ catalogService.listen(CatalogEvent.INDEX_STOPPING,
adaptToEventListener(this::onIndexDropped));
+
+ catalogService.listen(CatalogEvent.INDEX_REMOVED,
adaptToEventListener(this::onIndexRemoved));
+
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
adaptToEventListener(this::onPrimaryReplicaElected));
+ }
+
+ /** Wraps a given callback into an EventListener. */
+ private <T extends EventParameters> EventListener<T>
adaptToEventListener(Consumer<T> action) {
+ return (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ inBusyLock(busyLock, () -> action.accept(parameters));
+
+ return falseCompletedFuture();
+ };
+ }
+
+ private void onIndexCreated(CreateIndexEventParameters parameters) {
Review Comment:
I would add a `Busy` suffix to the method name, i.e. `onIndexCreatedBusy`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskScheduler.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
+import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Component is responsible for starting and stopping {@link
ChangeIndexStatusTask}. */
+class ChangeIndexStatusTaskScheduler implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(ChangeIndexStatusTaskScheduler.class);
+
+ private final CatalogManager catalogManager;
+
+ private final ClusterService clusterService;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final HybridClock clock;
+
+ private final ClockWaiter clockWaiter;
+
+ private final PlacementDriver placementDriver;
+
+ private final Executor executor;
+
+ private final Map<ChangeIndexStatusTaskId, ChangeIndexStatusTask> taskById
= new ConcurrentHashMap<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ ChangeIndexStatusTaskScheduler(
+ CatalogManager catalogManager,
+ ClusterService clusterService,
+ LogicalTopologyService logicalTopologyService,
+ HybridClock clock,
+ ClockWaiter clockWaiter,
+ PlacementDriver placementDriver,
+ Executor executor
+ ) {
+ this.catalogManager = catalogManager;
+ this.clusterService = clusterService;
+ this.logicalTopologyService = logicalTopologyService;
+ this.clock = clock;
+ this.clockWaiter = clockWaiter;
+ this.placementDriver = placementDriver;
+ this.executor = executor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ stopAllTasks();
+ }
+
+ /**
+ * Schedules a task that will transfer the given index to the {@link
CatalogIndexStatus#BUILDING} state.
+ */
+ void scheduleStartBuildingTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.REGISTERED;
+
+ LOG.info("Scheduling starting of index building. Index: {}",
indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, startBuildingIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask
startBuildingIndexTask(CatalogIndexDescriptor indexDescriptor) {
Review Comment:
I would add a `Busy` suffix to the method name, i.e. `startBuildingIndexTaskBusy`.
##########
modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AbstractChangeIndexStatusEntry.java:
##########
@@ -44,23 +45,16 @@ abstract class AbstractChangeIndexStatusEntry implements
UpdateEntry {
@Override
public final Catalog applyUpdate(Catalog catalog, long causalityToken) {
+ CatalogIndexDescriptor newIndexDescriptor = updateIndexStatus(catalog,
causalityToken, newStatus);
Review Comment:
Let's create the new descriptor only after the schema has been retrieved, so
that the necessary checks are performed first and memory is not wasted.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/IndexTaskManager.java:
##########
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.index.IndexManagementUtils.isLocalNode;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
+
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.RemoveIndexEventParameters;
+import org.apache.ignite.internal.catalog.events.StoppingIndexEventParameters;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import org.apache.ignite.internal.event.EventListener;
+import org.apache.ignite.internal.event.EventParameters;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.ReplicaMeta;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Component that reacts to certain Catalog changes and starts or stops
corresponding {@link ChangeIndexStatusTask}s via the
+ * {@link ChangeIndexStatusTaskScheduler}.
+ *
+ * <p>Tasks will be started only if the local node is the primary replica for
the {@code 0} partition of the table whose indexes have been
+ * modified in the Catalog.
+ *
+ * <p>The following events are being monitored:
+ * <ul>
+ * <li>{@link CatalogEvent#INDEX_CREATE} - when an index is created, a
task will be started to move the index to the
+ * {@link CatalogIndexStatus#BUILDING} state; </li>
+ * <li>{@link CatalogEvent#INDEX_STOPPING} - when an index is dropped, a
task will be started to remove the index from the Catalog;</li>
+ * <li>{@link CatalogEvent#INDEX_REMOVED} - when an index is removed from
the Catalog, all ongoing tasks for the index will be stopped;
+ * </li>
+ * <li>{@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED} - if the local
node has become the primary replica for partition {@code 0} of
+ * the table, it starts corresponding tasks depending on the current
status of indices in the Catalog. If the local node stops being the
+ * primary replica for the partition {@code 0}, it stops all tasks that
belong to indices of the table that the partition belongs to.
+ * </li>
+ * </ul>
+ *
+ * <p>On node recovery, tasks will be started on {@link
PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}, which will fire due to a change in
+ * the {@link ReplicaMeta#getLeaseholderId()} on node restart but after {@link
ReplicaMeta#getExpirationTime()}.
+ */
+class IndexTaskManager implements ManuallyCloseable {
+ private final CatalogService catalogService;
+
+ private final PlacementDriver placementDriver;
+
+ private final ClusterService clusterService;
+
+ private final ChangeIndexStatusTaskScheduler
changeIndexStatusTaskScheduler;
+
+ /** Table IDs for which the local node is the primary replica for the
partition with ID {@code 0}. */
+ private final Set<Integer> localNodeIsPrimaryReplicaForTableIds =
ConcurrentHashMap.newKeySet();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ IndexTaskManager(
+ CatalogManager catalogManager,
+ PlacementDriver placementDriver,
+ ClusterService clusterService,
+ ChangeIndexStatusTaskScheduler changeIndexStatusTaskScheduler
+ ) {
+ this.catalogService = catalogManager;
+ this.placementDriver = placementDriver;
+ this.clusterService = clusterService;
+ this.changeIndexStatusTaskScheduler = changeIndexStatusTaskScheduler;
+
+ addListeners();
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ changeIndexStatusTaskScheduler.close();
+ }
+
+ private void addListeners() {
+ catalogService.listen(CatalogEvent.INDEX_CREATE,
adaptToEventListener(this::onIndexCreated));
+
+ catalogService.listen(CatalogEvent.INDEX_STOPPING,
adaptToEventListener(this::onIndexDropped));
+
+ catalogService.listen(CatalogEvent.INDEX_REMOVED,
adaptToEventListener(this::onIndexRemoved));
+
+ placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
adaptToEventListener(this::onPrimaryReplicaElected));
+ }
+
+ /** Wraps a given callback into an EventListener. */
+ private <T extends EventParameters> EventListener<T>
adaptToEventListener(Consumer<T> action) {
+ return (parameters, exception) -> {
+ if (exception != null) {
+ return failedFuture(exception);
+ }
+
+ inBusyLock(busyLock, () -> action.accept(parameters));
+
+ return falseCompletedFuture();
+ };
+ }
+
+ private void onIndexCreated(CreateIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor = parameters.indexDescriptor();
+
+ if
(localNodeIsPrimaryReplicaForTableIds.contains(indexDescriptor.tableId())) {
+ // Schedule building the index only if the local node is the
primary replica for the 0 partition of the table for which the
+ // index was created.
+
changeIndexStatusTaskScheduler.scheduleStartBuildingTask(parameters.indexDescriptor());
+ }
+ }
+
+ private void onIndexDropped(StoppingIndexEventParameters parameters) {
+ CatalogIndexDescriptor indexDescriptor =
catalogService.index(parameters.indexId(), parameters.catalogVersion());
+
+ assert indexDescriptor != null : parameters.indexId();
+
+ if
(localNodeIsPrimaryReplicaForTableIds.contains(indexDescriptor.tableId())) {
+ // Schedule index removal only if the local node is the primary
replica for the 0 partition of the table for which the
+ // index was dropped.
+
changeIndexStatusTaskScheduler.scheduleRemoveIndexTask(indexDescriptor);
+ }
+ }
+
+ private void onIndexRemoved(RemoveIndexEventParameters parameters) {
Review Comment:
I would add a `Busy` suffix to the method name, i.e. `onIndexRemovedBusy`.
##########
modules/index/src/main/java/org/apache/ignite/internal/index/ChangeIndexStatusTaskScheduler.java:
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.index;
+
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLockSafe;
+
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.ignite.internal.catalog.CatalogCommand;
+import org.apache.ignite.internal.catalog.CatalogManager;
+import org.apache.ignite.internal.catalog.ClockWaiter;
+import org.apache.ignite.internal.catalog.commands.RemoveIndexCommand;
+import org.apache.ignite.internal.catalog.commands.StartBuildingIndexCommand;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexStatus;
+import org.apache.ignite.internal.close.ManuallyCloseable;
+import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
+import org.apache.ignite.internal.hlc.HybridClock;
+import org.apache.ignite.internal.logger.IgniteLogger;
+import org.apache.ignite.internal.logger.Loggers;
+import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/** Component is responsible for starting and stopping {@link
ChangeIndexStatusTask}. */
+class ChangeIndexStatusTaskScheduler implements ManuallyCloseable {
+ private static final IgniteLogger LOG =
Loggers.forClass(ChangeIndexStatusTaskScheduler.class);
+
+ private final CatalogManager catalogManager;
+
+ private final ClusterService clusterService;
+
+ private final LogicalTopologyService logicalTopologyService;
+
+ private final HybridClock clock;
+
+ private final ClockWaiter clockWaiter;
+
+ private final PlacementDriver placementDriver;
+
+ private final Executor executor;
+
+ private final Map<ChangeIndexStatusTaskId, ChangeIndexStatusTask> taskById
= new ConcurrentHashMap<>();
+
+ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
+
+ private final AtomicBoolean closeGuard = new AtomicBoolean();
+
+ ChangeIndexStatusTaskScheduler(
+ CatalogManager catalogManager,
+ ClusterService clusterService,
+ LogicalTopologyService logicalTopologyService,
+ HybridClock clock,
+ ClockWaiter clockWaiter,
+ PlacementDriver placementDriver,
+ Executor executor
+ ) {
+ this.catalogManager = catalogManager;
+ this.clusterService = clusterService;
+ this.logicalTopologyService = logicalTopologyService;
+ this.clock = clock;
+ this.clockWaiter = clockWaiter;
+ this.placementDriver = placementDriver;
+ this.executor = executor;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (!closeGuard.compareAndSet(false, true)) {
+ return;
+ }
+
+ busyLock.block();
+
+ stopAllTasks();
+ }
+
+ /**
+ * Schedules a task that will transfer the given index to the {@link
CatalogIndexStatus#BUILDING} state.
+ */
+ void scheduleStartBuildingTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.REGISTERED;
+
+ LOG.info("Scheduling starting of index building. Index: {}",
indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, startBuildingIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask
startBuildingIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ return new ChangeIndexStatusTask(
+ indexDescriptor,
+ catalogManager,
+ placementDriver,
+ clusterService,
+ logicalTopologyService,
+ clock,
+ clockWaiter,
+ executor,
+ busyLock
+ ) {
+ @Override
+ CatalogCommand switchIndexStatusCommand() {
+ return
StartBuildingIndexCommand.builder().indexId(indexDescriptor.id()).build();
+ }
+ };
+ }
+
+ /**
+ * Schedules a task that will remove a given index from the Catalog.
+ */
+ void scheduleRemoveIndexTask(CatalogIndexDescriptor indexDescriptor) {
+ assert indexDescriptor.status() == CatalogIndexStatus.STOPPING;
+
+ LOG.info("Scheduling index removal. Index: {}", indexDescriptor);
+
+ inBusyLockSafe(busyLock, () -> {
+ var taskId = new ChangeIndexStatusTaskId(indexDescriptor);
+
+ scheduleTask(taskId, removeIndexTask(indexDescriptor));
+ });
+ }
+
+ private ChangeIndexStatusTask removeIndexTask(CatalogIndexDescriptor
indexDescriptor) {
+ return new ChangeIndexStatusTask(
+ indexDescriptor,
+ catalogManager,
+ placementDriver,
+ clusterService,
+ logicalTopologyService,
+ clock,
+ clockWaiter,
+ executor,
+ busyLock
+ ) {
+ @Override
+ CatalogCommand switchIndexStatusCommand() {
+ return
RemoveIndexCommand.builder().indexId(indexDescriptor.id()).build();
+ }
+ };
+ }
+
+ private void scheduleTask(ChangeIndexStatusTaskId taskId,
ChangeIndexStatusTask task) {
Review Comment:
I would add a `Busy` suffix to this method's name (i.e. `scheduleTaskBusy`), since it is invoked under the busy lock.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]