OneSizeFitsQuorum commented on code in PR #12202:
URL: https://github.com/apache/iotdb/pull/12202#discussion_r1533442997


##########
iotdb-core/confignode/src/main/java/org/apache/iotdb/confignode/persistence/ProcedureInfo.java:
##########
@@ -21,79 +21,249 @@
 
 import org.apache.iotdb.common.rpc.thrift.TSStatus;
 import org.apache.iotdb.commons.conf.CommonDescriptor;
+import org.apache.iotdb.commons.snapshot.SnapshotProcessor;
+import org.apache.iotdb.commons.utils.FileUtils;
+import org.apache.iotdb.commons.utils.TestOnly;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.DeleteProcedurePlan;
 import 
org.apache.iotdb.confignode.consensus.request.write.procedure.UpdateProcedurePlan;
+import org.apache.iotdb.confignode.manager.ConfigManager;
 import org.apache.iotdb.confignode.procedure.Procedure;
+import org.apache.iotdb.confignode.procedure.env.ConfigNodeProcedureEnv;
 import org.apache.iotdb.confignode.procedure.store.ProcedureFactory;
 import org.apache.iotdb.confignode.procedure.store.ProcedureWAL;
+import org.apache.iotdb.consensus.exception.ConsensusException;
 import org.apache.iotdb.rpc.TSStatusCode;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
+import org.apache.thrift.TException;
+import org.apache.thrift.transport.TIOStreamTransport;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Stream;
 
-public class ProcedureInfo {
+public class ProcedureInfo implements SnapshotProcessor {
 
-  private static final Logger LOG = 
LoggerFactory.getLogger(ProcedureInfo.class);
+  private static final Logger LOGGER = 
LoggerFactory.getLogger(ProcedureInfo.class);
 
+  private static final String MAIN_SNAPSHOT_FILENAME = "procedure_info.bin";
+  private static final String PROCEDURE_SNAPSHOT_DIR = "procedures";
+  private static final String PROCEDURE_SNAPSHOT_FILE_SUFFIX = ".bin";
+  private static final int PROCEDURE_LOAD_BUFFER_SIZE = 8 * 1024 * 1024;
   private static final String PROCEDURE_WAL_SUFFIX = ".proc.wal";
+  private final String OLD_PROCEDURE_WAL_DIR =
+      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
+
+  private final Map<Long, Procedure<ConfigNodeProcedureEnv>> procedureMap =
+      new ConcurrentHashMap<>();
+
+  private long lastProcId = -1;
 
   private final ProcedureFactory procedureFactory = 
ProcedureFactory.getInstance();
-  private final String procedureWalDir =
-      CommonDescriptor.getInstance().getConfig().getProcedureWalFolder();
-  private final ConcurrentHashMap<Long, ProcedureWAL> procWALMap = new 
ConcurrentHashMap<>();
 
-  public void load(List<Procedure> procedureList) {
-    try (Stream<Path> s = Files.list(Paths.get(procedureWalDir))) {
+  private final ConfigManager configManager;
+
+  public ProcedureInfo(ConfigManager configManager) {
+    this.configManager = configManager;
+  }
+
+  public boolean isOldVersion() {
+    return new File(OLD_PROCEDURE_WAL_DIR).exists();
+  }
+
+  public List<Procedure<ConfigNodeProcedureEnv>> oldLoad() {
+    List<Procedure<ConfigNodeProcedureEnv>> procedureList = new ArrayList<>();
+    try (Stream<Path> s = Files.list(Paths.get(OLD_PROCEDURE_WAL_DIR))) {
       s.filter(path -> 
path.getFileName().toString().endsWith(PROCEDURE_WAL_SUFFIX))
           .sorted(
               (p1, p2) ->
                   Long.compareUnsigned(
                       
Long.parseLong(p1.getFileName().toString().split("\\.")[0]),
                       
Long.parseLong(p2.getFileName().toString().split("\\.")[0])))
-          .forEach(
-              path -> {
-                String fileName = path.getFileName().toString();
-                long procId = Long.parseLong(fileName.split("\\.")[0]);
-                ProcedureWAL procedureWAL =
-                    procWALMap.computeIfAbsent(
-                        procId, id -> new ProcedureWAL(path, 
procedureFactory));
-                procedureWAL.load(procedureList);
-              });
+          .forEach(path -> loadProcedure(path).ifPresent(procedureList::add));
+    } catch (IOException e) {
+      LOGGER.error("Load procedure wal failed.", e);
+    }
+    procedureList.forEach(procedure -> procedureMap.put(procedure.getProcId(), 
procedure));
+    procedureList.forEach(procedure -> lastProcId = Math.max(lastProcId, 
procedure.getProcId()));
+    try {
+      LOGGER.info("Old procedure files have been loaded successfully, taking 
snapshot...");
+      configManager.getConsensusManager().manuallyTakeSnapshot();
+    } catch (ConsensusException e) {
+      LOGGER.warn("Taking snapshot fail, upgrade fail", e);
+      return procedureList;
+    }
+    try {
+      FileUtils.recursiveDeleteFolder(OLD_PROCEDURE_WAL_DIR);
     } catch (IOException e) {
-      LOG.error("Load procedure wal failed.", e);
+      LOGGER.error("Delete useless procedure wal dir fail.", e);
+      LOGGER.error(
+          "You should manually delete the procedure wal dir before ConfigNode 
restart. {}",
+          OLD_PROCEDURE_WAL_DIR);
     }
+    LOGGER.info(
+        "The Procedure framework has been successfully upgraded. Now it uses 
the consensus layer's services instead of maintaining the WAL itself.");
+    return procedureList;
   }
 
   public TSStatus updateProcedure(UpdateProcedurePlan updateProcedurePlan) {
+    Procedure<ConfigNodeProcedureEnv> procedure = 
updateProcedurePlan.getProcedure();
+    procedureMap.put(procedure.getProcId(), procedure);
+    lastProcId = Math.max(lastProcId, procedure.getProcId());
+    return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
+  }
+
+  @TestOnly
+  public TSStatus oldUpdateProcedure(UpdateProcedurePlan updateProcedurePlan) {
     Procedure procedure = updateProcedurePlan.getProcedure();
     long procId = procedure.getProcId();
-    Path path = Paths.get(procedureWalDir, procId + PROCEDURE_WAL_SUFFIX);
-    ProcedureWAL procedureWAL =
-        procWALMap.computeIfAbsent(procId, id -> new ProcedureWAL(path, 
procedureFactory));
+    Path path = Paths.get(OLD_PROCEDURE_WAL_DIR, procId + 
PROCEDURE_WAL_SUFFIX);
+    ProcedureWAL procedureWAL = new ProcedureWAL(path, procedureFactory);
     try {
       procedureWAL.save(procedure);
     } catch (IOException e) {
-      LOG.error("Update Procedure (pid={}) wal failed", procedure.getProcId(), 
e);
+      LOGGER.error("Update Procedure (pid={}) wal failed", 
procedure.getProcId(), e);
       return new TSStatus(TSStatusCode.INTERNAL_SERVER_ERROR.getStatusCode());
     }
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
 
   public TSStatus deleteProcedure(DeleteProcedurePlan deleteProcedurePlan) {
-    long procId = deleteProcedurePlan.getProcId();
-    ProcedureWAL procedureWAL = procWALMap.get(procId);
-    if (procedureWAL != null) {
-      procedureWAL.delete();
-    }
-    procWALMap.remove(procId);
+    procedureMap.remove(deleteProcedurePlan.getProcId());
     return new TSStatus(TSStatusCode.SUCCESS_STATUS.getStatusCode());
   }
+
+  private static Optional<Procedure> loadProcedure(Path procedureFilePath) {
+    try (FileInputStream fis = new 
FileInputStream(procedureFilePath.toFile())) {
+      Procedure procedure = null;
+      try (FileChannel channel = fis.getChannel()) {
+        ByteBuffer byteBuffer = 
ByteBuffer.allocate(PROCEDURE_LOAD_BUFFER_SIZE);
+        if (channel.read(byteBuffer) > 0) {
+          byteBuffer.flip();
+          procedure = ProcedureFactory.getInstance().create(byteBuffer);
+          byteBuffer.clear();
+        }
+        return Optional.ofNullable(procedure);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Load {} failed, it will be deleted.", procedureFilePath, 
e);
+      if (!procedureFilePath.toFile().delete()) {
+        LOGGER.error("{} deleted failed; take appropriate action.", 
procedureFilePath, e);
+      }
+    }
+    return Optional.empty();
+  }
+
+  @Override
+  public boolean processTakeSnapshot(File snapshotDir) throws TException, 
IOException {
+    File procedureSnapshotDir = new File(snapshotDir, PROCEDURE_SNAPSHOT_DIR);
+    if (procedureSnapshotDir.exists()) {
+      LOGGER.error(
+          "Failed to take snapshot, because snapshot dir [{}] is already 
exist.",
+          procedureSnapshotDir.getAbsolutePath());
+      return false;
+    }
+    File tmpDir = new File(procedureSnapshotDir.getAbsolutePath() + "-" + 
UUID.randomUUID());
+    if (!tmpDir.mkdir()) {
+      LOGGER.error("Failed to take snapshot, because create tmp dir [{}] 
fail.", tmpDir);
+      return false;
+    }
+
+    // save lastProcId
+    File mainFile = new File(tmpDir.getAbsolutePath() + File.separator + 
MAIN_SNAPSHOT_FILENAME);
+    try (FileOutputStream fileOutputStream = new FileOutputStream(mainFile);
+        DataOutputStream dataOutputStream = new 
DataOutputStream(fileOutputStream);
+        TIOStreamTransport tioStreamTransport = new 
TIOStreamTransport(fileOutputStream)) {
+      ReadWriteIOUtils.write(lastProcId, fileOutputStream);
+      tioStreamTransport.flush();
+      fileOutputStream.getFD().sync();
+    }
+
+    // save all procedures
+    procedureMap
+        .values()
+        .forEach(
+            procedure -> {
+              try {
+                new ProcedureWAL(
+                        Paths.get(
+                            tmpDir.getAbsolutePath()
+                                + File.separator
+                                + procedure.getProcId()
+                                + PROCEDURE_SNAPSHOT_FILE_SUFFIX),
+                        procedureFactory)
+                    .save(procedure);
+              } catch (IOException e) {
+                LOGGER.warn(
+                    "{} id {} took snapshot fail", procedure.getClass(), 
procedure.getProcId(), e);

Review Comment:
   return false



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to