Mmuzaf commented on a change in pull request #9047:
URL: https://github.com/apache/ignite/pull/9047#discussion_r623073769



##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause are processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();
+
+        try {
+            return worker.flush();
+        }
+        finally {
+            lock.readLock().unlock();
+        }
+    }
+
+    /**
+     * @param compFut Future which should be completed when worker may proceed 
with updates.
+     */
+    public void pauseMetaStorage(IgniteInternalFuture<?> compFut) {

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DistributedMetaStorageImpl.java
##########
@@ -1210,6 +1210,39 @@ private void onAckMessage(
         }
     }
 
+    /**
+     * @return Future which will be completed when all the updates prior to the pause are processed.
+     */
+    public Future<?> flush() {
+        assert isPersistenceEnabled;
+
+        lock.readLock().lock();

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -69,7 +69,16 @@ public DmsWorkerStatus status() {
     private volatile ReadWriteMetastorage metastorage;
 
     /** */
-    private volatile boolean firstStart = true;
+    private volatile RunnableFuture<?> curTask;

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();
+
+        // Put the restore task into the queue so that it is executed on worker start.
+        updateQueue.offer(newDmsTask(this::restore));
     }
 
     /** */
     public void setMetaStorage(ReadWriteMetastorage metastorage) {
         this.metastorage = metastorage;
     }
 
-    /** */
-    public void update(DistributedMetaStorageHistoryItem histItem) {
-        updateQueue.offer(histItem);
+    /**
+     * @return Future which will be completed when all the tasks prior to the pause task are completed.
+     */
+    public Future<?> flush() {
+        return pauseTask;
     }
 
-    /** */
-    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
-        assert fullNodeData.fullData != null;
-        assert fullNodeData.hist != null;
-
-        updateQueue.clear();
+    /**
+     * @param compFut Future which should be completed when worker may proceed 
with updates.
+     */
+    public void pause(IgniteInternalFuture<?> compFut) {
+        latch = new CountDownLatch(1);
 
-        updateQueue.offer(fullNodeData);
-    }
+        updateQueue.offer(pauseTask = new FutureTask<>(() -> AWAIT));
 
-    /** */
-    public void removeHistItem(long ver) {
-        updateQueue.offer(ver);
+        compFut.listen(f -> latch.countDown());
     }
 
     /** */
-    public void cancel(boolean halt) throws InterruptedException {
-        if (halt)
-            updateQueue.clear();
+    public void update(DistributedMetaStorageHistoryItem histItem) {
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.write(historyItemKey(workerDmsVer.id() + 1), histItem);
 
-        updateQueue.offer(status = halt ? HALT : CANCEL);
+            workerDmsVer = workerDmsVer.nextVersion(histItem);
 
-        Thread runner = runner();
+            metastorage.write(versionKey(), workerDmsVer);
 
-        if (runner != null)
-            runner.join();
+            for (int i = 0, len = histItem.keys().length; i < len; i++)
+                write(histItem.keys()[i], histItem.valuesBytesArray()[i]);
+        }));
     }
 
-    /** {@inheritDoc} */
-    @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
-        status = CONTINUE;
+    /** */
+    public void update(DistributedMetaStorageClusterNodeData fullNodeData) {
+        assert fullNodeData.fullData != null;
+        assert fullNodeData.hist != null;
 
-        try {
-            if (firstStart) {
-                firstStart = false;
+        updateQueue.clear();
 
-                lock.lock();
+        updateQueue.offer(newDmsTask(() -> {
+            metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
 
-                try {
-                    restore();
-                }
-                finally {
-                    lock.unlock();
-                }
-            }
+            doCleanup();
 
-            while (true) {
-                Object update = updateQueue.peek();
+            for (DistributedMetaStorageKeyValuePair item : 
fullNodeData.fullData)
+                metastorage.writeRaw(localKey(item.key), item.valBytes);
 
-                try {
-                    update = updateQueue.take();
-                }
-                catch (InterruptedException ignore) {
-                }
+            for (int i = 0, len = fullNodeData.hist.length; i < len; i++) {
+                DistributedMetaStorageHistoryItem histItem = 
fullNodeData.hist[i];
 
-                lock.lock();
+                long histItemVer = fullNodeData.ver.id() + i - (len - 1);
 
-                try {
-                    // process update
-                    if (update instanceof DistributedMetaStorageHistoryItem)
-                        applyUpdate((DistributedMetaStorageHistoryItem)update);
-                    else if (update instanceof 
DistributedMetaStorageClusterNodeData) {
-                        DistributedMetaStorageClusterNodeData fullNodeData = 
(DistributedMetaStorageClusterNodeData)update;
+                metastorage.write(historyItemKey(histItemVer), histItem);
+            }
 
-                        metastorage.writeRaw(cleanupGuardKey(), DUMMY_VALUE);
+            metastorage.write(versionKey(), fullNodeData.ver);
 
-                        doCleanup();
+            workerDmsVer = fullNodeData.ver;
 
-                        for (DistributedMetaStorageKeyValuePair item : 
fullNodeData.fullData)
-                            metastorage.writeRaw(localKey(item.key), 
item.valBytes);
+            metastorage.remove(cleanupGuardKey());
+        }));
+    }
 
-                        for (int i = 0, len = fullNodeData.hist.length; i < 
len; i++) {
-                            DistributedMetaStorageHistoryItem histItem = 
fullNodeData.hist[i];
+    /** */
+    public void removeHistItem(long ver) {
+        updateQueue.offer(newDmsTask(() -> 
metastorage.remove(historyItemKey(ver))));
+    }
 
-                            long histItemVer = fullNodeData.ver.id() + i - 
(len - 1);
+    /** */
+    public void cancel(boolean halt) {
+        if (halt)
+            updateQueue.clear();
 
-                            metastorage.write(historyItemKey(histItemVer), 
histItem);
-                        }
+        updateQueue.offer(new FutureTask<>(() -> STOP));
 
-                        metastorage.write(versionKey(), fullNodeData.ver);
+        U.join(runner(), log);
+    }
 
-                        workerDmsVer = fullNodeData.ver;
+    /** {@inheritDoc} */
+    @Override protected void body() throws InterruptedException, 
IgniteInterruptedCheckedException {
+        while (true) {
+            try {
+                curTask = updateQueue.take();
+            }
+            catch (InterruptedException ignore) {
+            }
 
-                        metastorage.remove(cleanupGuardKey());
-                    }
-                    else if (update instanceof Long) {
-                        long ver = (Long)update;
+            curTask.run();

Review comment:
       Fixed.

##########
File path: 
modules/core/src/main/java/org/apache/ignite/internal/processors/metastorage/persistence/DmsDataWriterWorker.java
##########
@@ -81,134 +90,125 @@ public DmsDataWriterWorker(
         super(igniteInstanceName, "dms-writer", log);
         this.lock = lock;
         this.errorHnd = errorHnd;
+
+        pauseTask = new FutureTask<>(() -> AWAIT);
+        // Completed future task.
+        pauseTask.run();

Review comment:
       Fixed.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to