sonatype-lift[bot] commented on code in PR #1883:
URL: https://github.com/apache/zookeeper/pull/1883#discussion_r878545787


##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/RestoreFromBackupTool.java:
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import org.apache.commons.cli.CommandLine;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.RestoreCommand;
+import org.apache.zookeeper.common.ConfigException;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.backup.timetable.TimetableUtil;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This is a tool to restore, from backup storage, a snapshot and set of 
transaction log files
+ *  that combine to represent a transactionally consistent image of a 
ZooKeeper ensemble at
+ *  some zxid.
+ */
+public class RestoreFromBackupTool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreFromBackupTool.class);
+
+  private static final int MAX_RETRIES = 10;
+  private static final String HEX_PREFIX = "0x";
+  private static final int CONNECTION_TIMEOUT = 300000;
+
+  BackupStorageProvider storage;
+  FileTxnSnapLog snapLog;
+  long zxidToRestore;
+  boolean dryRun;
+  File restoreTempDir;
+  boolean overwrite = false;
+
+  // Spot restoration
+  boolean isSpotRestoration = false;
+  String znodePathToRestore;
+  String zkServerConnectionStr;
+  boolean restoreRecursively = false;
+  ZooKeeper zk;
+  SpotRestorationTool spotRestorationTool;
+
+  List<BackupFileInfo> logs;
+  List<BackupFileInfo> snaps;
+  List<BackupFileInfo> filesToCopy;
+
+  int mostRecentLogNeededIndex;
+  int snapNeededIndex;
+  int oldestLogNeededIndex;
+
+  public enum BackupStorageOption {
+    
GPFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage"),
+    
NFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage");
+
+    private String storageProviderClassName;
+
+    BackupStorageOption(String className) {
+      this.storageProviderClassName = className;
+    }
+
+    public String getStorageProviderClassName() {
+      return storageProviderClassName;
+    }
+  }
+
+  /**
+   * Default constructor; requires using parseArgs to setup state.
+   */
+  public RestoreFromBackupTool() {
+    this(null, null, -1L, false, null);
+  }
+
+  /**
+   * Constructor
+   * @param storageProvider the backup provider from which to restore the 
backups
+   * @param snapLog the snap and log provider to use
+   * @param zxidToRestore the zxid upto which to restore, or Long.MAX to 
restore to the latest
+   *                      available transactionally consistent zxid.
+   * @param dryRun whether this is a dryrun in which case no files are 
actually copied
+   * @param restoreTempDir a temporary, local (not remote) directory to stage 
the backup files from
+   *                       remote backup storage for the processing stage of 
restoration. Note that
+   *                       this directory and the files in it will be removed 
after restoration.
+   */
+  RestoreFromBackupTool(BackupStorageProvider storageProvider, FileTxnSnapLog 
snapLog,
+      long zxidToRestore, boolean dryRun, File restoreTempDir) {
+
+    filesToCopy = new ArrayList<>();
+    snapNeededIndex = -1;
+    mostRecentLogNeededIndex = -1;
+    oldestLogNeededIndex = -1;
+
+    this.storage = storageProvider;
+    this.zxidToRestore = zxidToRestore;
+    this.snapLog = snapLog;
+    this.dryRun = dryRun;
+    this.restoreTempDir = restoreTempDir;
+  }
+
+  /**
+   * Parse and validate arguments to the tool
+   * @param cl the command line object with user inputs
+   * @return true if the arguments parse correctly; false in all other cases.
+   * @throws IOException if the backup provider cannot be instantiated 
correctly.
+   */
+  public void parseArgs(CommandLine cl) {
+    String backupStoragePath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.BACKUP_STORE);
+    createBackupStorageProvider(backupStoragePath);
+
+    // Read the restore point
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_ZXID)) {
+      parseRestoreZxid(cl);
+    } else if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP)) 
{
+      parseRestoreTimestamp(cl, backupStoragePath);
+    }
+
+    parseAndValidateSpotRestorationArgs(cl);
+
+    parseAndValidateOfflineRestoreDestination(cl);
+
+    parseRestoreTempDir(cl);
+
+    // Check if overwriting the destination directories is allowed
+    if (cl.hasOption(RestoreCommand.OptionShortForm.OVERWRITE)) {
+      overwrite = true;
+    }
+
+    // Check if this is a dry-run
+    if (cl.hasOption(RestoreCommand.OptionShortForm.DRY_RUN)) {
+      dryRun = true;
+    }
+
+    System.out.println("parseArgs successful.");
+  }
+
+  private void createBackupStorageProvider(String backupStoragePath) {
+    String[] backupStorageParams = backupStoragePath.split(":");
+    if (backupStorageParams.length != 4) {
+      System.err.println(
+          "Failed to parse backup storage connection information from the 
backup storage path provided, please check the input.");
+      System.err.println(
+          "For example: the format for a gpfs backup storage path should be 
\"gpfs:<config_path>:<backup_path>:<namespace>\".");
+      System.exit(1);
+    }
+
+    String userProvidedStorageName = backupStorageParams[0].toUpperCase();
+    try {
+      BackupStorageOption storageOption = 
BackupStorageOption.valueOf(userProvidedStorageName);
+      String backupStorageProviderClassName = 
storageOption.getStorageProviderClassName();
+
+      BackupConfig.RestorationConfigBuilder configBuilder =
+          new BackupConfig.RestorationConfigBuilder()
+              .setStorageProviderClassName(backupStorageProviderClassName)
+              
.setBackupStoragePath(backupStorageParams[2]).setNamespace(backupStorageParams[3]);
+      if (!backupStorageParams[1].isEmpty()) {
+        configBuilder = configBuilder.setStorageConfig(new 
File(backupStorageParams[1]));

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose location might be specified by user input. Here the path is `backupStorageParams[1]`, the `<config_path>` segment of the backup storage path taken from the command line.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621056&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621056&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621056&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621056&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621056&lift_comment_rating=5) ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/RestoreFromBackupTool.java:
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import org.apache.commons.cli.CommandLine;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.RestoreCommand;
+import org.apache.zookeeper.common.ConfigException;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.backup.timetable.TimetableUtil;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This is a tool to restore, from backup storage, a snapshot and set of 
transaction log files
+ *  that combine to represent a transactionally consistent image of a 
ZooKeeper ensemble at
+ *  some zxid.
+ */
+public class RestoreFromBackupTool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreFromBackupTool.class);
+
+  private static final int MAX_RETRIES = 10;
+  private static final String HEX_PREFIX = "0x";
+  private static final int CONNECTION_TIMEOUT = 300000;
+
+  BackupStorageProvider storage;
+  FileTxnSnapLog snapLog;
+  long zxidToRestore;
+  boolean dryRun;
+  File restoreTempDir;
+  boolean overwrite = false;
+
+  // Spot restoration
+  boolean isSpotRestoration = false;
+  String znodePathToRestore;
+  String zkServerConnectionStr;
+  boolean restoreRecursively = false;
+  ZooKeeper zk;
+  SpotRestorationTool spotRestorationTool;
+
+  List<BackupFileInfo> logs;
+  List<BackupFileInfo> snaps;
+  List<BackupFileInfo> filesToCopy;
+
+  int mostRecentLogNeededIndex;
+  int snapNeededIndex;
+  int oldestLogNeededIndex;
+
+  public enum BackupStorageOption {
+    
GPFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage"),
+    
NFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage");
+
+    private String storageProviderClassName;
+
+    BackupStorageOption(String className) {
+      this.storageProviderClassName = className;
+    }
+
+    public String getStorageProviderClassName() {
+      return storageProviderClassName;
+    }
+  }
+
+  /**
+   * Default constructor; requires using parseArgs to setup state.
+   */
+  public RestoreFromBackupTool() {
+    this(null, null, -1L, false, null);
+  }
+
+  /**
+   * Constructor
+   * @param storageProvider the backup provider from which to restore the 
backups
+   * @param snapLog the snap and log provider to use
+   * @param zxidToRestore the zxid upto which to restore, or Long.MAX to 
restore to the latest
+   *                      available transactionally consistent zxid.
+   * @param dryRun whether this is a dryrun in which case no files are 
actually copied
+   * @param restoreTempDir a temporary, local (not remote) directory to stage 
the backup files from
+   *                       remote backup storage for the processing stage of 
restoration. Note that
+   *                       this directory and the files in it will be removed 
after restoration.
+   */
+  RestoreFromBackupTool(BackupStorageProvider storageProvider, FileTxnSnapLog 
snapLog,
+      long zxidToRestore, boolean dryRun, File restoreTempDir) {
+
+    filesToCopy = new ArrayList<>();
+    snapNeededIndex = -1;
+    mostRecentLogNeededIndex = -1;
+    oldestLogNeededIndex = -1;
+
+    this.storage = storageProvider;
+    this.zxidToRestore = zxidToRestore;
+    this.snapLog = snapLog;
+    this.dryRun = dryRun;
+    this.restoreTempDir = restoreTempDir;
+  }
+
+  /**
+   * Parse and validate arguments to the tool
+   * @param cl the command line object with user inputs
+   * @return true if the arguments parse correctly; false in all other cases.
+   * @throws IOException if the backup provider cannot be instantiated 
correctly.
+   */
+  public void parseArgs(CommandLine cl) {
+    String backupStoragePath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.BACKUP_STORE);
+    createBackupStorageProvider(backupStoragePath);
+
+    // Read the restore point
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_ZXID)) {
+      parseRestoreZxid(cl);
+    } else if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP)) 
{
+      parseRestoreTimestamp(cl, backupStoragePath);
+    }
+
+    parseAndValidateSpotRestorationArgs(cl);
+
+    parseAndValidateOfflineRestoreDestination(cl);
+
+    parseRestoreTempDir(cl);
+
+    // Check if overwriting the destination directories is allowed
+    if (cl.hasOption(RestoreCommand.OptionShortForm.OVERWRITE)) {
+      overwrite = true;
+    }
+
+    // Check if this is a dry-run
+    if (cl.hasOption(RestoreCommand.OptionShortForm.DRY_RUN)) {
+      dryRun = true;
+    }
+
+    System.out.println("parseArgs successful.");
+  }
+
+  private void createBackupStorageProvider(String backupStoragePath) {
+    String[] backupStorageParams = backupStoragePath.split(":");
+    if (backupStorageParams.length != 4) {
+      System.err.println(
+          "Failed to parse backup storage connection information from the 
backup storage path provided, please check the input.");
+      System.err.println(
+          "For example: the format for a gpfs backup storage path should be 
\"gpfs:<config_path>:<backup_path>:<namespace>\".");
+      System.exit(1);
+    }
+
+    String userProvidedStorageName = backupStorageParams[0].toUpperCase();
+    try {
+      BackupStorageOption storageOption = 
BackupStorageOption.valueOf(userProvidedStorageName);
+      String backupStorageProviderClassName = 
storageOption.getStorageProviderClassName();
+
+      BackupConfig.RestorationConfigBuilder configBuilder =
+          new BackupConfig.RestorationConfigBuilder()
+              .setStorageProviderClassName(backupStorageProviderClassName)
+              
.setBackupStoragePath(backupStorageParams[2]).setNamespace(backupStorageParams[3]);
+      if (!backupStorageParams[1].isEmpty()) {
+        configBuilder = configBuilder.setStorageConfig(new 
File(backupStorageParams[1]));
+      }
+      storage = 
BackupUtil.createStorageProviderImpl(configBuilder.build().get());
+    } catch (IllegalArgumentException e) {
+      System.err.println("Could not find a valid backup storage option based 
on the input: "
+          + userProvidedStorageName + ". Error message: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } catch (ConfigException e) {
+      System.err.println(
+          "Could not generate a backup config based on the input, error 
message: " + e
+              .getMessage());
+      e.getStackTrace();
+      System.exit(1);
+    } catch (InstantiationException | InvocationTargetException | 
NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
+      System.err.println(
+          "Could not generate a backup storage provider based on the input, 
error message: " + e
+              .getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    }
+  }
+
+  private void parseRestoreZxid(CommandLine cl) {
+    String zxidToRestoreStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_ZXID);
+    if (zxidToRestoreStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+      zxidToRestore = Long.MAX_VALUE;
+    } else {
+      int base = 10;
+      String numStr = zxidToRestoreStr;
+
+      if (zxidToRestoreStr.startsWith(HEX_PREFIX)) {
+        numStr = zxidToRestoreStr.substring(2);
+        base = 16;
+      }
+      try {
+        zxidToRestore = Long.parseLong(numStr, base);
+      } catch (NumberFormatException nfe) {
+        System.err
+            .println("Invalid number specified for restore zxid point, the 
input is: " + numStr);
+        System.exit(2);
+      }
+    }
+  }
+
+  private void parseRestoreTimestamp(CommandLine cl, String backupStoragePath) 
{
+    String timestampStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP);
+    String timetableStoragePath = backupStoragePath;
+    if (cl.hasOption(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH)) {
+      timetableStoragePath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH);
+    }
+    File[] timetableFiles = new File(timetableStoragePath)
+        .listFiles(file -> 
file.getName().startsWith(TimetableBackup.TIMETABLE_PREFIX));
+    if (timetableFiles == null || timetableFiles.length == 0) {
+      System.err.println("Could not find timetable files at the path: " + 
timetableStoragePath);
+      System.exit(2);
+    }
+    Map.Entry<Long, String> restorePoint;
+    String message;
+    try {
+      restorePoint = TimetableUtil.findLastZxidFromTimestamp(timetableFiles, 
timestampStr);
+      zxidToRestore = Long.parseLong(restorePoint.getValue(), 16);
+      String timeToRestore = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
+          .format(new java.util.Date(restorePoint.getKey()));
+      if (timestampStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to latest.";
+      } else {
+        String requestedTimeToRestore = new SimpleDateFormat("MM/dd/yyyy 
HH:mm:ss")
+            .format(new java.util.Date(Long.decode(timestampStr)));
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to "
+            + requestedTimeToRestore + ".";
+      }
+      System.out.println(message);
+      LOG.info(message);
+    } catch (IllegalArgumentException | BackupException e) {
+      System.err.println(
+          "Could not find a valid zxid from timetable using the timestamp 
provided: " + timestampStr
+              + ". The error message is: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(2);
+    }
+  }
+
+  private void parseAndValidateOfflineRestoreDestination(CommandLine cl) {
+    if (isSpotRestoration) {
+      return;
+    }
+    // Read restore destination: dataDir and logDir
+    try {
+      String snapDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.SNAP_DESTINATION);
+      String logDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.LOG_DESTINATION);
+
+      if (snapDirPath == null || logDirPath == null) {
+        throw new BackupException(
+            "Snap destination path and log destination path are not defined 
for offline restoration. SnapDirPath: "
+                + snapDirPath + ", logDirPath: " + logDirPath);
+      }
+
+      File snapDir = new File(snapDirPath);
+      File logDir = new File(logDirPath);
+      snapLog = new FileTxnSnapLog(logDir, snapDir);
+      checkSnapDataDirFileExistence();
+    } catch (IOException ioe) {
+      System.err.println("Could not setup transaction log utility." + ioe);
+      System.exit(3);
+    }
+  }
+
+  private void parseRestoreTempDir(CommandLine cl) {
+    if 
(cl.hasOption(RestoreCommand.OptionShortForm.LOCAL_RESTORE_TEMP_DIR_PATH)) {
+      String localRestoreTempDirPath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.LOCAL_RESTORE_TEMP_DIR_PATH);
+      restoreTempDir = new File(localRestoreTempDirPath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose location might be specified by user input. Here the path is `localRestoreTempDirPath`, the value of the `LOCAL_RESTORE_TEMP_DIR_PATH` command-line option.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621080&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621080&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621080&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621080&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621080&lift_comment_rating=5) ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/BackupStorageUtil.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.backup.storage;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.persistence.Util;
+
+/**
+ * Util methods for backup storage
+ */
+public class BackupStorageUtil {
+  public static final String TMP_FILE_PREFIX = "TMP_";
+  private static final File[] NO_FILE = new File[0];
+
+  /**
+   * Parse the prefix from a file name, also works for temporary file names in 
backup storage
+   * @param fileName The file name to be parsed
+   * @return "log" for ZK transaction log files or "snapshot" for ZK snapshots
+   */
+  public static String getFileTypePrefix(String fileName) {
+    String backupFileName = fileName;
+
+    //Remove the temporary file name prefix in order to determine file type
+    if (fileName.startsWith(TMP_FILE_PREFIX)) {
+      backupFileName = fileName.substring(TMP_FILE_PREFIX.length());
+    }
+
+    String fileTypePrefix;
+    if (backupFileName.startsWith(Util.SNAP_PREFIX)) {
+      fileTypePrefix = Util.SNAP_PREFIX;
+    } else if (backupFileName.startsWith(Util.TXLOG_PREFIX)) {
+      fileTypePrefix = Util.TXLOG_PREFIX;
+    } else {
+      throw new BackupException("No matching base file type found for file " + 
fileName);
+    }
+
+    return fileTypePrefix;
+  }
+
+  /**
+   * Construct the path of a backup file in the backup storage
+   * @param fileName The name of the file
+   * @param parentDir The path to the parent directory of the backup file.
+   * @return The path of the backup file in the format of:
+   * 1. parentDir path is not supplied: {fileName} or {fileName}
+   * 2. parentDir path is provided: {parentDir}/{fileName} or 
{parentDir}/{fileName}
+   */
+  public static String constructBackupFilePath(String fileName, String 
parentDir) {
+    //TODO: store snapshots and Txlogs in different subfolders for better 
organization
+    if (parentDir != null) {
+      return String.valueOf(Paths.get(parentDir, fileName));
+    }
+    return fileName;
+  }
+
+  /**
+   * Construct temporary file name using backup file name
+   * @param fileName A backup file name: log.lowzxid-highzxid, 
snapshot.lowzxid-highzxid
+   * @return A temporary backup file name: TMP_log.lowzxid-highzxid, 
TMP_snapshot.lowzxid-highzxid
+   */
+  public static String constructTempFileName(String fileName) {
+    return TMP_FILE_PREFIX + fileName;
+  }
+
+  /**
+   * A basic method for streaming data from an input stream to an output stream
+   * @param inputStream The stream to read from
+   * @param outputStream The stream to write to
+   * @throws IOException
+   */
+  public static void streamData(InputStream inputStream, OutputStream 
outputStream)
+      throws IOException {
+    byte[] buffer = new byte[1024];
+    int lengthRead;
+    while ((lengthRead = inputStream.read(buffer)) > 0) {
+      outputStream.write(buffer, 0, lengthRead);
+      outputStream.flush();
+    }
+  }
+
+  /**
+   * Create a new file in a specified path, create the parent directories if 
they do not exist.
+   * @param file The path to create the file.
+   * @param overwriteIfExist If a file already exists in the location,
+   *                         1. true: delete the existing file and retry the 
creation of the new file,
+   *                         or 2. false: keep the existing file.
+   * @throws IOException
+   */
+  public static void createFile(File file, boolean overwriteIfExist) throws 
IOException {
+    file.getParentFile().mkdirs();
+    if (!file.getParentFile().exists()) {
+      throw new BackupException("Failed to create parent directories for file 
" + file.getName());
+    }
+
+    boolean retry = true;
+    while (retry) {
+      retry = overwriteIfExist;
+      if (!file.createNewFile()) {
+        if (file.exists()) {
+          if (retry && !file.delete()) {
+            throw new BackupException("A file with the file path " + 
file.getPath()
+                + " already exists, and failed to be overwritten.");
+          }
+        } else {
+          throw new BackupException("Failed to create a file at path: " + 
file.getPath());
+        }
+      }
+      retry = false;
+    }
+  }
+
+  /**
+   * Get a list of all files whose file name starts with a certain prefix 
under a directory
+   * @param directory The directory to search for the files
+   * @param prefix The prefix of file name
+   * @return
+   */
+  public static File[] getFilesWithPrefix(File directory, String prefix) {
+    if (directory == null) {
+      return NO_FILE;
+    }
+    FilenameFilter fileFilter = (dir, name) -> name.startsWith(prefix);
+    File[] files = directory.listFiles(fileFilter);
+    return files == null ? NO_FILE : files;
+  }
+
+  /**
+   * Delete all the files whose file names starts with temporary file name 
prefix
+   * @param directory The directory to search for temporary files
+   * @throws IOException
+   */
+  public static void cleanUpTempFiles(File directory) throws IOException {
+    File[] tempFiles = getFilesWithPrefix(directory, TMP_FILE_PREFIX);
+    for (File tempFile : tempFiles) {
+      Files.delete(Paths.get(tempFile.getPath()));

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;) reads a file whose location might be specified by user input. Here each path is a `TMP_`-prefixed file listed under the `directory` argument of `cleanUpTempFiles`.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621054&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621054&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621054&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621054&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621054&lift_comment_rating=5) ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.BackupBean;
+import org.apache.zookeeper.server.backup.monitoring.BackupStats;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupBean;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.persistence.ZxidRange;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the backing up of txnlog and snap files to remote
+ * storage for longer term and durable retention than is possible on
+ * an ensemble server
+ */
+public class BackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupManager.class);
+
+  private final Logger logger;
+  private final File snapDir;
+  private final File dataLogDir;
+  private final File tmpDir;
+  private final BackupConfig backupConfig;
+  private final long backupIntervalInMilliseconds;
+  private BackupProcess logBackup = null;
+  private BackupProcess snapBackup = null;
+
+  // backupStatus, backupLogZxid and backedupSnapZxid need to be accessed while synchronized
+  // on backupStatus.
+  private final BackupStatus backupStatus;
+  private BackupPoint backupPoint;
+
+  private final BackupStorageProvider backupStorage;
+  private final long serverId;
+  private final String namespace;
+
+  @VisibleForTesting
+  protected BackupBean backupBean = null;
+  protected TimetableBackupBean timetableBackupBean = null;
+
+  private BackupStats backupStats = null;
+
+  // Optional timetable backup
+  private BackupProcess timetableBackup = null;
+
+  /**
+   * Tracks a file that needs to be backed up, including temporary copies of 
the file
+   */
+  public static class BackupFile {
+    private final File file;
+    private final boolean isTemporary;
+    private final ZxidRange zxidRange;
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param fileMinZxid the min zxid associated with this file
+     * @param fileMaxZxid the max zxid associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, long 
fileMinZxid, long fileMaxZxid) {
+      this(backupFile, isTemporaryFile, new ZxidRange(fileMinZxid, 
fileMaxZxid));
+    }
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param zxidRange the zxid range associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, ZxidRange 
zxidRange) {
+      Preconditions.checkNotNull(zxidRange);
+
+      if (!zxidRange.isHighPresent()) {
+        throw new IllegalArgumentException("ZxidRange must have a high value");
+      }
+
+      this.file = backupFile;
+      this.isTemporary = isTemporaryFile;
+      this.zxidRange = zxidRange;
+    }
+
+    /**
+     * Perform cleanup including deleting temporary files.
+     */
+    public void cleanup() {
+      if (isTemporary && exists()) {
+        file.delete();
+      }
+    }
+
+    /**
+     * Whether the file representing the zxids exists
+     * @return whether the file represented exists
+     */
+    public boolean exists() {
+      return file != null && file.exists();
+    }
+
+    /**
+     * Get the current file (topmost on the stack)
+     * @return the current file
+     */
+    public File getFile() { return file; }
+
+    /**
+     * Get the zxid range associated with this file
+     * @return the zxid range
+     */
+    public ZxidRange getZxidRange() {
+      return zxidRange;
+    }
+
+    /**
+     * Get the min zxid associated with this file
+     * @return the min associated zxid
+     */
+    public long getMinZxid() { return zxidRange.getLow(); }
+
+    /**
+     * Get the max zxid associated with this file
+     * @return the max associated zxid
+     */
+    public long getMaxZxid() { return zxidRange.getHigh(); }
+
+    @Override
+    public String toString() {
+      return String.format("%s : %s : %d - %d",
+          file == null ? "[empty]" : file.getPath(),
+          isTemporary ? "temp" : "perm",
+          zxidRange.getLow(),
+          zxidRange.getHigh());
+    }
+  }
+
+  /**
+   * Implements txnlog specific logic for BackupProcess
+   */
+  protected class TxnLogBackup extends BackupProcess {
+    private long iterationEndPoint;
+    private final FileTxnSnapLog snapLog;
+
+    /**
+     * Constructor for TxnLogBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public TxnLogBackup(FileTxnSnapLog snapLog) {
+      // Hand the parent a logger named after this class, plus the enclosing manager's
+      // backup storage and backup interval
+      super(LoggerFactory.getLogger(TxnLogBackup.class), BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo newestLog =
+          BackupUtil.getLatest(backupStorage, BackupFileType.TXNLOG, IntervalEndpoint.START);
+
+      // With no txnlog in storage, fall back to the INVALID_LOG_ZXID marker
+      long storedZxid;
+      if (newestLog != null) {
+        storedZxid = newestLog.getIntervalEndpoint(IntervalEndpoint.END);
+      } else {
+        storedZxid = BackupUtil.INVALID_LOG_ZXID;
+      }
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(storedZxid), ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+
+      // Nothing to do when the recorded backup point already matches storage
+      if (storedZxid == backupPoint.getLogZxid()) {
+        return;
+      }
+
+      // Otherwise overwrite the recorded log zxid with the one found in storage
+      synchronized (backupStatus) {
+        backupPoint.setLogZxid(storedZxid);
+        backupStatus.update(backupPoint);
+      }
+    }
+
+    protected void startIteration() {
+      // Store the current last logged zxid.  This becomes the stopping point
+      // for the current iteration so we don't keep chasing our own tail as
+      // new transactions get written.
+      iterationEndPoint = snapLog.getLastLoggedZxid();
+      // Tell the stats object that a txnlog backup iteration has begun
+      backupStats.setTxnLogBackupIterationStart();
+    }
+
+    protected void endIteration(boolean errorFree) {
+      // Report the iteration outcome to the stats object
+      backupStats.setTxnLogBackupIterationDone(errorFree);
+      // Reset the stopping point captured by startIteration
+      iterationEndPoint = 0L;
+    }
+
+    /**
+     * Gets the next txnlog file to backup.  This is a temporary file created 
by copying
+     * all transaction from the previous backup point until the end zxid for 
this iteration, or
+     * a file indicating that some log records were lost.
+     * @return the file that needs to be backed-up.  The minZxid is the first
+     *      zxid contained in the file.  The maxZxid is the last zxid that is 
contained in the
+     *      file.
+     * @throws IOException
+     */
+    protected BackupFile getNextFileToBackup() throws IOException {
+      // Resume right after the last zxid recorded as backed up
+      long startingZxid = backupStatus.read().getLogZxid() + 1;
+
+      // Don't keep chasing the tail so stop if past the last zxid at the time
+      // this iteration started.
+      if (startingZxid > iterationEndPoint) {
+        return null;
+      }
+
+      TxnLog.TxnIterator iter = null;
+      FileTxnLog newFile = null;
+      long lastZxid = -1;
+      int txnCopied = 0;
+      BackupFile ret;
+
+      logger.info("Creating backup file from zxid {}.", ZxidUtils.zxidToString(startingZxid));
+
+      try {
+        iter = snapLog.readTxnLog(startingZxid, true);
+
+        // Use a temp directory to avoid conflicts with live txnlog files
+        newFile = new FileTxnLog(tmpDir);
+
+        // TODO: Ideally, we should have a way to prevent lost TxLogs
+        // Check for lost txnlogs; <=1 indicates that no backups have been done before so
+        // nothing can be considered lost.
+        // If a lost sequence is found then return a file whose name encodes the lost
+        // sequence and back that up so the backup store has a record of the lost sequence
+        if (startingZxid > 1 &&
+            iter.getHeader() != null &&
+            iter.getHeader().getZxid() > startingZxid) {
+
+          logger.error("TxnLog backups lost.  Required starting zxid={}  First available zxid={}",
+              ZxidUtils.zxidToString(startingZxid),
+              ZxidUtils.zxidToString(iter.getHeader().getZxid()));
+
+          String fileName = String.format("%s.%s",
+              BackupUtil.LOST_LOG_PREFIX,
+              Long.toHexString(startingZxid));
+          File lostZxidFile = new File(tmpDir, fileName);
+          // createNewFile() returns false when the file already exists; the result is
+          // ignored because the marker file is present afterwards either way
+          lostZxidFile.createNewFile();
+
+          // The lost range runs from the required zxid up to just before the first available one
+          return new BackupFile(lostZxidFile, true, startingZxid, iter.getHeader().getZxid() - 1);
+        }
+
+        while (iter.getHeader() != null) {
+          TxnHeader hdr = iter.getHeader();
+
+          // Stop at the end point captured when this iteration started
+          if (hdr.getZxid() > iterationEndPoint) {
+            break;
+          }
+
+          newFile.append(hdr, iter.getTxn());
+
+          // update position and count only AFTER the record has been successfully
+          // copied
+          lastZxid = hdr.getZxid();
+          txnCopied++;
+
+          iter.next();
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+
+        if (ret != null) {
+          logger.info("Copied {} records starting at {} and ending at zxid {}.",
+              txnCopied,
+              ZxidUtils.zxidToString(ret.getMinZxid()),
+              ZxidUtils.zxidToString(ret.getMaxZxid()));
+        }
+
+      } catch (IOException e) {
+        // Pass the exception as the trailing argument so SLF4J logs it with its original
+        // stack trace. Calling fillInStackTrace() on it would replace that trace with this
+        // catch site, which would also be what the caller sees if it is rethrown below.
+        logger.warn("Hit exception after {} records.", txnCopied, e);
+
+        // If any records were copied return those and ignore the error.  Otherwise
+        // rethrow the error to be handled by the caller as a failed backup iteration.
+        if (txnCopied <= 0) {
+          throw e;
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+      } finally {
+        // Always release the iterator and the temporary txnlog writer, including on the
+        // early "lost log" return and on rethrow
+        if (iter != null) {
+          iter.close();
+        }
+
+        if (newFile != null) {
+          newFile.close();
+        }
+      }
+
+      return ret;
+    }
+
+    private BackupFile makeBackupFileFromCopiedLog(FileTxnLog backupTxnLog, long lastZxid) {
+      // No txnlog object, or no current file behind it, means there is nothing to back up
+      File copiedLog = backupTxnLog == null ? null : backupTxnLog.getCurrentFile();
+      if (copiedLog == null) {
+        return null;
+      }
+
+      // The first zxid is recovered from the name of the log file
+      long firstZxid = Util.getZxidFromName(copiedLog.getName(), Util.TXLOG_PREFIX);
+
+      // -1 means the caller recorded no last zxid; use the first zxid for both ends
+      long upperZxid = lastZxid == -1 ? firstZxid : lastZxid;
+
+      return new BackupFile(copiedLog, true, new ZxidRange(firstZxid, upperZxid));
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      // Advance the recorded txnlog backup point to the last zxid in the file just backed up;
+      // the update is done while holding the backupStatus monitor
+      synchronized (backupStatus) {
+        backupPoint.setLogZxid(file.getMaxZxid());
+        backupStatus.update(backupPoint);
+      }
+
+      // Message previously read "tnxlog", which a log search for "txnlog" would miss
+      logger.info("Updated backedup txnlog zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+    }
+  }
+
+  /**
+   * Implements snapshot specific logic for BackupProcess
+   */
+  protected class SnapBackup extends BackupProcess {
+    private final FileTxnSnapLog snapLog;
+    private final List<BackupFile> filesToBackup = new ArrayList<>();
+
+    /**
+     * Constructor for SnapBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public SnapBackup(FileTxnSnapLog snapLog) {
+      // Hand the parent a logger named after this class, plus the enclosing manager's
+      // backup storage and backup interval
+      super(LoggerFactory.getLogger(SnapBackup.class), BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo newestSnapshot =
+          BackupUtil.getLatest(backupStorage, BackupFileType.SNAPSHOT, IntervalEndpoint.START);
+
+      // With no snapshot in storage, fall back to the INVALID_SNAP_ZXID marker
+      long storedZxid;
+      if (newestSnapshot != null) {
+        storedZxid = newestSnapshot.getIntervalEndpoint(IntervalEndpoint.START);
+      } else {
+        storedZxid = BackupUtil.INVALID_SNAP_ZXID;
+      }
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(storedZxid), ZxidUtils.zxidToString(backupPoint.getSnapZxid()));
+
+      // Nothing to do when the recorded backup point already matches storage
+      if (storedZxid == backupPoint.getSnapZxid()) {
+        return;
+      }
+
+      // Otherwise overwrite the recorded snap zxid with the one found in storage
+      synchronized (backupStatus) {
+        backupPoint.setSnapZxid(storedZxid);
+        backupStatus.update(backupPoint);
+      }
+    }
+
+    /**
+     * Prepares snapshot files to back up and populate filesToBackup.
+     * Note that this implementation persists all snapshots instead of only 
persisting the
+     * latest snapshot.
+     * @throws IOException
+     */
+    protected void startIteration() throws IOException {
+      backupStats.setSnapshotBackupIterationStart();
+      filesToBackup.clear();
+
+      // Get all available snapshots excluding the ones whose lastProcessedZxid falls into the
+      // zxid range [0, backedupSnapZxid]
+      List<File> pending = snapLog.findValidSnapshots(0, backupPoint.getSnapZxid());
+      // Reverse the list so that it is walked from oldest to newest
+      Collections.reverse(pending);
+
+      // With no snapshots, or no newer snapshots to back up, the loop body never runs
+      int newestIndex = pending.size() - 1;
+      for (int idx = 0; idx <= newestIndex; idx++) {
+        File snapshot = pending.get(idx);
+        ZxidRange range = Util.getZxidRangeFromName(snapshot.getName(), Util.SNAP_PREFIX);
+
+        // Handle backwards compatibility for snapshots that use old style naming where
+        // only the starting zxid is included.
+        // TODO: Can be removed after all snapshots being produced have ending zxid
+        if (!range.isHighPresent()) {
+          long high;
+          if (idx == newestIndex) {
+            // Most recent snapshot: use the last logged zxid as a best effort approach
+            // TODO: Because this is the best effort approach, the zxidRange will not be accurate
+            // TODO: Consider rewriting these latest snapshots to backup storage if necessary
+            // TODO: when we know the high zxid when we get a newer snapshot
+            long latestZxid = snapLog.getLastLoggedZxid();
+            high = latestZxid == -1 ? range.getLow() : latestZxid;
+          } else {
+            // Any other snapshot: high is the zxid right before the next snapshot's
+            // lastProcessedZxid
+            long nextSnapshotStartZxid =
+                Util.getZxidFromName(pending.get(idx + 1).getName(), Util.SNAP_PREFIX);
+            high = nextSnapshotStartZxid - 1;
+          }
+          range = new ZxidRange(range.getLow(), high);
+        }
+
+        filesToBackup.add(new BackupFile(snapshot, false, range));
+      }
+    }
+
+    protected void endIteration(boolean errorFree) {
+      // Report the iteration outcome to the stats object
+      backupStats.setSnapshotBackupIterationDone(errorFree);
+      // Drop any files that were queued by startIteration but not handed out
+      filesToBackup.clear();
+    }
+
+    protected BackupFile getNextFileToBackup() {
+      // Hand out queued snapshots front to back; null signals that the queue is drained
+      return filesToBackup.isEmpty() ? null : filesToBackup.remove(0);
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      // The snapshot backup point is tracked by the low end of the file's zxid range
+      long backedUpZxid = file.getMinZxid();
+      synchronized (backupStatus) {
+        backupPoint.setSnapZxid(backedUpZxid);
+        backupStatus.update(backupPoint);
+      }
+      backupStats.incrementNumberOfSnapshotFilesBackedUpThisIteration();
+
+      String zxidText = ZxidUtils.zxidToString(backupPoint.getSnapZxid());
+      logger.info("Updated backedup snap zxid to {}", zxidText);
+    }
+  }
+
+  /**
+   * Constructor for the BackupManager.
+   * @param snapDir the snapshot directory
+   * @param dataLogDir the txnlog directory
+   * @param serverId the id of the zk server
+   * @param backupConfig the backup config object
+   * @throws IOException
+   */
+  public BackupManager(File snapDir, File dataLogDir, long serverId, 
BackupConfig backupConfig)
+      throws IOException {
+    logger = LoggerFactory.getLogger(BackupManager.class);
+    logger.info("snapDir={}", snapDir.getPath());
+    logger.info("dataLogDir={}", dataLogDir.getPath());
+    logger.info("backupStatusDir={}", backupConfig.getStatusDir().getPath());
+    logger.info("tmpDir={}", backupConfig.getTmpDir().getPath());
+    logger.info("backupIntervalInMinutes={}", 
backupConfig.getBackupIntervalInMinutes());
+    logger.info("serverId={}", serverId);
+    logger.info("namespace={}", backupConfig.getNamespace());
+
+    this.snapDir = snapDir;
+    this.dataLogDir = dataLogDir;
+    this.backupConfig = backupConfig;
+    // Note: tmpDir is namespaced
+    this.tmpDir = new File(String.join(File.separator, 
backupConfig.getTmpDir().getAbsolutePath(),

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621058&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621058&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621058&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621058&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621058&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    // Refuse to start when the configured storage path does not exist
+    File storageLocation = new File(backupConfig.getBackupStoragePath());
+    if (!storageLocation.exists()) {
+      String message = "The backup storage is not ready, please check the path: "
+          + backupConfig.getBackupStoragePath();
+      throw new BackupException(message);
+    }
+
+    // The root for this instance is <backup storage path><separator><namespace>
+    fileRootPath = String.join(
+        File.separator,
+        backupConfig.getBackupStoragePath(),
+        backupConfig.getNamespace());
+
+    // Read and write locks are both taken from one ReentrantReadWriteLock
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) throws IOException {
+    // A null path means the storage root itself
+    String relativePath = "";
+    if (path != null) {
+      relativePath = path.getPath();
+    }
+    File backupDir = new File(Paths.get(fileRootPath, relativePath).toString());
+
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    if (!backupDir.exists()) {
+      return backupFileInfos;
+    }
+
+    // Read the file info and add to the list. If an exception is thrown, the entire
+    // operation will fail
+    for (File match : BackupStorageUtil.getFilesWithPrefix(backupDir, prefix)) {
+      backupFileInfos.add(getBackupFileInfo(match));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621066&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621066&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621066&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621066&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621066&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.BackupBean;
+import org.apache.zookeeper.server.backup.monitoring.BackupStats;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupBean;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.persistence.ZxidRange;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the backing up of txnlog and snap files to remote
+ * storage for longer term and durable retention than is possible on
+ * an ensemble server
+ */
+public class BackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupManager.class);
+
+  private final Logger logger;
+  private final File snapDir;
+  private final File dataLogDir;
+  private final File tmpDir;
+  private final BackupConfig backupConfig;
+  private final long backupIntervalInMilliseconds;
+  private BackupProcess logBackup = null;
+  private BackupProcess snapBackup = null;
+
+  // backupStatus, backupLogZxid and backedupSnapZxid need to be accessed while 
synchronized
+  // on backupStatus.
+  private final BackupStatus backupStatus;
+  private BackupPoint backupPoint;
+
+  private final BackupStorageProvider backupStorage;
+  private final long serverId;
+  private final String namespace;
+
+  @VisibleForTesting
+  protected BackupBean backupBean = null;
+  protected TimetableBackupBean timetableBackupBean = null;
+
+  private BackupStats backupStats = null;
+
+  // Optional timetable backup
+  private BackupProcess timetableBackup = null;
+
+  /**
+   * Tracks a file that needs to be backed up, including temporary copies of 
the file
+   */
+  public static class BackupFile {
+    private final File file;
+    private final boolean isTemporary;
+    private final ZxidRange zxidRange;
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param fileMinZxid the min zxid associated with this file
+     * @param fileMaxZxid the max zxid associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, long 
fileMinZxid, long fileMaxZxid) {
+      this(backupFile, isTemporaryFile, new ZxidRange(fileMinZxid, 
fileMaxZxid));
+    }
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param zxidRange the zxid range associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, ZxidRange 
zxidRange) {
+      Preconditions.checkNotNull(zxidRange);
+
+      if (!zxidRange.isHighPresent()) {
+        throw new IllegalArgumentException("ZxidRange must have a high value");
+      }
+
+      this.file = backupFile;
+      this.isTemporary = isTemporaryFile;
+      this.zxidRange = zxidRange;
+    }
+
+    /**
+     * Perform cleanup including deleting temporary files.
+     */
+    public void cleanup() {
+      if (isTemporary && exists()) {
+        file.delete();
+      }
+    }
+
+    /**
+     * Whether the file representing the zxids exists
+     * @return whether the file represented exists
+     */
+    public boolean exists() {
+      return file != null && file.exists();
+    }
+
+    /**
+     * Get the current file (topmost on the stack)
+     * @return the current file
+     */
+    public File getFile() { return file; }
+
+    /**
+     * Get the zxid range associated with this file
+     * @return the zxid range
+     */
+    public ZxidRange getZxidRange() {
+      return zxidRange;
+    }
+
+    /**
+     * Get the min zxid associated with this file
+     * @return the min associated zxid
+     */
+    public long getMinZxid() { return zxidRange.getLow(); }
+
+    /**
+     * Get the max zxid associated with this file
+     * @return the max associated zxid
+     */
+    public long getMaxZxid() { return zxidRange.getHigh(); }
+
+    @Override
+    public String toString() {
+      return String.format("%s : %s : %d - %d",
+          file == null ? "[empty]" : file.getPath(),
+          isTemporary ? "temp" : "perm",
+          zxidRange.getLow(),
+          zxidRange.getHigh());
+    }
+  }
+
+  /**
+   * Implements txnlog specific logic for BackupProcess
+   */
+  protected class TxnLogBackup extends BackupProcess {
+    private long iterationEndPoint;
+    private final FileTxnSnapLog snapLog;
+
+    /**
+     * Constructor for TxnLogBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public TxnLogBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(TxnLogBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.TXNLOG, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_LOG_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.END);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+
+      if (rZxid != backupPoint.getLogZxid()) {
+        synchronized (backupStatus) {
+          backupPoint.setLogZxid(rZxid);
+          backupStatus.update(backupPoint);
+        }
+      }
+    }
+
+    protected void startIteration() {
+      // Store the current last logged zxid.  This becomes the stopping point
+      // for the current iteration so we don't keep chasing our own tail as
+      // new transactions get written.
+      iterationEndPoint = snapLog.getLastLoggedZxid();
+      backupStats.setTxnLogBackupIterationStart();
+    }
+
+    protected void endIteration(boolean errorFree) {
+      backupStats.setTxnLogBackupIterationDone(errorFree);
+      iterationEndPoint = 0L;
+    }
+
+    /**
+     * Gets the next txnlog file to backup.  This is a temporary file created 
by copying
+     * all transactions from the previous backup point until the end zxid for 
this iteration, or
+     * a file indicating that some log records were lost.
+     * @return the file that needs to be backed-up.  The minZxid is the first
+     *      zxid contained in the file.  The maxZxid is the last zxid that is 
contained in the
+     *      file.
+     * @throws IOException
+     */
+    protected BackupFile getNextFileToBackup() throws IOException {
+      long startingZxid = backupStatus.read().getLogZxid() + 1;
+
+      // Don't keep chasing the tail so stop if past the last zxid at the time
+      // this iteration started.
+      if (startingZxid > iterationEndPoint) {
+        return null;
+      }
+
+      TxnLog.TxnIterator iter = null;
+      FileTxnLog newFile = null;
+      long lastZxid = -1;
+      int txnCopied = 0;
+      BackupFile ret;
+
+      logger.info("Creating backup file from zxid {}.", 
ZxidUtils.zxidToString(startingZxid));
+
+      try {
+        iter = snapLog.readTxnLog(startingZxid, true);
+
+        // Use a temp directory to avoid conflicts with live txnlog files
+        newFile = new FileTxnLog(tmpDir);
+
+        // TODO: Ideally, we should have a way to prevent lost TxLogs
+        // Check for lost txnlogs; <=1 indicates that no backups have been 
done before so
+        // nothing can be considered lost.
+        // If a lost sequence is found then return a file whose name encodes 
the lost
+        // sequence and back that up so the backup store has a record of the 
lost sequence
+        if (startingZxid > 1 &&
+            iter.getHeader() != null &&
+            iter.getHeader().getZxid() > startingZxid) {
+
+          logger.error("TxnLog backups lost.  Required starting zxid={}  First 
available zxid={}",
+              ZxidUtils.zxidToString(startingZxid),
+              ZxidUtils.zxidToString(iter.getHeader().getZxid()));
+
+          String fileName = String.format("%s.%s",
+              BackupUtil.LOST_LOG_PREFIX,
+              Long.toHexString(startingZxid));
+          File lostZxidFile = new File(tmpDir, fileName);
+          lostZxidFile.createNewFile();
+
+          return new BackupFile(lostZxidFile, true, startingZxid, 
iter.getHeader().getZxid() - 1);
+        }
+
+        while (iter.getHeader() != null) {
+          TxnHeader hdr = iter.getHeader();
+
+          if (hdr.getZxid() > iterationEndPoint) {
+            break;
+          }
+
+          newFile.append(hdr, iter.getTxn());
+
+          // update position and count only AFTER the record has been 
successfully
+          // copied
+          lastZxid = hdr.getZxid();
+          txnCopied++;
+
+          iter.next();
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+
+        if (ret != null) {
+          logger.info("Copied {} records starting at {} and ending at zxid 
{}.",
+              txnCopied,
+              ZxidUtils.zxidToString(ret.getMinZxid()),
+              ZxidUtils.zxidToString(ret.getMaxZxid()));
+        }
+
+      } catch (IOException e) {
+        logger.warn("Hit exception after {} records.  Exception: {} ", 
txnCopied, e.fillInStackTrace());
+
+        // If any records were copied return those and ignore the error.  
Otherwise
+        // rethrow the error to be handled by the caller as a failed backup 
iteration.
+        if (txnCopied <= 0) {
+          throw e;
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+      } finally {
+        if (iter != null) {
+          iter.close();
+        }
+
+        if (newFile != null) {
+          newFile.close();
+        }
+      }
+
+      return ret;
+    }
+
+    private BackupFile makeBackupFileFromCopiedLog(FileTxnLog backupTxnLog, 
long lastZxid) {
+
+      if (backupTxnLog == null) {
+        return null;
+      }
+
+      // The logFile gets initialized with the first transaction's zxid
+      File logFile = backupTxnLog.getCurrentFile();
+
+      if (logFile == null) {
+        return null;
+      }
+
+      long firstZxid = Util.getZxidFromName(logFile.getName(), 
Util.TXLOG_PREFIX);
+
+      if (lastZxid == -1) {
+        lastZxid = firstZxid;
+      }
+
+      return new BackupFile(logFile, true, new ZxidRange(firstZxid, lastZxid));
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      synchronized (backupStatus) {
+        backupPoint.setLogZxid(file.getMaxZxid());
+        backupStatus.update(backupPoint);
+      }
+
+      logger.info("Updated backedup tnxlog zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+    }
+  }
+
+  /**
+   * Implements snapshot specific logic for BackupProcess
+   */
+  protected class SnapBackup extends BackupProcess {
+    private final FileTxnSnapLog snapLog;
+    private final List<BackupFile> filesToBackup = new ArrayList<>();
+
+    /**
+     * Constructor for SnapBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public SnapBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(SnapBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.SNAPSHOT, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_SNAP_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.START);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getSnapZxid()));
+
+      if (rZxid != backupPoint.getSnapZxid()) {
+        synchronized (backupStatus) {
+          backupPoint.setSnapZxid(rZxid);
+          backupStatus.update(backupPoint);
+        }
+      }
+    }
+
+    /**
+     * Prepares snapshot files to back up and populate filesToBackup.
+     * Note that this implementation persists all snapshots instead of only 
persisting the
+     * latest snapshot.
+     * @throws IOException
+     */
+    protected void startIteration() throws IOException {
+      backupStats.setSnapshotBackupIterationStart();
+      filesToBackup.clear();
+
+      // Get all available snapshots excluding the ones whose 
lastProcessedZxid falls into the
+      // zxid range [0, backedupSnapZxid]
+      List<File> candidateSnapshots = snapLog.findValidSnapshots(0, 
backupPoint.getSnapZxid());
+      // Sort candidateSnapshots from oldest to newest
+      Collections.reverse(candidateSnapshots);
+
+      if (candidateSnapshots.size() == 0) {
+        // Either no snapshots or no newer snapshots to back up, so return
+        return;
+      }
+
+      for (int i = 0; i < candidateSnapshots.size(); i++) {
+        File f = candidateSnapshots.get(i);
+        ZxidRange zxidRange = Util.getZxidRangeFromName(f.getName(), 
Util.SNAP_PREFIX);
+
+        if (i == candidateSnapshots.size() - 1) {
+          // This is the most recent snapshot
+
+          // Handle backwards compatibility for snapshots that use old style 
naming where
+          // only the starting zxid is included.
+          // TODO: Can be removed after all snapshots being produced have 
ending zxid
+          if (!zxidRange.isHighPresent()) {
+            // Use the last logged zxid for the zxidRange for the latest 
snapshot as a best effort
+            // approach
+            // TODO: Because this is the best effort approach, the zxidRange 
will not be accurate
+            // TODO: Consider rewriting these latest snapshots to backup 
storage if necessary
+            // TODO: when we know the high zxid when we get a newer snapshot
+            long latestZxid = snapLog.getLastLoggedZxid();
+            long consistentAt = latestZxid == -1 ? zxidRange.getLow() : 
latestZxid;
+            zxidRange = new ZxidRange(zxidRange.getLow(), consistentAt);
+          }
+        } else {
+          // All newer snapshots that are not the most recent snapshot
+
+          // Handle backwards compatibility for snapshots that use old style 
naming where
+          // only the starting zxid is included.
+          // TODO: Can be removed after all snapshots being produced have 
ending zxid
+          if (!zxidRange.isHighPresent()) {
+            // ZxidRange will be [low, high] where high will be the zxid right 
before the next
+            // snapshot's lastProcessedZxid
+            long nextSnapshotStartZxid =
+                Util.getZxidFromName(candidateSnapshots.get(i + 1).getName(), 
Util.SNAP_PREFIX);
+            zxidRange = new ZxidRange(zxidRange.getLow(), 
nextSnapshotStartZxid - 1);
+          }
+        }
+
+        filesToBackup.add(new BackupFile(f, false, zxidRange));
+      }
+    }
+
+    protected void endIteration(boolean errorFree) {
+      backupStats.setSnapshotBackupIterationDone(errorFree);
+      filesToBackup.clear();
+    }
+
+    protected BackupFile getNextFileToBackup() {
+      if (filesToBackup.isEmpty()) {
+        return null;
+      }
+
+      return filesToBackup.remove(0);
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      synchronized (backupStatus) {
+        backupPoint.setSnapZxid(file.getMinZxid());
+        backupStatus.update(backupPoint);
+      }
+      backupStats.incrementNumberOfSnapshotFilesBackedUpThisIteration();
+
+      logger.info("Updated backedup snap zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getSnapZxid()));
+    }
+  }
+
+  /**
+   * Constructor for the BackupManager.
+   * @param snapDir the snapshot directory
+   * @param dataLogDir the txnlog directory
+   * @param serverId the id of the zk server
+   * @param backupConfig the backup config object
+   * @throws IOException
+   */
+  public BackupManager(File snapDir, File dataLogDir, long serverId, 
BackupConfig backupConfig)
+      throws IOException {
+    logger = LoggerFactory.getLogger(BackupManager.class);
+    logger.info("snapDir={}", snapDir.getPath());
+    logger.info("dataLogDir={}", dataLogDir.getPath());
+    logger.info("backupStatusDir={}", backupConfig.getStatusDir().getPath());
+    logger.info("tmpDir={}", backupConfig.getTmpDir().getPath());
+    logger.info("backupIntervalInMinutes={}", 
backupConfig.getBackupIntervalInMinutes());
+    logger.info("serverId={}", serverId);
+    logger.info("namespace={}", backupConfig.getNamespace());
+
+    this.snapDir = snapDir;
+    this.dataLogDir = dataLogDir;
+    this.backupConfig = backupConfig;
+    // Note: tmpDir is namespaced
+    this.tmpDir = new File(String.join(File.separator, 
backupConfig.getTmpDir().getAbsolutePath(),
+        backupConfig.getNamespace()));
+    // Note: statusDir is namespaced
+    this.backupStatus = new BackupStatus(new File(String
+        .join(File.separator, backupConfig.getStatusDir().getAbsolutePath(),

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621077&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621077&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621077&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621077&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621077&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupFileInfo.java:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+
+import com.google.common.collect.Range;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.SnapStream;
+import org.apache.zookeeper.server.persistence.Util;
+
+/**
+ * Metadata for a file that has been backed-up
+ * Assumes that the name of the backed up file uses the format:
+ * prefix.lowzxid-highzxid where prefix is one of the standard snap or log 
file prefixes, or
+ * "lostLog".
+ * In the case of a timetable backup file, it takes the format of 
timetable.lowTimestamp-highTimestamp.
+ */
+public class BackupFileInfo {
+  public static final long NOT_SET = -1L;
+  private static final String DASH_DELIMITER = "-";
+
+  private final File backupFile;
+  private final File standardFile;
+  private final Range<Long> range;
+  private final BackupFileType fileType;
+  private final long modificationTime;
+  private final long size;
+
+  /**
+   * Constructor that pulls backup metadata based on the backed-up filename
+   * @param backedupFile the backed-up file with the name in the form 
prefix.lowzxid-highzxid
+   *                     for example snapshot.9a0000a344-9a0000b012.
+   *                     if timetable backup, ranges are decimal longs (posix 
timestamps)
+   * @param modificationTime the file modification time
+   * @param size the size of the file in bytes
+   */
+  public BackupFileInfo(File backedupFile, long modificationTime, long size) {
+    this.backupFile = backedupFile;
+    this.modificationTime = modificationTime;
+    this.size = size;
+
+    String backedupFilename = this.backupFile.getName();
+
+    if (backedupFilename.startsWith(BackupUtil.LOST_LOG_PREFIX)) {
+      this.fileType = BackupFileType.LOSTLOG;
+      this.standardFile = this.backupFile;
+    } else if (backedupFilename.startsWith(Util.SNAP_PREFIX)) {
+      this.fileType = BackupFileType.SNAPSHOT;
+      // For snapshot backup files, we need to consider the case of snapshot 
compression
+      String streamMode = SnapStream.getStreamMode(backedupFilename).getName();
+      String standardFileName;
+      if (streamMode.isEmpty()) {
+        // No compression was used, so simply drop the end part
+        standardFileName = backedupFilename.split(DASH_DELIMITER)[0];
+      } else {
+        // Snapshot compression is enabled; standardName looks like 
"snapshot.<zxid>.<streamMode>"
+        // Need to remove the ending zxid
+        String[] nameParts = backedupFilename.split("\\.");
+        if (nameParts.length != 3) {
+          throw new BackupException(
+              "BackupFileInfo: unable to create standardFile reference! 
backedupFilename: "
+                  + backedupFilename + " StreamMode: " + streamMode);
+        }
+        String zxidPartWithoutEnd = nameParts[1].split(DASH_DELIMITER)[0];
+        // Combine all parts to generate a backup name
+        standardFileName = nameParts[0] + "." + zxidPartWithoutEnd + "." + 
nameParts[2];
+      }
+      this.standardFile = new File(this.backupFile.getParentFile(), 
standardFileName);
+    } else if (backedupFilename.startsWith(Util.TXLOG_PREFIX)) {
+      this.fileType = BackupFileType.TXNLOG;
+      this.standardFile =
+          new File(this.backupFile.getParentFile(), 
backedupFilename.split(DASH_DELIMITER)[0]);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V) reads a 
file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621094&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621094&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621094&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621094&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621094&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API 
(java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;)
 reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621099&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621099&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621099&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621099&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621099&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupProcess.java:
##########
@@ -0,0 +1,175 @@
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.slf4j.Logger;
+
+/**
+ * Base class for the txnlog and snap backup, and timetable backup processes.
+ * Provides the main backup loop and copying to remote storage (via HDFS, NFS, 
etc. APIs)
+ */
+public abstract class BackupProcess implements Runnable {
+  protected final Logger logger;
+  protected final BackupStorageProvider backupStorage;
+  private final long backupIntervalInMilliseconds;
+  protected volatile boolean isRunning = true;
+
+  /**
+   * Initialize starting backup point based on remote storage and backupStatus file.
+   * NOTE(review): this hook is not invoked anywhere in the visible part of this class; a
+   * comment in run(int) states that it also cleans up invalid backup files -- confirm
+   * against the implementations and the caller.
+   * @throws IOException declared so that implementations can report an I/O failure
+   */
+  protected abstract void initialize() throws IOException;
+
+  /**
+   * Marks the start of a backup iteration.  A backup iteration is run every
+   * backup.interval.  This is called at the start of the iteration and before
+   * any calls to getNextFileToBackup
+   * @throws IOException if the iteration cannot be started; run(int) logs the exception
+   *                     and treats the iteration as not error free
+   */
+  protected abstract void startIteration() throws IOException;
+
+  /**
+   * Marks the end of a backup iteration.  After this call there will be no more
+   * calls to getNextFileToBackup or backupComplete until startIteration is
+   * called again.  run(int) calls this after every iteration, including iterations
+   * in which an IOException was caught.
+   * @param errorFree whether the iteration was error free
+   */
+  protected abstract void endIteration(boolean errorFree);
+
+  /**
+   * Get the next file to backup
+   * @return the next file to copy to backup storage, or null when there is nothing
+   *         left to copy in the current iteration (run(int) stops copying on null)
+   * @throws IOException if the next file cannot be determined; run(int) logs the
+   *                     exception and treats the iteration as not error free
+   */
+  protected abstract BackupManager.BackupFile getNextFileToBackup() throws 
IOException;
+
+  /**
+   * Marks that the copy of the specified file to backup storage has completed.
+   * run(int) calls this after the file was copied and before calling cleanup() on it.
+   * @param file the file that was backed up
+   * @throws IOException if the completion cannot be recorded; run(int) logs the
+   *                     exception and treats the iteration as not error free
+   */
+  protected abstract void backupComplete(BackupManager.BackupFile file) throws 
IOException;
+
+  /**
+   * Builds the state shared by every backup process.
+   * @param logger the logger this process writes to; must not be null
+   * @param backupStorage the storage provider that backup files are copied to
+   * @param backupIntervalInMilliseconds the length of one backup interval, in milliseconds
+   * @throws IllegalArgumentException if logger is null
+   */
+  public BackupProcess(Logger logger, BackupStorageProvider backupStorage,
+      long backupIntervalInMilliseconds) {
+    // Only the logger is validated; the other arguments are stored as given.
+    if (null == logger) {
+      throw new IllegalArgumentException("BackupProcess: logger is null!");
+    }
+
+    this.backupIntervalInMilliseconds = backupIntervalInMilliseconds;
+    this.backupStorage = backupStorage;
+    this.logger = logger;
+  }
+
+  /**
+   * Runs the main file based backup loop indefinitely: delegates to run(int) with 0,
+   * which that method treats as "no iteration limit".
+   */
+  @Override
+  public void run() {
+    run(0);
+  }
+
+  /**
+   * Runs the main file based backup loop the specified number of time.
+   * Calls methods implemented by derived classes to get the next file to copy.
+   */
+  public void run(int iterations) {
+    try {
+      boolean errorFree = true;
+      logger.debug("Thread starting.");
+
+      while (isRunning) {
+        BackupManager.BackupFile fileToCopy;
+        long startTime = System.currentTimeMillis();
+
+        try {
+          if (logger.isDebugEnabled()) {
+            logger.debug("Starting iteration");
+          }
+
+          // Cleanup any invalid backups that may have been left behind by the
+          // previous failed iteration.
+          // NOTE: Not done on first iteration (errorFree initialized to true) 
since
+          //       initialize already does this.
+          if (!errorFree) {
+            backupStorage.cleanupInvalidFiles(null);
+          }
+
+          startIteration();
+
+          while ((fileToCopy = getNextFileToBackup()) != null) {
+            // Consider: compress file before sending to remote storage
+            copyToRemoteStorage(fileToCopy);
+            backupComplete(fileToCopy);
+            fileToCopy.cleanup();
+          }
+
+          errorFree = true;
+        } catch (IOException e) {
+          errorFree = false;
+          logger.warn("Exception hit during backup", e);
+        }
+
+        endIteration(errorFree);
+        long elapsedTime = System.currentTimeMillis() - startTime;
+        logger.info("Completed backup iteration in {} milliseconds.  
ErrorFree: {}.",
+            elapsedTime, errorFree);
+
+        if (iterations != 0) {
+          iterations--;
+
+          if (iterations < 1) {
+            break;
+          }
+        }
+
+        // Count elapsed time towards the backup interval
+        long waitTime = backupIntervalInMilliseconds - elapsedTime;
+
+        synchronized (this) {  // synchronized to get notification of 
termination
+          if (waitTime > 0) {
+            wait(waitTime);
+          }
+        }
+      }
+    } catch (InterruptedException e) {
+      logger.warn("Interrupted exception while waiting for backup interval.", 
e.fillInStackTrace());
+    } catch (Exception e) {
+      logger.error("Hit unexpected exception", e.fillInStackTrace());
+    }
+
+    logger.warn("BackupProcess thread exited loop!");
+  }
+
+  /**
+   * Copy given file to remote storage via Backup Storage (HDFS, NFS, etc.) 
APIs.
+   * @param fileToCopy the file to copy
+   * @throws IOException
+   */
+  private void copyToRemoteStorage(BackupManager.BackupFile fileToCopy) throws 
IOException {
+    if (fileToCopy.getFile() == null) {
+      return;
+    }
+
+    // Use the file name to encode the max included zxid
+    String backupName = BackupUtil.makeBackupName(
+        fileToCopy.getFile().getName(), fileToCopy.getMaxZxid());
+
+    backupStorage.copyToBackupStorage(fileToCopy.getFile(), new 
File(backupName));

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (`java/io/File.<init>(Ljava/lang/String;)V`) reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621126&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621126&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621126&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621126&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621126&lift_comment_rating=5) ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/timetable/TimetableBackup.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.timetable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.BackupManager;
+import org.apache.zookeeper.server.backup.BackupPoint;
+import org.apache.zookeeper.server.backup.BackupProcess;
+import org.apache.zookeeper.server.backup.BackupStatus;
+import org.apache.zookeeper.server.backup.BackupUtil;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements Timetable-specific logic for BackupProcess.
+ *
+ * Backup timetable encodes data of the format <timestamp>:<zxid>. This is 
used to locate the
+ * closest zxid backup point given the timestamp. The use case is for users 
who wish to restore
+ * from backup at a specific time recorded in the backup timetable.
+ */
+public class TimetableBackup extends BackupProcess {
+  public static final String TIMETABLE_PREFIX = "timetable";
+  // Use an ordered map of <timestamp>:<zxid>
+  private final TreeMap<Long, String> timetableRecordMap = new TreeMap<>();
+  private final File tmpDir;
+  // Lock is used to keep access to timetableRecordMap exclusive
+  private final Lock lock = new ReentrantLock(true);
+
+  // Candidate files to be backed up each iteration sorted by name (using 
SortedSet)
+  private final SortedSet<BackupManager.BackupFile> 
candidateTimetableBackupFiles =
+      new TreeSet<>(Comparator.comparing(o -> o.getFile().getName()));
+  // BackupStatus is used here to keep track of timetable backup status in 
case of a crash/restart
+  // BackupStatus is written to a file. After a crash (e.g. JVM crash) or a 
restart, the
+  // locally-stored zkBackupStatus file will be read back to restore a 
BackupPoint
+  private final BackupStatus backupStatus;
+  private final BackupPoint backupPoint;
+  private final TimetableBackupStats backupStats; // Metrics
+
+  /**
+   * Create an instance of TimetableBackup and start the thread that records timetable
+   * entries.
+   * @param snapLog handed to the TimetableRecorder that is started here
+   * @param tmpDir the directory in which timetable backup files are created
+   * @param backupStorageProvider the backup storage, passed on to BackupProcess
+   * @param backupIntervalInMilliseconds the backup interval, passed on to BackupProcess
+   * @param timetableBackupIntervalInMs the interval handed to the TimetableRecorder
+   * @param backupStatus receives the backup point whenever the backed-up timestamp changes
+   * @param backupPoint holds the timestamp up to which the timetable has been backed up
+   * @param backupStats receives the start and end notifications of each iteration
+   */
+  public TimetableBackup(FileTxnSnapLog snapLog, File tmpDir,
+      BackupStorageProvider backupStorageProvider, long backupIntervalInMilliseconds,
+      long timetableBackupIntervalInMs, BackupStatus backupStatus, BackupPoint backupPoint,
+      TimetableBackupStats backupStats) {
+    super(LoggerFactory.getLogger(TimetableBackup.class), backupStorageProvider,
+        backupIntervalInMilliseconds);
+    this.tmpDir = tmpDir;
+    this.backupStatus = backupStatus;
+    this.backupPoint = backupPoint;
+    this.backupStats = backupStats;
+    // Start creating records
+    (new Thread(new TimetableRecorder(snapLog, timetableBackupIntervalInMs))).start();
+    // Parameterized like the other log calls in this class; the rendered text is unchanged.
+    logger.info("TimetableBackup::Starting TimetableBackup Process with backup interval: {} ms"
+        + " and timetable backup interval: {} ms.",
+        backupIntervalInMilliseconds, timetableBackupIntervalInMs);
+  }
+
+  /**
+   * Aligns the timestamp held in backupPoint with what is present in backup storage.
+   *
+   * Looks up the latest timetable backup file in backup storage and takes its END endpoint
+   * as the latest backed-up timestamp, or BackupUtil.INVALID_TIMESTAMP when no such file is
+   * found. If that value differs from the timestamp in backupPoint, backupPoint is
+   * overwritten with it and handed to backupStatus.update.
+   * @throws IOException declared by the overridden method. NOTE(review): presumably raised
+   *                     by the storage lookup or by backupStatus.update -- confirm
+   */
+  @Override
+  protected void initialize() throws IOException {
+    // Get the latest timetable backup file from backup storage
+    BackupFileInfo latest = BackupUtil.getLatest(backupStorage,
+        BackupUtil.BackupFileType.TIMETABLE, BackupUtil.IntervalEndpoint.END);
+
+    long latestTimestampBackedUp = latest == null ? BackupUtil.INVALID_TIMESTAMP
+        : latest.getIntervalEndpoint(BackupUtil.IntervalEndpoint.END);
+
+    logger.info(
+        "TimetableBackup::initialize(): latest timestamp from storage: {}, from BackupStatus: {}",
+        latestTimestampBackedUp, backupPoint.getTimestamp());
+
+    if (latestTimestampBackedUp != backupPoint.getTimestamp()) {
+      // backupStatus is the monitor used by this class for every backupPoint update
+      synchronized (backupStatus) {
+        backupPoint.setTimestamp(latestTimestampBackedUp);
+        backupStatus.update(backupPoint);
+      }
+    }
+  }
+
+  /**
+   * Begins a timetable backup iteration.
+   *
+   * Reports the start to the backup stats, calls getAllTmpBackupFiles, and then, while
+   * holding the lock, turns the records cached in timetableRecordMap into a new backup
+   * file that is queued in candidateTimetableBackupFiles. A failure while creating that
+   * file is logged and not rethrown; in that case the map is not cleared, because clear()
+   * is only reached after the file has been created and queued.
+   * @throws IOException declared by the overridden method. NOTE(review): within this body
+   *                     only getAllTmpBackupFiles sits outside the catch-all -- confirm
+   *                     whether it can throw
+   */
+  @Override
+  protected void startIteration() throws IOException {
+    backupStats.setTimetableBackupIterationStart();
+
+    // It's possible that the last iteration failed and left some tmp backup files behind -
+    // add them back to candidate backup files so the I/O write will happen again
+    getAllTmpBackupFiles();
+
+    // Create a timetable backup file from the cached records (timetableRecordMap)
+    lock.lock();
+    try {
+      if (!timetableRecordMap.isEmpty()) {
+        // Create a temp timetable backup file
+        // Make the backup file temporary so that it will be deleted after the iteration
+        // provided the I/O write to the backup storage succeeds. Otherwise, this temporary
+        // backup file will continue to exist in tmpDir locally until it's successfully
+        // written to the backup storage
+        BackupManager.BackupFile timetableBackupFromThisIteration =
+            new BackupManager.BackupFile(createTimetableBackupFile(), true, 0, 0);
+        candidateTimetableBackupFiles.add(timetableBackupFromThisIteration);
+        timetableRecordMap.clear(); // Clear the map
+      }
+    } catch (Exception e) {
+      logger.error("TimetableBackup::startIteration(): failed to create timetable file to back up!",
+          e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  /**
+   * Ends a timetable backup iteration by reporting its outcome to the backup stats.
+   * @param errorFree whether the iteration was error free
+   */
+  @Override
+  protected void endIteration(boolean errorFree) {
+    // timetableRecordMap is cleared in startIteration() already, so we do not clear it here
+    backupStats.setTimetableBackupIterationDone(errorFree);
+  }
+
+  /**
+   * Returns the first element of the sorted candidate set without removing it (removal
+   * happens in backupComplete), or null when the set is empty.
+   */
+  @Override
+  protected BackupManager.BackupFile getNextFileToBackup() throws IOException {
+    return candidateTimetableBackupFiles.isEmpty()
+        ? null
+        : candidateTimetableBackupFiles.first();
+  }
+
+  /**
+   * Records that a timetable backup file has been copied to backup storage.
+   *
+   * Removes the file from the candidate set, then parses the second "-"-separated segment
+   * of the file name as a long (for a name of the form timetable.<low>-<high> that is
+   * <high>), stores it as the timestamp of backupPoint and hands backupPoint to
+   * backupStatus.update. A file name without a "-" or with a non-numeric second segment
+   * makes the parsing expression throw an unchecked exception.
+   * @param file the file that was copied to backup storage
+   * @throws IOException declared by the overridden method. NOTE(review): presumably raised
+   *                     by backupStatus.update -- confirm
+   */
+  @Override
+  protected void backupComplete(BackupManager.BackupFile file) throws IOException {
+    // Remove from the candidate set because it has been copied to the backup storage
+    // successfully
+    candidateTimetableBackupFiles.remove(file);
+    synchronized (backupStatus) {
+      // Update the latest timestamp to which the timetable backup was successful
+      backupPoint.setTimestamp(Long.parseLong(file.getFile().getName().split("-")[1]));
+      backupStatus.update(backupPoint);
+    }
+    logger.info(
+        "TimetableBackup::backupComplete(): backup complete for file: {}. Updated backed up "
+            + "timestamp to {}", file.getFile().getName(), backupPoint.getTimestamp());
+  }
+
+  /**
+   * Create a file name for timetable backup.
+   * E.g.) timetable.<lowest timestamp>-<highest timestamp>
+   * @return TIMETABLE_PREFIX, a dot, the first key of timetableRecordMap, a dash and the
+   *         last key of timetableRecordMap
+   * @throws java.util.NoSuchElementException if timetableRecordMap is empty
+   */
+  private String makeTimetableBackupFileName() {
+    // Plain concatenation keeps the digits ASCII regardless of the default locale;
+    // String.format("%d") localizes digits, which would leak into the file name that
+    // backupComplete later parses.
+    return TIMETABLE_PREFIX + "." + timetableRecordMap.firstKey() + "-"
+        + timetableRecordMap.lastKey();
+  }
+
+  /**
+   * Write the content of timetableRecordMap to a file using a tmpDir.
+   * @return
+   * @throws IOException
+   */
+  private File createTimetableBackupFile() throws IOException {
+    File tempTimetableBackupFile = new File(tmpDir, 
makeTimetableBackupFileName());

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (`java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V`) reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621124&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621124&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621124&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621124&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621124&lift_comment_rating=5) ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupFileInfo.java:
##########
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+
+import com.google.common.collect.Range;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.SnapStream;
+import org.apache.zookeeper.server.persistence.Util;
+
+/**
+ * Metadata for a file that has been backed-up
+ * Assumes that the name of the backed up file uses the format:
+ * prefix.lowzxid-highzxid where prefix is one of the standard snap or log 
file prefixes, or
+ * "lostLog".
+ * In the case of timetable backup file, it takes a format of 
timetable.lowTimestamp-highTimestamp.
+ */
+public class BackupFileInfo {
+  public static final long NOT_SET = -1L;
+  private static final String DASH_DELIMITER = "-";
+
+  private final File backupFile;
+  private final File standardFile;
+  private final Range<Long> range;
+  private final BackupFileType fileType;
+  private final long modificationTime;
+  private final long size;
+
+  /**
+   * Constructor that pulls backup metadata based on the backed-up filename
+   * @param backedupFile the backed-up file with the name in the form 
prefix.lowzxid-highzxid
+   *                     for example snapshot.9a0000a344-9a0000b012.
+   *                     if timetable backup, ranges are decimal longs (posix 
timestamps)
+   * @param modificationTime the file modification time
+   * @param size the size of the file in bytes
+   */
+  public BackupFileInfo(File backedupFile, long modificationTime, long size) {
+    this.backupFile = backedupFile;
+    this.modificationTime = modificationTime;
+    this.size = size;
+
+    String backedupFilename = this.backupFile.getName();
+
+    if (backedupFilename.startsWith(BackupUtil.LOST_LOG_PREFIX)) {
+      this.fileType = BackupFileType.LOSTLOG;
+      this.standardFile = this.backupFile;
+    } else if (backedupFilename.startsWith(Util.SNAP_PREFIX)) {
+      this.fileType = BackupFileType.SNAPSHOT;
+      // For snapshot backup files, we need to consider the case of snapshot 
compression
+      String streamMode = SnapStream.getStreamMode(backedupFilename).getName();
+      String standardFileName;
+      if (streamMode.isEmpty()) {
+        // No compression was used, so simply drop the end part
+        standardFileName = backedupFilename.split(DASH_DELIMITER)[0];
+      } else {
+        // Snapshot compression is enabled; standardName looks like 
"snapshot.<zxid>.<streamMode>"
+        // Need to remove the ending zxid
+        String[] nameParts = backedupFilename.split("\\.");
+        if (nameParts.length != 3) {
+          throw new BackupException(
+              "BackupFileInfo: unable to create standardFile reference! 
backedupFilename: "
+                  + backedupFilename + " StreamMode: " + streamMode);
+        }
+        String zxidPartWithoutEnd = nameParts[1].split(DASH_DELIMITER)[0];
+        // Combine all parts to generate a backup name
+        standardFileName = nameParts[0] + "." + zxidPartWithoutEnd + "." + 
nameParts[2];
+      }
+      this.standardFile = new File(this.backupFile.getParentFile(), 
standardFileName);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (`java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V`) reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with `help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not relevant](https://www.sonatype.com/lift-comment-rating?comment=226621128&lift_comment_rating=1) ] - [ [😕 Won't fix](https://www.sonatype.com/lift-comment-rating?comment=226621128&lift_comment_rating=2) ] - [ [😑 Not critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621128&lift_comment_rating=3) ] - [ [🙂 Critical, will fix](https://www.sonatype.com/lift-comment-rating?comment=226621128&lift_comment_rating=4) ] - [ [😊 Critical, fixing now](https://www.sonatype.com/lift-comment-rating?comment=226621128&lift_comment_rating=5) ]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@zookeeper.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to