tkalkirill commented on a change in pull request #9037:
URL: https://github.com/apache/ignite/pull/9037#discussion_r622868012
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
*/
package org.apache.ignite.internal.processors.localtask;
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import
org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
/**
- * Processor that is responsible for durable background tasks that are
executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are
executed on local node.
*/
public class DurableBackgroundTasksProcessor extends GridProcessorAdapter
implements MetastorageLifecycleListener,
CheckpointListener {
/** Prefix for metastorage keys for durable background tasks. */
- private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX =
"durable-background-task-";
+ private static final String TASK_PREFIX = "durable-background-task-";
- /** Metastorage. */
- private volatile ReadWriteMetastorage metastorage;
-
- /** Metastorage synchronization mutex. */
+ /** MetaStorage synchronization mutex. */
private final Object metaStorageMux = new Object();
- /** Set of workers that executing durable background tasks. */
- private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new
GridConcurrentHashSet<>();
-
- /** Count of workers that executing durable background tasks. */
- private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new
AtomicInteger(0);
+ /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name}
-> task state. */
+ private final ConcurrentMap<String, DurableBackgroundTaskState> tasks =
new ConcurrentHashMap<>();
- /** Durable background tasks map. */
- private final ConcurrentHashMap<String, DurableBackgroundTask>
durableBackgroundTasks = new ConcurrentHashMap<>();
-
- /** Set of started tasks' names. */
- private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+ /** Lock for canceling tasks. */
+ private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
/**
- * Ban to start new tasks. The first time the cluster is activated, it
will try again to run existing tasks.
- *
- * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+ * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+ * Mapping: {@link DurableBackgroundTask#name task name} -> task.
*/
- private volatile boolean forbidStartingNewTasks;
+ private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new
ConcurrentHashMap<>();
+
+ /** Prohibiting the execution of tasks. */
+ private volatile boolean prohibitionExecTasks = true;
/**
+ * Constructor.
+ *
* @param ctx Kernal context.
*/
public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
super(ctx);
}
- /**
- * Starts the asynchronous operation of pending tasks execution. Is called
on start.
- */
- private void asyncDurableBackgroundTasksExecution() {
- assert durableBackgroundTasks != null;
-
- for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
- if (!task.isCompleted() && startedTasks.add(task.shortName()))
- asyncDurableBackgroundTaskExecute(task);
- }
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
}
- /**
- * Creates a worker to execute single durable background task.
- *
- * @param task Task.
- */
- private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task)
{
- String workerName = "async-durable-background-task-executor-" +
asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ cancelTasks();
+ }
- GridWorker worker = new GridWorker(ctx.igniteInstanceName(),
workerName, log) {
- @Override public void cancel() {
- task.onCancel();
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.iterate(
+ TASK_PREFIX,
+ (k, v) -> {
+ DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+ tasks.put(t.name(), new DurableBackgroundTaskState(t,
null, true));
+ },
+ true
+ );
+ });
+ }
- super.cancel();
- }
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage
metastorage) {
+
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ }
- @Override protected void body() {
- try {
- if (forbidStartingNewTasks)
- return;
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ /* No op. */
+ }
- log.info("Executing durable background task: " +
task.shortName());
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ for (Iterator<Entry<String, DurableBackgroundTaskState>> it =
tasks.entrySet().iterator(); it.hasNext(); ) {
+ DurableBackgroundTaskState taskState = it.next().getValue();
- task.execute(ctx);
+ if (taskState.state() == COMPLETED) {
+ assert taskState.saved();
- task.complete();
+ DurableBackgroundTask t = taskState.task();
- log.info("Execution of durable background task completed:
" + task.shortName());
- }
- catch (Throwable e) {
- log.error("Could not execute durable background task: " +
task.shortName(), e);
- }
- finally {
- startedTasks.remove(task.shortName());
+ toRmv.put(t.name(), t);
- asyncDurableBackgroundTaskWorkers.remove(this);
- }
+ it.remove();
}
- };
-
- asyncDurableBackgroundTaskWorkers.add(worker);
-
- Thread asyncTask = new IgniteThread(worker);
-
- asyncTask.start();
+ }
}
/** {@inheritDoc} */
- @Override public void onKernalStart(boolean active) {
- asyncDurableBackgroundTasksExecution();
+ @Override public void onCheckpointBegin(Context ctx) {
+ /* No op. */
}
/** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- forbidStartingNewTasks = true;
+ @Override public void afterCheckpointEnd(Context ctx) {
+ for (Iterator<Entry<String, DurableBackgroundTask>> it =
toRmv.entrySet().iterator(); it.hasNext(); ) {
+ DurableBackgroundTask t = it.next().getValue();
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
- }
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null && toRmv.containsKey(t.name()))
+ metaStorage.remove(metaStorageKey(t));
+ });
- /** {@inheritDoc} */
- @Override public void start() throws IgniteCheckedException {
- ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
+ it.remove();
+ }
}
/**
- * @param msg Message.
+ * Callback at the start of a global state change.
+ *
+ * @param msg Message for change cluster global state.
*/
- public void onStateChange(ChangeGlobalStateMessage msg) {
- if (msg.state() == ClusterState.INACTIVE) {
- forbidStartingNewTasks = true;
-
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
- }
+ public void onStateChangeStarted(ChangeGlobalStateMessage msg) {
+ if (msg.state() == ClusterState.INACTIVE)
+ cancelTasks();
}
/**
- * @param msg Message.
+ * Callback on finish of a global state change.
+ *
+ * @param msg Finish message for change cluster global state.
*/
public void onStateChangeFinish(ChangeGlobalStateFinishMessage msg) {
if (msg.state() != ClusterState.INACTIVE) {
- forbidStartingNewTasks = false;
-
- asyncDurableBackgroundTasksExecution();
- }
- }
+ prohibitionExecTasks = false;
- /** {@inheritDoc} */
- @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
- synchronized (metaStorageMux) {
- if (durableBackgroundTasks.isEmpty()) {
- try {
- metastorage.iterate(
- STORE_DURABLE_BACKGROUND_TASK_PREFIX,
- (key, val) -> durableBackgroundTasks.put(key,
(DurableBackgroundTask)val),
- true
- );
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to iterate durable
background tasks storage.", e);
- }
+ for (DurableBackgroundTaskState taskState : tasks.values()) {
+ if (!prohibitionExecTasks)
+ executeAsync0(taskState.task());
}
}
}
- /** {@inheritDoc} */
- @Override public void onReadyForReadWrite(ReadWriteMetastorage
metastorage) {
- synchronized (metaStorageMux) {
- try {
- for (Map.Entry<String, DurableBackgroundTask> entry :
durableBackgroundTasks.entrySet()) {
- if (metastorage.readRaw(entry.getKey()) == null)
- metastorage.write(entry.getKey(), entry.getValue());
- }
- }
- catch (IgniteCheckedException e) {
- throw new IgniteException("Failed to read key from durable
background tasks storage.", e);
- }
+ /**
+ * Asynchronous execution of a durable background task.
+ *
+ * A new task will be added for execution either if there is no task with
+ * the same {@link DurableBackgroundTask#name name} or it (previous) will
be completed.
+ *
+ * If the task is required to be completed after restarting the node,
+ * then it must be saved to the MetaStorage.
+ *
+ * If the task is saved to the Metastorage, then it will be deleted from it
+ * only after its completion and at the end of the checkpoint. Otherwise,
it
+ * will be removed as soon as it is completed.
+ *
+ * @param task Durable background task.
+ * @param save Save task to MetaStorage.
+ * @return Futures that will complete when the task is completed.
+ */
+ public IgniteInternalFuture<Void> executeAsync(DurableBackgroundTask task,
boolean save) {
+ DurableBackgroundTaskState taskState = tasks.compute(task.name(),
(taskName, prev) -> {
+ if (prev != null && prev.state() != COMPLETED)
+ throw new IllegalArgumentException("Task is already present
and has not been completed: " + taskName);
+
+ if (save)
+ toRmv.remove(taskName);
Review comment:
We discussed this privately.
##########
File path:
modules/core/src/main/java/org/apache/ignite/internal/processors/localtask/DurableBackgroundTasksProcessor.java
##########
@@ -16,315 +16,342 @@
*/
package org.apache.ignite.internal.processors.localtask;
-import java.util.Map;
-import java.util.Set;
+import java.util.Iterator;
+import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.IgniteCheckedException;
import org.apache.ignite.IgniteException;
import org.apache.ignite.cluster.ClusterState;
import org.apache.ignite.configuration.CacheConfiguration;
import org.apache.ignite.internal.GridKernalContext;
-import org.apache.ignite.internal.client.util.GridConcurrentHashSet;
+import org.apache.ignite.internal.IgniteInternalFuture;
import org.apache.ignite.internal.processors.GridProcessorAdapter;
import
org.apache.ignite.internal.processors.cache.persistence.GridCacheDatabaseSharedManager;
+import
org.apache.ignite.internal.processors.cache.persistence.IgniteCacheDatabaseSharedManager;
import
org.apache.ignite.internal.processors.cache.persistence.checkpoint.CheckpointListener;
+import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetaStorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageLifecycleListener;
-import
org.apache.ignite.internal.processors.cache.persistence.metastorage.MetastorageTree;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadOnlyMetastorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.ReadWriteMetastorage;
import
org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTask;
+import
org.apache.ignite.internal.processors.cache.persistence.metastorage.pendingtask.DurableBackgroundTaskResult;
import
org.apache.ignite.internal.processors.cluster.ChangeGlobalStateFinishMessage;
import org.apache.ignite.internal.processors.cluster.ChangeGlobalStateMessage;
+import org.apache.ignite.internal.util.future.GridFutureAdapter;
+import org.apache.ignite.internal.util.lang.IgniteThrowableConsumer;
import org.apache.ignite.internal.util.typedef.internal.CU;
-import org.apache.ignite.internal.util.worker.GridWorker;
-import org.apache.ignite.thread.IgniteThread;
-import static org.apache.ignite.internal.util.IgniteUtils.awaitForWorkersStop;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.COMPLETED;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.INIT;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.PREPARE;
+import static
org.apache.ignite.internal.processors.localtask.DurableBackgroundTaskState.State.STARTED;
/**
- * Processor that is responsible for durable background tasks that are
executed on local node
- * and should be continued even after node restart.
+ * Processor that is responsible for durable background tasks that are
executed on local node.
*/
public class DurableBackgroundTasksProcessor extends GridProcessorAdapter
implements MetastorageLifecycleListener,
CheckpointListener {
/** Prefix for metastorage keys for durable background tasks. */
- private static final String STORE_DURABLE_BACKGROUND_TASK_PREFIX =
"durable-background-task-";
+ private static final String TASK_PREFIX = "durable-background-task-";
- /** Metastorage. */
- private volatile ReadWriteMetastorage metastorage;
-
- /** Metastorage synchronization mutex. */
+ /** MetaStorage synchronization mutex. */
private final Object metaStorageMux = new Object();
- /** Set of workers that executing durable background tasks. */
- private final Set<GridWorker> asyncDurableBackgroundTaskWorkers = new
GridConcurrentHashSet<>();
-
- /** Count of workers that executing durable background tasks. */
- private final AtomicInteger asyncDurableBackgroundTasksWorkersCntr = new
AtomicInteger(0);
+ /** Current tasks. Mapping: {@link DurableBackgroundTask#name task name}
-> task state. */
+ private final ConcurrentMap<String, DurableBackgroundTaskState> tasks =
new ConcurrentHashMap<>();
- /** Durable background tasks map. */
- private final ConcurrentHashMap<String, DurableBackgroundTask>
durableBackgroundTasks = new ConcurrentHashMap<>();
-
- /** Set of started tasks' names. */
- private final Set<String> startedTasks = new GridConcurrentHashSet<>();
+ /** Lock for canceling tasks. */
+ private final ReadWriteLock cancelLock = new ReentrantReadWriteLock(true);
/**
- * Ban to start new tasks. The first time the cluster is activated, it
will try again to run existing tasks.
- *
- * @see #onStateChangeFinish(ChangeGlobalStateFinishMessage)
+ * Tasks to be removed from the MetaStorage after the end of a checkpoint.
+ * Mapping: {@link DurableBackgroundTask#name task name} -> task.
*/
- private volatile boolean forbidStartingNewTasks;
+ private final ConcurrentMap<String, DurableBackgroundTask> toRmv = new
ConcurrentHashMap<>();
+
+ /** Prohibiting the execution of tasks. */
+ private volatile boolean prohibitionExecTasks = true;
/**
+ * Constructor.
+ *
* @param ctx Kernal context.
*/
public DurableBackgroundTasksProcessor(GridKernalContext ctx) {
super(ctx);
}
- /**
- * Starts the asynchronous operation of pending tasks execution. Is called
on start.
- */
- private void asyncDurableBackgroundTasksExecution() {
- assert durableBackgroundTasks != null;
-
- for (DurableBackgroundTask task : durableBackgroundTasks.values()) {
- if (!task.isCompleted() && startedTasks.add(task.shortName()))
- asyncDurableBackgroundTaskExecute(task);
- }
+ /** {@inheritDoc} */
+ @Override public void start() throws IgniteCheckedException {
+ ctx.internalSubscriptionProcessor().registerMetastorageListener(this);
}
- /**
- * Creates a worker to execute single durable background task.
- *
- * @param task Task.
- */
- private void asyncDurableBackgroundTaskExecute(DurableBackgroundTask task)
{
- String workerName = "async-durable-background-task-executor-" +
asyncDurableBackgroundTasksWorkersCntr.getAndIncrement();
+ /** {@inheritDoc} */
+ @Override public void onKernalStop(boolean cancel) {
+ cancelTasks();
+ }
- GridWorker worker = new GridWorker(ctx.igniteInstanceName(),
workerName, log) {
- @Override public void cancel() {
- task.onCancel();
+ /** {@inheritDoc} */
+ @Override public void onReadyForRead(ReadOnlyMetastorage metastorage) {
+ metaStorageOperation(metaStorage -> {
+ assert metaStorage != null;
+
+ metaStorage.iterate(
+ TASK_PREFIX,
+ (k, v) -> {
+ DurableBackgroundTask t = (DurableBackgroundTask)v;
+
+ tasks.put(t.name(), new DurableBackgroundTaskState(t,
null, true));
+ },
+ true
+ );
+ });
+ }
- super.cancel();
- }
+ /** {@inheritDoc} */
+ @Override public void onReadyForReadWrite(ReadWriteMetastorage
metastorage) {
+
((GridCacheDatabaseSharedManager)ctx.cache().context().database()).addCheckpointListener(this);
+ }
- @Override protected void body() {
- try {
- if (forbidStartingNewTasks)
- return;
+ /** {@inheritDoc} */
+ @Override public void beforeCheckpointBegin(Context ctx) {
+ /* No op. */
+ }
- log.info("Executing durable background task: " +
task.shortName());
+ /** {@inheritDoc} */
+ @Override public void onMarkCheckpointBegin(Context ctx) {
+ for (Iterator<Entry<String, DurableBackgroundTaskState>> it =
tasks.entrySet().iterator(); it.hasNext(); ) {
+ DurableBackgroundTaskState taskState = it.next().getValue();
- task.execute(ctx);
+ if (taskState.state() == COMPLETED) {
+ assert taskState.saved();
- task.complete();
+ DurableBackgroundTask t = taskState.task();
- log.info("Execution of durable background task completed:
" + task.shortName());
- }
- catch (Throwable e) {
- log.error("Could not execute durable background task: " +
task.shortName(), e);
- }
- finally {
- startedTasks.remove(task.shortName());
+ toRmv.put(t.name(), t);
- asyncDurableBackgroundTaskWorkers.remove(this);
- }
+ it.remove();
}
- };
-
- asyncDurableBackgroundTaskWorkers.add(worker);
-
- Thread asyncTask = new IgniteThread(worker);
-
- asyncTask.start();
+ }
}
/** {@inheritDoc} */
- @Override public void onKernalStart(boolean active) {
- asyncDurableBackgroundTasksExecution();
+ @Override public void onCheckpointBegin(Context ctx) {
+ /* No op. */
}
/** {@inheritDoc} */
- @Override public void onKernalStop(boolean cancel) {
- forbidStartingNewTasks = true;
+ @Override public void afterCheckpointEnd(Context ctx) {
+ for (Iterator<Entry<String, DurableBackgroundTask>> it =
toRmv.entrySet().iterator(); it.hasNext(); ) {
+ DurableBackgroundTask t = it.next().getValue();
- awaitForWorkersStop(asyncDurableBackgroundTaskWorkers, true, log);
- }
+ metaStorageOperation(metaStorage -> {
+ if (metaStorage != null && toRmv.containsKey(t.name()))
Review comment:
We discussed this privately.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]