OneSizeFitsQuorum commented on code in PR #12202:
URL: https://github.com/apache/iotdb/pull/12202#discussion_r1531842575


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/manager/consensus/ConsensusManager.java:
##########
@@ -396,7 +398,7 @@ public TSStatus confirmLeader() {
             Thread.sleep(RETRY_WAIT_TIME_MS);
           } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
-            LOGGER.warn("Unexpected interruption during waiting for configNode 
leader ready.");
+            LOGGER.warn("Unexpected interruption during waiting for configNode 
leader ready.", e);

Review Comment:
   Please remove the `e` argument from this `LOGGER.warn` call (keep the original message-only log).



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private final AtomicLong lastProcId = new AtomicLong(-1);
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
-  private final String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new 
ConcurrentHashMap<>();
 
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+  private final Map<Long, ProcedureWAL> procWALMap = new HashMap<>();
+
+  private final ConfigManager configManager;
+
+  public ProcedureInfo(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public boolean isOldVersion() {
+    return new File(OLD_PROCEDURE_WAL_DIR).exists();
+  }
+
+  public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+    List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+    try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
       s.filter(path -> 
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
                       
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
                       
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, 
procedureFactory));
-                procedureWAL.load(procedureList);
-              });
+          .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
     } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
+      LOGGER.error("Load procedure wal failed.", e);
+    }
+    procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
+    procedureList.forEach(
+        procedure -> lastProcId.set(Math.max(lastProcId.get(), 
procedure.getProcId())));
+    try {
+      configManager.getConsensusManager().manuallyTakeSnapshot();
+    } catch (ConsensusException e) {
+      // TODO: how to handle exception
+      throw new RuntimeException(e);
     }
+    try {
+      FileUtils.recursiveDeleteFolder(OLD_PROCEDURE_WAL_DIR);
+    } catch (IOException e) {
+      LOGGER.error("Delete useless procedure wal dir fail.", e);
+      LOGGER.error(
+          "You should manually delete the procedure wal dir before ConfigNode 
restart. {}",
+          OLD_PROCEDURE_WAL_DIR);
+    }
+    LOGGER.info(
+        "The Procedure framework has been successfully upgraded. Now it uses 
the consensus layer's services instead of maintaining the WAL itself.");
+    return procedureList;
   }
 
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
+    Procedure<ConfigNodeProcedureEnv> procedure = 
updateProcedurePlan.getProcedure();
+    procedureMap.put(procedure.getProcId(), procedure);
+    long current;
+    do {
+      current = lastProcId.get();
+      if (current >= procedure.getProcId()) {
+        break;
+      }
+    } while (!lastProcId.compareAndSet(current, procedure.getProcId()));
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  @TestOnly
+  public TSStatus oldUpdateProcedure(UpdateProcedurePlan updateProcedurePlan) {
     Procedure procedure = updateProcedurePlan.getProcedure();
     long procId = procedure.getProcId();
-    Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
+    Path path = Paths.get(OLD_PROCEDURE_WAL_DIR, procId + 
PROCEDURE_WAL_SUFFIX);
     ProcedureWAL procedureWAL =
         procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, 
procedureFactory));
     try {
       procedureWAL.save(procedure);
     } catch (IOException e) {
-      LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId(), 
e);
+      LOGGER.error("Update Procedure (pid={}) wal failed", 
procedure.getProcId(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   public TSStatus deleteProcedure(DeleteProcedurePlan deleteProcedurePlan) {
-    long procId = deleteProcedurePlan.getProcId();
-    ProcedureWAL procedureWAL = procWALMap.get(procId);
-    if (procedureWAL != null) {
-      procedureWAL.delete();
-    }
-    procWALMap.remove(procId);
+    procedureMap.remove(deleteProcedurePlan.getProcId());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
+
+  private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
+    try (FileInputStream fis = new 
FileInputStream(procedureFilePath.toFile())) {
+      Procedure procedure = null;
+      try (FileChannel channel = fis.getChannel()) {
+        ByteBuffer byteBuffer = 
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
+        if (channel.read(byteBuffer) > 0) {
+          byteBuffer.flip();
+          procedure = ProcedureFactory.getInstance().create(byteBuffer);
+          byteBuffer.clear();
+        }
+        return Optional.ofNullable(procedure);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Load {} failed, it will be deleted.", procedureFilePath, 
e);
+      if (!procedureFilePath.toFile().delete()) {
+        LOGGER.error("{} delete failed; take appropriate action.", 
procedureFilePath, e);
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
+    File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
+    if (procedureSnapshotDir.exists()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot dir [{}] is already 
exist.",
+          procedureSnapshotDir.getAbsolutePath());
+      return false;
+    }
+    File tmpDir = new File(procedureSnapshotDir.getAbsolutePath() + "-" + 
UUID.randomUUID());
+    if (!tmpDir.mkdir()) {
+      LOGGER.error("Failed to take snapshot, because create tmp dir [{}] 
fail.", tmpDir);
+      return false;
+    }
+
+    // save lastProcId
+    File mainFile = new File(tmpDir.getAbsolutePath() + File.separator + 
MAIN_SNAPSHOT_FILENAME);
+    try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
+        DataOutputStream dataOutputStream = new 
DataOutputStream(fileOutputStream);
+        TIOStreamTransport tioStreamTransport = new 
TIOStreamTransport(fileOutputStream)) {
+      ReadWriteIOUtils.write(lastProcId.get(), fileOutputStream);
+      tioStreamTransport.flush();
+      fileOutputStream.getFD().sync();
+    }
+
+    // save all procedures
+    procedureMap
+        .values()
+        .forEach(
+            procedure -> {
+              try {
+                new ProcedureWAL(
+                        Paths.get(
+                            tmpDir.getAbsolutePath()
+                                + File.separator
+                                + procedure.getProcId()
+                                + PROCEDURE_SNAPSHOT_FILE_SUFFIX),
+                        procedureFactory)
+                    .save(procedure);
+              } catch (IOException e) {
+                throw new RuntimeException(e);

Review Comment:
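   Notice this: the `IOException` is rethrown as a `RuntimeException` from inside the `forEach` lambda in `processTakeSnapshot`.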



##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/TestProcedureBase.java:
##########
@@ -38,16 +38,13 @@ public class TestProcedureBase {
   @Before
   public void setUp() {
     initExecutor();
-    this.procStore.start();

Review Comment:
   .



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/CompletedProcedureRecycler.java:
##########
@@ -62,7 +62,7 @@ protected void periodicExecute(final Env env) {
     final long now = System.currentTimeMillis();
     final Iterator<Map.Entry<Long, CompletedProcedureContainer<Env>>> it =
         completed.entrySet().iterator();
-    while (it.hasNext() && store.isRunning()) {
+    while (it.hasNext()) {

Review Comment:
   Do not modify this: keep the `store.isRunning()` check in the `while` condition.



##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/procedure/util/ProcedureTestUtil.java:
##########
@@ -54,13 +52,4 @@ public static void sleepWithoutInterrupt(final long 
timeToSleep) {
       Thread.currentThread().interrupt();
     }
   }
-
-  public static void stopService(

Review Comment:
   Look at how HBase handles this before removing `stopService`.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private final AtomicLong lastProcId = new AtomicLong(-1);
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
-  private final String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new 
ConcurrentHashMap<>();
 
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+  private final Map<Long, ProcedureWAL> procWALMap = new HashMap<>();
+
+  private final ConfigManager configManager;
+
+  public ProcedureInfo(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public boolean isOldVersion() {
+    return new File(OLD_PROCEDURE_WAL_DIR).exists();
+  }
+
+  public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+    List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+    try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
       s.filter(path -> 
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
                       
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
                       
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, 
procedureFactory));
-                procedureWAL.load(procedureList);
-              });
+          .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
     } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
+      LOGGER.error("Load procedure wal failed.", e);
+    }
+    procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
+    procedureList.forEach(
+        procedure -> lastProcId.set(Math.max(lastProcId.get(), 
procedure.getProcId())));
+    try {
+      configManager.getConsensusManager().manuallyTakeSnapshot();
+    } catch (ConsensusException e) {
+      // TODO: how to handle exception
+      throw new RuntimeException(e);

Review Comment:
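   Instead of throwing a `RuntimeException`, log a warning and return; do not delete the old procedure WAL directory when taking the snapshot fails.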



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private final AtomicLong lastProcId = new AtomicLong(-1);

Review Comment:
   Consider using a plain `long` here instead of `AtomicLong`.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java:
##########
@@ -19,31 +19,30 @@
 
 package org.apache.iotdb.confignode.procedure.store;
 
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 
 import java.util.List;
 
-public interface IProcedureStore {
+public interface IProcedureStore<Env> {
 
-  boolean isRunning();

Review Comment:
   Should `isRunning()` be kept rather than removed?



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private final AtomicLong lastProcId = new AtomicLong(-1);
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
-  private final String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new 
ConcurrentHashMap<>();
 
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+  private final Map<Long, ProcedureWAL> procWALMap = new HashMap<>();
+
+  private final ConfigManager configManager;
+
+  public ProcedureInfo(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public boolean isOldVersion() {
+    return new File(OLD_PROCEDURE_WAL_DIR).exists();
+  }
+
+  public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+    List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+    try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
       s.filter(path -> 
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
                       
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
                       
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, 
procedureFactory));
-                procedureWAL.load(procedureList);
-              });
+          .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
     } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
+      LOGGER.error("Load procedure wal failed.", e);
+    }
+    procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
+    procedureList.forEach(
+        procedure -> lastProcId.set(Math.max(lastProcId.get(), 
procedure.getProcId())));
+    try {
+      configManager.getConsensusManager().manuallyTakeSnapshot();
+    } catch (ConsensusException e) {
+      // TODO: how to handle exception
+      throw new RuntimeException(e);
     }
+    try {
+      FileUtils.recursiveDeleteFolder(OLD_PROCEDURE_WAL_DIR);
+    } catch (IOException e) {
+      LOGGER.error("Delete useless procedure wal dir fail.", e);
+      LOGGER.error(
+          "You should manually delete the procedure wal dir before ConfigNode 
restart. {}",
+          OLD_PROCEDURE_WAL_DIR);
+    }
+    LOGGER.info(
+        "The Procedure framework has been successfully upgraded. Now it uses 
the consensus layer's services instead of maintaining the WAL itself.");
+    return procedureList;
   }
 
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
+    Procedure<ConfigNodeProcedureEnv> procedure = 
updateProcedurePlan.getProcedure();
+    procedureMap.put(procedure.getProcId(), procedure);
+    long current;
+    do {
+      current = lastProcId.get();
+      if (current >= procedure.getProcId()) {
+        break;
+      }
+    } while (!lastProcId.compareAndSet(current, procedure.getProcId()));

Review Comment:
   Check whether this `do`-`while` compare-and-set loop can be removed.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =

Review Comment:
   Consider using a plain `HashMap` here instead of `ConcurrentHashMap`.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/ConfigProcedureStore.java:
##########
@@ -119,25 +120,14 @@ public void delete(long[] batchIds, int startIndex, int 
batchCount) {
     }
   }
 
-  /** clean all the wal, used for unit test. */
-  public void cleanup() {
-    // no op
-  }
-
-  public void stop() {

Review Comment:
   Do not remove this; consider the follower status.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/store/IProcedureStore.java:
##########
@@ -19,31 +19,30 @@
 
 package org.apache.iotdb.confignode.procedure.store;
 
+import org.apache.iotdb.confignode.persistence.ProcedureInfo;
 import org.apache.iotdb.confignode.procedure.Procedure;
 
 import java.util.List;
 
-public interface IProcedureStore {
+public interface IProcedureStore<Env> {
 
-  boolean isRunning();
+  List<Procedure<Env>> load();
 
-  void setRunning(boolean running);
+  List<Procedure<Env>> getProcedures();
 
-  void load(List<Procedure> procedureList);
+  ProcedureInfo getProcedureInfo();
 
-  void update(Procedure procedure);
+  long getNextProcId();
 
-  void update(Procedure[] subprocs);
+  void update(Procedure<Env> procedure);
+
+  void update(Procedure<Env>[] subprocs);
 
   void delete(long procId);
 
   void delete(long[] childProcIds);
 
   void delete(long[] batchIds, int startIndex, int batchCount);
 
-  void cleanup();
-
-  void stop();

Review Comment:
   Should `cleanup()` and `stop()` be kept rather than removed?
   



##########
iotdb-core/confignode/src/test/java/org/apache/iotdb/confignode/persistence/ProcedureInfoTest.java:
##########
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.confignode.persistence;
+
+import org.apache.iotdb.commons.utils.FileUtils;
+import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import 
org.apache.iotdb.confignode.procedure.impl.testonly.NeverFinishProcedure;
+
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.File;
+import java.util.stream.LongStream;
+
+import static org.apache.iotdb.db.utils.constant.TestConstant.BASE_OUTPUT_PATH;
+
+public class ProcedureInfoTest {
+  private static final ProcedureInfo procedureInfo = new ProcedureInfo(null); 
/**/

Review Comment:
   remove /**/



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,259 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private final AtomicLong lastProcId = new AtomicLong(-1);
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
-  private final String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new 
ConcurrentHashMap<>();
 
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+  private final Map<Long, ProcedureWAL> procWALMap = new HashMap<>();
+
+  private final ConfigManager configManager;
+
+  public ProcedureInfo(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public boolean isOldVersion() {
+    return new File(OLD_PROCEDURE_WAL_DIR).exists();
+  }
+
+  public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+    List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+    try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
       s.filter(path -> 
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
                       
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
                       
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, 
procedureFactory));
-                procedureWAL.load(procedureList);
-              });
+          .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
     } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
+      LOGGER.error("Load procedure wal failed.", e);
+    }
+    procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
+    procedureList.forEach(
+        procedure -> lastProcId.set(Math.max(lastProcId.get(), 
procedure.getProcId())));
+    try {
+      configManager.getConsensusManager().manuallyTakeSnapshot();
+    } catch (ConsensusException e) {
+      // TODO: how to handle exception
+      throw new RuntimeException(e);
     }
+    try {
+      FileUtils.recursiveDeleteFolder(OLD_PROCEDURE_WAL_DIR);
+    } catch (IOException e) {
+      LOGGER.error("Delete useless procedure wal dir fail.", e);
+      LOGGER.error(
+          "You should manually delete the procedure wal dir before ConfigNode 
restart. {}",
+          OLD_PROCEDURE_WAL_DIR);
+    }
+    LOGGER.info(
+        "The Procedure framework has been successfully upgraded. Now it uses 
the consensus layer's services instead of maintaining the WAL itself.");
+    return procedureList;
   }
 
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
+    Procedure<ConfigNodeProcedureEnv> procedure = 
updateProcedurePlan.getProcedure();
+    procedureMap.put(procedure.getProcId(), procedure);
+    long current;
+    do {
+      current = lastProcId.get();
+      if (current >= procedure.getProcId()) {
+        break;
+      }
+    } while (!lastProcId.compareAndSet(current, procedure.getProcId()));
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  @TestOnly
+  public TSStatus oldUpdateProcedure(UpdateProcedurePlan updateProcedurePlan) {
     Procedure procedure = updateProcedurePlan.getProcedure();
     long procId = procedure.getProcId();
-    Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
+    Path path = Paths.get(OLD_PROCEDURE_WAL_DIR, procId + 
PROCEDURE_WAL_SUFFIX);
     ProcedureWAL procedureWAL =
         procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, 
procedureFactory));
     try {
       procedureWAL.save(procedure);
     } catch (IOException e) {
-      LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId(), 
e);
+      LOGGER.error("Update Procedure (pid={}) wal failed", 
procedure.getProcId(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   public TSStatus deleteProcedure(DeleteProcedurePlan deleteProcedurePlan) {
-    long procId = deleteProcedurePlan.getProcId();
-    ProcedureWAL procedureWAL = procWALMap.get(procId);
-    if (procedureWAL != null) {
-      procedureWAL.delete();
-    }
-    procWALMap.remove(procId);
+    procedureMap.remove(deleteProcedurePlan.getProcId());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
+
+  private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
+    try (FileInputStream fis = new 
FileInputStream(procedureFilePath.toFile())) {
+      Procedure procedure = null;
+      try (FileChannel channel = fis.getChannel()) {
+        ByteBuffer byteBuffer = 
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
+        if (channel.read(byteBuffer) > 0) {
+          byteBuffer.flip();
+          procedure = ProcedureFactory.getInstance().create(byteBuffer);
+          byteBuffer.clear();
+        }
+        return Optional.ofNullable(procedure);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Load {} failed, it will be deleted.", procedureFilePath, 
e);
+      if (!procedureFilePath.toFile().delete()) {
+        LOGGER.error("{} delete failed; take appropriate action.", 
procedureFilePath, e);

Review Comment:
   This log message reads awkwardly; please rephrase it in idiomatic English.



##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/procedure/ProcedureExecutor.java:
##########
@@ -621,8 +594,7 @@ private ProcedureLockState executeRootStackRollback(
       lockState = executeRollback(procedure);
       releaseLock(procedure, false);
 
-      boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED;
-      abortRollback |= !isRunning() || !store.isRunning();
+      boolean abortRollback = lockState != ProcedureLockState.LOCK_ACQUIRED || 
!isRunning();

Review Comment:
   do not modify



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to