sonatype-lift[bot] commented on code in PR #1883:
URL: https://github.com/apache/zookeeper/pull/1883#discussion_r878545647


##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      return new ArrayList<>();
+    }
+
+    File[] files = BackupStorageUtil.getFilesWithPrefix(backupDir, prefix);
+
+    // Read the file info and add to the list. If an exception is thrown, the 
entire operation will fail
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    for (File file : files) {
+      backupFileInfos.add(getBackupFileInfo(file));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      throw new BackupException(
+          "Backup directory " + filePath + " does not exist, could not get 
directory list.");
+    }
+
+    // Filter out all the files which are directories
+    FilenameFilter fileFilter = (dir, name) -> new File(dir, 
name).isDirectory();
+    File[] dirs = backupDir.listFiles(fileFilter);
+
+    if (dirs == null) {
+      return new ArrayList<>();
+    }
+    return Arrays.asList(dirs);
+  }
+
+  @Override
+  public InputStream open(File path) throws IOException {
+    if (!path.exists() || path.isDirectory()) {
+      throw new BackupException("The file with the file path " + path
+          + " does not exist or is a directory, could not open the file.");
+    }
+    return new FileInputStream(path);
+  }
+
+  @Override
+  public void copyToBackupStorage(File srcFile, File destName) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+    String backupTempFilePath;
+    File backupTempFile;
+
+    sharedLock.lock();
+    try {
+      inputStream = open(srcFile);
+
+      backupTempFilePath = BackupStorageUtil
+          
.constructBackupFilePath(BackupStorageUtil.constructTempFileName(destName.getName()),
+              fileRootPath);
+      backupTempFile = new File(backupTempFilePath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226620986&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620986&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620986&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620986&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226620986&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/RestoreFromBackupTool.java:
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import org.apache.commons.cli.CommandLine;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.RestoreCommand;
+import org.apache.zookeeper.common.ConfigException;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.backup.timetable.TimetableUtil;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This is a tool to restore, from backup storage, a snapshot and set of 
transaction log files
+ *  that combine to represent a transactionally consistent image of a 
ZooKeeper ensemble at
+ *  some zxid.
+ */
+public class RestoreFromBackupTool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreFromBackupTool.class);
+
+  private static final int MAX_RETRIES = 10;
+  private static final String HEX_PREFIX = "0x";
+  private static final int CONNECTION_TIMEOUT = 300000;
+
+  BackupStorageProvider storage;
+  FileTxnSnapLog snapLog;
+  long zxidToRestore;
+  boolean dryRun;
+  File restoreTempDir;
+  boolean overwrite = false;
+
+  // Spot restoration
+  boolean isSpotRestoration = false;
+  String znodePathToRestore;
+  String zkServerConnectionStr;
+  boolean restoreRecursively = false;
+  ZooKeeper zk;
+  SpotRestorationTool spotRestorationTool;
+
+  List<BackupFileInfo> logs;
+  List<BackupFileInfo> snaps;
+  List<BackupFileInfo> filesToCopy;
+
+  int mostRecentLogNeededIndex;
+  int snapNeededIndex;
+  int oldestLogNeededIndex;
+
+  public enum BackupStorageOption {
+    
GPFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage"),
+    
NFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage");
+
+    private String storageProviderClassName;
+
+    BackupStorageOption(String className) {
+      this.storageProviderClassName = className;
+    }
+
+    public String getStorageProviderClassName() {
+      return storageProviderClassName;
+    }
+  }
+
+  /**
+   * Default constructor; requires using parseArgs to setup state.
+   */
+  public RestoreFromBackupTool() {
+    this(null, null, -1L, false, null);
+  }
+
+  /**
+   * Constructor
+   * @param storageProvider the backup provider from which to restore the 
backups
+   * @param snapLog the snap and log provider to use
+   * @param zxidToRestore the zxid upto which to restore, or Long.MAX to 
restore to the latest
+   *                      available transactionally consistent zxid.
+   * @param dryRun whether this is a dryrun in which case no files are 
actually copied
+   * @param restoreTempDir a temporary, local (not remote) directory to stage 
the backup files from
+   *                       remote backup storage for the processing stage of 
restoration. Note that
+   *                       this directory and the files in it will be removed 
after restoration.
+   */
+  RestoreFromBackupTool(BackupStorageProvider storageProvider, FileTxnSnapLog 
snapLog,
+      long zxidToRestore, boolean dryRun, File restoreTempDir) {
+
+    filesToCopy = new ArrayList<>();
+    snapNeededIndex = -1;
+    mostRecentLogNeededIndex = -1;
+    oldestLogNeededIndex = -1;
+
+    this.storage = storageProvider;
+    this.zxidToRestore = zxidToRestore;
+    this.snapLog = snapLog;
+    this.dryRun = dryRun;
+    this.restoreTempDir = restoreTempDir;
+  }
+
+  /**
+   * Parse and validate arguments to the tool
+   * @param cl the command line object with user inputs
+   * @return true if the arguments parse correctly; false in all other cases.
+   * @throws IOException if the backup provider cannot be instantiated 
correctly.
+   */
+  public void parseArgs(CommandLine cl) {
+    String backupStoragePath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.BACKUP_STORE);
+    createBackupStorageProvider(backupStoragePath);
+
+    // Read the restore point
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_ZXID)) {
+      parseRestoreZxid(cl);
+    } else if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP)) 
{
+      parseRestoreTimestamp(cl, backupStoragePath);
+    }
+
+    parseAndValidateSpotRestorationArgs(cl);
+
+    parseAndValidateOfflineRestoreDestination(cl);
+
+    parseRestoreTempDir(cl);
+
+    // Check if overwriting the destination directories is allowed
+    if (cl.hasOption(RestoreCommand.OptionShortForm.OVERWRITE)) {
+      overwrite = true;
+    }
+
+    // Check if this is a dry-run
+    if (cl.hasOption(RestoreCommand.OptionShortForm.DRY_RUN)) {
+      dryRun = true;
+    }
+
+    System.out.println("parseArgs successful.");
+  }
+
+  private void createBackupStorageProvider(String backupStoragePath) {
+    String[] backupStorageParams = backupStoragePath.split(":");
+    if (backupStorageParams.length != 4) {
+      System.err.println(
+          "Failed to parse backup storage connection information from the 
backup storage path provided, please check the input.");
+      System.err.println(
+          "For example: the format for a gpfs backup storage path should be 
\"gpfs:<config_path>:<backup_path>:<namespace>\".");
+      System.exit(1);
+    }
+
+    String userProvidedStorageName = backupStorageParams[0].toUpperCase();
+    try {
+      BackupStorageOption storageOption = 
BackupStorageOption.valueOf(userProvidedStorageName);
+      String backupStorageProviderClassName = 
storageOption.getStorageProviderClassName();
+
+      BackupConfig.RestorationConfigBuilder configBuilder =
+          new BackupConfig.RestorationConfigBuilder()
+              .setStorageProviderClassName(backupStorageProviderClassName)
+              
.setBackupStoragePath(backupStorageParams[2]).setNamespace(backupStorageParams[3]);
+      if (!backupStorageParams[1].isEmpty()) {
+        configBuilder = configBuilder.setStorageConfig(new 
File(backupStorageParams[1]));
+      }
+      storage = 
BackupUtil.createStorageProviderImpl(configBuilder.build().get());
+    } catch (IllegalArgumentException e) {
+      System.err.println("Could not find a valid backup storage option based 
on the input: "
+          + userProvidedStorageName + ". Error message: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } catch (ConfigException e) {
+      System.err.println(
+          "Could not generate a backup config based on the input, error 
message: " + e
+              .getMessage());
+      e.getStackTrace();
+      System.exit(1);
+    } catch (InstantiationException | InvocationTargetException | 
NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
+      System.err.println(
+          "Could not generate a backup storage provider based on the input, 
error message: " + e
+              .getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    }
+  }
+
+  private void parseRestoreZxid(CommandLine cl) {
+    String zxidToRestoreStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_ZXID);
+    if (zxidToRestoreStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+      zxidToRestore = Long.MAX_VALUE;
+    } else {
+      int base = 10;
+      String numStr = zxidToRestoreStr;
+
+      if (zxidToRestoreStr.startsWith(HEX_PREFIX)) {
+        numStr = zxidToRestoreStr.substring(2);
+        base = 16;
+      }
+      try {
+        zxidToRestore = Long.parseLong(numStr, base);
+      } catch (NumberFormatException nfe) {
+        System.err
+            .println("Invalid number specified for restore zxid point, the 
input is: " + numStr);
+        System.exit(2);
+      }
+    }
+  }
+
+  private void parseRestoreTimestamp(CommandLine cl, String backupStoragePath) 
{
+    String timestampStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP);
+    String timetableStoragePath = backupStoragePath;
+    if (cl.hasOption(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH)) {
+      timetableStoragePath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH);
+    }
+    File[] timetableFiles = new File(timetableStoragePath)
+        .listFiles(file -> 
file.getName().startsWith(TimetableBackup.TIMETABLE_PREFIX));
+    if (timetableFiles == null || timetableFiles.length == 0) {
+      System.err.println("Could not find timetable files at the path: " + 
timetableStoragePath);
+      System.exit(2);
+    }
+    Map.Entry<Long, String> restorePoint;
+    String message;
+    try {
+      restorePoint = TimetableUtil.findLastZxidFromTimestamp(timetableFiles, 
timestampStr);
+      zxidToRestore = Long.parseLong(restorePoint.getValue(), 16);
+      String timeToRestore = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
+          .format(new java.util.Date(restorePoint.getKey()));
+      if (timestampStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to latest.";
+      } else {
+        String requestedTimeToRestore = new SimpleDateFormat("MM/dd/yyyy 
HH:mm:ss")
+            .format(new java.util.Date(Long.decode(timestampStr)));
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to "
+            + requestedTimeToRestore + ".";
+      }
+      System.out.println(message);
+      LOG.info(message);
+    } catch (IllegalArgumentException | BackupException e) {
+      System.err.println(
+          "Could not find a valid zxid from timetable using the timestamp 
provided: " + timestampStr
+              + ". The error message is: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(2);
+    }
+  }
+
+  private void parseAndValidateOfflineRestoreDestination(CommandLine cl) {
+    if (isSpotRestoration) {
+      return;
+    }
+    // Read restore destination: dataDir and logDir
+    try {
+      String snapDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.SNAP_DESTINATION);
+      String logDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.LOG_DESTINATION);
+
+      if (snapDirPath == null || logDirPath == null) {
+        throw new BackupException(
+            "Snap destination path and log destination path are not defined 
for offline restoration. SnapDirPath: "
+                + snapDirPath + ", logDirPath: " + logDirPath);
+      }
+
+      File snapDir = new File(snapDirPath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226620998&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620998&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620998&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620998&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226620998&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/RestoreFromBackupTool.java:
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import org.apache.commons.cli.CommandLine;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.RestoreCommand;
+import org.apache.zookeeper.common.ConfigException;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.backup.timetable.TimetableUtil;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This is a tool to restore, from backup storage, a snapshot and set of 
transaction log files
+ *  that combine to represent a transactionally consistent image of a 
ZooKeeper ensemble at
+ *  some zxid.
+ */
+public class RestoreFromBackupTool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreFromBackupTool.class);
+
+  private static final int MAX_RETRIES = 10;
+  private static final String HEX_PREFIX = "0x";
+  private static final int CONNECTION_TIMEOUT = 300000;
+
+  BackupStorageProvider storage;
+  FileTxnSnapLog snapLog;
+  long zxidToRestore;
+  boolean dryRun;
+  File restoreTempDir;
+  boolean overwrite = false;
+
+  // Spot restoration
+  boolean isSpotRestoration = false;
+  String znodePathToRestore;
+  String zkServerConnectionStr;
+  boolean restoreRecursively = false;
+  ZooKeeper zk;
+  SpotRestorationTool spotRestorationTool;
+
+  List<BackupFileInfo> logs;
+  List<BackupFileInfo> snaps;
+  List<BackupFileInfo> filesToCopy;
+
+  int mostRecentLogNeededIndex;
+  int snapNeededIndex;
+  int oldestLogNeededIndex;
+
+  public enum BackupStorageOption {
+    
GPFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage"),
+    
NFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage");
+
+    private String storageProviderClassName;
+
+    BackupStorageOption(String className) {
+      this.storageProviderClassName = className;
+    }
+
+    public String getStorageProviderClassName() {
+      return storageProviderClassName;
+    }
+  }
+
+  /**
+   * Default constructor; requires using parseArgs to setup state.
+   */
+  public RestoreFromBackupTool() {
+    this(null, null, -1L, false, null);
+  }
+
+  /**
+   * Constructor
+   * @param storageProvider the backup provider from which to restore the 
backups
+   * @param snapLog the snap and log provider to use
+   * @param zxidToRestore the zxid upto which to restore, or Long.MAX to 
restore to the latest
+   *                      available transactionally consistent zxid.
+   * @param dryRun whether this is a dryrun in which case no files are 
actually copied
+   * @param restoreTempDir a temporary, local (not remote) directory to stage 
the backup files from
+   *                       remote backup storage for the processing stage of 
restoration. Note that
+   *                       this directory and the files in it will be removed 
after restoration.
+   */
+  RestoreFromBackupTool(BackupStorageProvider storageProvider, FileTxnSnapLog 
snapLog,
+      long zxidToRestore, boolean dryRun, File restoreTempDir) {
+
+    filesToCopy = new ArrayList<>();
+    snapNeededIndex = -1;
+    mostRecentLogNeededIndex = -1;
+    oldestLogNeededIndex = -1;
+
+    this.storage = storageProvider;
+    this.zxidToRestore = zxidToRestore;
+    this.snapLog = snapLog;
+    this.dryRun = dryRun;
+    this.restoreTempDir = restoreTempDir;
+  }
+
+  /**
+   * Parse and validate arguments to the tool
+   * @param cl the command line object with user inputs
+   * @return true if the arguments parse correctly; false in all other cases.
+   * @throws IOException if the backup provider cannot be instantiated 
correctly.
+   */
+  public void parseArgs(CommandLine cl) {
+    String backupStoragePath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.BACKUP_STORE);
+    createBackupStorageProvider(backupStoragePath);
+
+    // Read the restore point
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_ZXID)) {
+      parseRestoreZxid(cl);
+    } else if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP)) 
{
+      parseRestoreTimestamp(cl, backupStoragePath);
+    }
+
+    parseAndValidateSpotRestorationArgs(cl);
+
+    parseAndValidateOfflineRestoreDestination(cl);
+
+    parseRestoreTempDir(cl);
+
+    // Check if overwriting the destination directories is allowed
+    if (cl.hasOption(RestoreCommand.OptionShortForm.OVERWRITE)) {
+      overwrite = true;
+    }
+
+    // Check if this is a dry-run
+    if (cl.hasOption(RestoreCommand.OptionShortForm.DRY_RUN)) {
+      dryRun = true;
+    }
+
+    System.out.println("parseArgs successful.");
+  }
+
+  private void createBackupStorageProvider(String backupStoragePath) {
+    String[] backupStorageParams = backupStoragePath.split(":");
+    if (backupStorageParams.length != 4) {
+      System.err.println(
+          "Failed to parse backup storage connection information from the 
backup storage path provided, please check the input.");
+      System.err.println(
+          "For example: the format for a gpfs backup storage path should be 
\"gpfs:<config_path>:<backup_path>:<namespace>\".");
+      System.exit(1);
+    }
+
+    String userProvidedStorageName = backupStorageParams[0].toUpperCase();
+    try {
+      BackupStorageOption storageOption = 
BackupStorageOption.valueOf(userProvidedStorageName);
+      String backupStorageProviderClassName = 
storageOption.getStorageProviderClassName();
+
+      BackupConfig.RestorationConfigBuilder configBuilder =
+          new BackupConfig.RestorationConfigBuilder()
+              .setStorageProviderClassName(backupStorageProviderClassName)
+              
.setBackupStoragePath(backupStorageParams[2]).setNamespace(backupStorageParams[3]);
+      if (!backupStorageParams[1].isEmpty()) {
+        configBuilder = configBuilder.setStorageConfig(new 
File(backupStorageParams[1]));
+      }
+      storage = 
BackupUtil.createStorageProviderImpl(configBuilder.build().get());
+    } catch (IllegalArgumentException e) {
+      System.err.println("Could not find a valid backup storage option based 
on the input: "
+          + userProvidedStorageName + ". Error message: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } catch (ConfigException e) {
+      System.err.println(
+          "Could not generate a backup config based on the input, error 
message: " + e
+              .getMessage());
+      e.getStackTrace();
+      System.exit(1);
+    } catch (InstantiationException | InvocationTargetException | 
NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
+      System.err.println(
+          "Could not generate a backup storage provider based on the input, 
error message: " + e
+              .getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    }
+  }
+
+  private void parseRestoreZxid(CommandLine cl) {
+    String zxidToRestoreStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_ZXID);
+    if (zxidToRestoreStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+      zxidToRestore = Long.MAX_VALUE;
+    } else {
+      int base = 10;
+      String numStr = zxidToRestoreStr;
+
+      if (zxidToRestoreStr.startsWith(HEX_PREFIX)) {
+        numStr = zxidToRestoreStr.substring(2);
+        base = 16;
+      }
+      try {
+        zxidToRestore = Long.parseLong(numStr, base);
+      } catch (NumberFormatException nfe) {
+        System.err
+            .println("Invalid number specified for restore zxid point, the 
input is: " + numStr);
+        System.exit(2);
+      }
+    }
+  }
+
+  private void parseRestoreTimestamp(CommandLine cl, String backupStoragePath) 
{
+    String timestampStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP);
+    String timetableStoragePath = backupStoragePath;
+    if (cl.hasOption(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH)) {
+      timetableStoragePath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH);
+    }
+    File[] timetableFiles = new File(timetableStoragePath)
+        .listFiles(file -> 
file.getName().startsWith(TimetableBackup.TIMETABLE_PREFIX));
+    if (timetableFiles == null || timetableFiles.length == 0) {
+      System.err.println("Could not find timetable files at the path: " + 
timetableStoragePath);
+      System.exit(2);
+    }
+    Map.Entry<Long, String> restorePoint;
+    String message;
+    try {
+      restorePoint = TimetableUtil.findLastZxidFromTimestamp(timetableFiles, 
timestampStr);
+      zxidToRestore = Long.parseLong(restorePoint.getValue(), 16);
+      String timeToRestore = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
+          .format(new java.util.Date(restorePoint.getKey()));
+      if (timestampStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to latest.";
+      } else {
+        String requestedTimeToRestore = new SimpleDateFormat("MM/dd/yyyy 
HH:mm:ss")
+            .format(new java.util.Date(Long.decode(timestampStr)));
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to "
+            + requestedTimeToRestore + ".";
+      }
+      System.out.println(message);
+      LOG.info(message);
+    } catch (IllegalArgumentException | BackupException e) {
+      System.err.println(
+          "Could not find a valid zxid from timetable using the timestamp 
provided: " + timestampStr
+              + ". The error message is: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(2);
+    }
+  }
+
+  private void parseAndValidateOfflineRestoreDestination(CommandLine cl) {
+    if (isSpotRestoration) {
+      return;
+    }
+    // Read restore destination: dataDir and logDir
+    try {
+      String snapDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.SNAP_DESTINATION);
+      String logDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.LOG_DESTINATION);
+
+      if (snapDirPath == null || logDirPath == null) {
+        throw new BackupException(
+            "Snap destination path and log destination path are not defined 
for offline restoration. SnapDirPath: "
+                + snapDirPath + ", logDirPath: " + logDirPath);
+      }
+
+      File snapDir = new File(snapDirPath);
+      File logDir = new File(logDirPath);
+      snapLog = new FileTxnSnapLog(logDir, snapDir);
+      checkSnapDataDirFileExistence();
+    } catch (IOException ioe) {
+      System.err.println("Could not setup transaction log utility." + ioe);
+      System.exit(3);
+    }
+  }
+
+  private void parseRestoreTempDir(CommandLine cl) {
+    if 
(cl.hasOption(RestoreCommand.OptionShortForm.LOCAL_RESTORE_TEMP_DIR_PATH)) {
+      String localRestoreTempDirPath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.LOCAL_RESTORE_TEMP_DIR_PATH);
+      restoreTempDir = new File(localRestoreTempDirPath);
+    }
+
+    if (restoreTempDir == null) {
+      if (isSpotRestoration) {
+        throw new BackupException(
+            "Local restore temp dir path is not defined for spot 
restoration.");
+      } else {
+        // This is an offline restoration
+        // If the user hasn't provided the restore temp dir parameter,
+        //then the tool will just create a temporary folder inside snapLog and 
delete it afterwards.
+        this.restoreTempDir = new File(snapLog.getDataDir(), "RestoreTempDir_" 
+ zxidToRestore);
+      }
+    }
+  }
+
+  private void parseAndValidateSpotRestorationArgs(CommandLine cl) {
+    if (cl.hasOption(RestoreCommand.OptionShortForm.ZNODE_PATH_TO_RESTORE)) {
+      znodePathToRestore = 
cl.getOptionValue(RestoreCommand.OptionShortForm.ZNODE_PATH_TO_RESTORE);
+    }
+    if 
(cl.hasOption(RestoreCommand.OptionShortForm.ZK_SERVER_CONNECTION_STRING)) {
+      zkServerConnectionStr =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.ZK_SERVER_CONNECTION_STRING);
+    }
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RECURSIVE_SPOT_RESTORE)) {
+      restoreRecursively = true;
+    }
+    if (znodePathToRestore != null && zkServerConnectionStr != null) {
+      isSpotRestoration = true;
+    } else if (znodePathToRestore == null && zkServerConnectionStr == null) {
+      isSpotRestoration = false;
+    } else {
+      throw new BackupException(
+          "Znode path and zk server connection string must be provided in 
order to do spot restoration. Provided znode path: "
+              + znodePathToRestore + ", provided zk server connection string: "
+              + zkServerConnectionStr);
+    }
+  }
+
+  /**
+   * Attempts to perform a restore with up to MAX_RETRIES retries.
+   * @return true if the restore completed successfully, false in all other 
cases.
+   */
+  public boolean runWithRetries(CommandLine cl) {
+    parseArgs(cl);
+
+    int tries = 0;
+
+    if (dryRun) {
+      System.out.println("This is a DRYRUN, no files will actually be 
copied.");
+    }
+
+    while (tries < MAX_RETRIES) {
+      try {
+        run();
+        return true;
+      } catch (IllegalArgumentException re) {
+        System.err.println(
+            "Restore attempt failed, could not find all the required backup 
files to restore. "
+                + "Error message: " + re.getMessage());
+        re.printStackTrace();
+        return false;
+      } catch (BackupException be) {
+        System.err.println(
+            "Restoration attempt failed due to a backup exception, it's 
usually caused by required "
+                + "directories not existing or failure of creating 
directories, etc. Please check the message. "
+                + "Error message: " + be.getMessage());
+        be.printStackTrace();
+        return false;
+      } catch (Exception e) {
+        tries++;
+        System.err.println("Restore attempt failed; attempting again. " + 
tries + "/" + MAX_RETRIES
+            + ". Error message: " + e.getMessage());
+        e.printStackTrace();
+      }
+    }
+
+    System.err.println("Failed to restore after " + (tries + 1) + " 
attempts.");
+    return false;
+  }
+
+  /**
+   * Attempts to perform a restore.
+   */
+  public void run() throws IOException, InterruptedException {
+    try {
+      if (!findFilesToRestore()) {
+        throw new IllegalArgumentException("Failed to find valid snapshot and 
logs to restore.");
+      }
+
+      if (restoreTempDir == null || storage == null) {
+        throw new BackupException("The RestoreTempDir and 
BackupStorageProvider cannot be null.");
+      }
+
+      if (!restoreTempDir.exists() && !restoreTempDir.mkdirs()) {
+        throw new BackupException(
+            "Failed to create a temporary directory at path: " + 
restoreTempDir.getPath()
+                + " to store copied backup files.");
+      }
+
+      if (!dryRun) {
+        // This step will create a "version-2" directory inside restoreTempDir,
+        // all the selected backup files will be copied to version-2 directory
+        FileTxnSnapLog restoreTempSnapLog =
+            new FileTxnSnapLog(this.restoreTempDir, this.restoreTempDir);
+
+        copyBackupFilesToLocalTempDir(restoreTempSnapLog);
+        processCopiedBackupFiles(restoreTempSnapLog, zxidToRestore);
+        if (isSpotRestoration) {
+          performSpotRestoration(restoreTempDir);
+        } else {
+          // It is an offline restoration
+          copyProcessedRestoredFilesToDestination(restoreTempSnapLog);
+        }
+      }
+    } finally {
+      if (restoreTempDir != null && restoreTempDir.exists()) {
+        // Using recursive delete here because a "version-2" directory is 
created under restoreTempDir with FileTxnSnapLog
+        BackupStorageUtil.deleteDirectoryRecursively(restoreTempDir);
+      }
+    }
+  }
+
+  /**
+   * Finds the set of files (snapshot and txlog) that are required to restore 
to a transactionally
+   * consistent point for the requested zxid.
+   * Note that when restoring to the latest zxid, the transactionally 
consistent point may NOT
+   * be the latest backed up zxid if logs have been lost in between; or if 
there is no
+   * transactionally consistent point in which nothing will be restored but 
the restored will
+   * be considered successful.
+   * @return true if a transactionally consistent set of files could be found 
for the requested
+   *         restore point; false in all other cases.
+   * @throws IOException
+   */
+  private boolean findFilesToRestore() throws IOException {
+    snaps = BackupUtil.getBackupFiles(storage, BackupFileType.SNAPSHOT, 
IntervalEndpoint.START,
+        BackupUtil.SortOrder.DESCENDING);
+    logs = BackupUtil.getBackupFiles(storage,
+        new BackupFileType[]{BackupFileType.TXNLOG, BackupFileType.LOSTLOG}, 
IntervalEndpoint.START,
+        BackupUtil.SortOrder.DESCENDING);
+    filesToCopy = new ArrayList<>();
+
+    snapNeededIndex = -1;
+
+    while (findNextPossibleSnapshot()) {
+      if (findLogRange()) {
+        filesToCopy.add(snaps.get(snapNeededIndex));
+        filesToCopy.addAll(logs.subList(mostRecentLogNeededIndex, 
oldestLogNeededIndex + 1));
+        return true;
+      }
+
+      if (zxidToRestore != Long.MAX_VALUE) {
+        break;
+      }
+    }
+
+    // Restoring to an empty data tree (i.e. no backup files) is valid for 
restoring to
+    // latest
+    if (zxidToRestore == Long.MAX_VALUE) {
+      return true;
+    }
+
+    return false;
+  }
+
+  /**
+   * Find the next snapshot whose range is below the requested restore point.
+   * Note: in practice this only gets called once when zxidToRestore != 
Long.MAX
+   * @return true if a snapshot is found; false in all other cases.
+   */
+  private boolean findNextPossibleSnapshot() {
+    for (snapNeededIndex++; snapNeededIndex < snaps.size(); snapNeededIndex++) 
{
+      if (snaps.get(snapNeededIndex).getRange().upperEndpoint() <= 
zxidToRestore) {
+        return true;
+      }
+    }
+
+    return false;
+  }
+
+  /**
+   * Find the range of log files needed to make the current proposed snapshot 
transactionally
+   * consistent to the requested zxid.
+   * When zxidToRestore == Long.MAX the transaction log range terminates 
either on the most
+   * recent backedup txnlog OR the last txnlog prior to a lost log range 
(assuming that log
+   * still makes the snapshot transactionally consistent).
+   * @return true if a valid log range is found; false in all other cases.
+   */
+  private boolean findLogRange() {
+    Preconditions.checkState(snapNeededIndex >= 0 && snapNeededIndex < 
snaps.size());
+
+    if (logs.size() == 0) {
+      return false;
+    }
+
+    BackupFileInfo snap = snaps.get(snapNeededIndex);
+    Range<Long> snapRange = snap.getRange();
+    oldestLogNeededIndex = logs.size() - 1;
+    mostRecentLogNeededIndex = 0;
+
+    // Find the first txlog that might make the snapshot valid, OR is a lost 
log
+    for (int i = 0; i < logs.size() - 1; i++) {
+      BackupFileInfo log = logs.get(i);
+      Range<Long> logRange = log.getRange();
+
+      if (logRange.lowerEndpoint() <= snapRange.lowerEndpoint()) {
+        oldestLogNeededIndex = i;
+        break;
+      }
+    }
+
+    // Starting if the oldest log that might allow the snapshot to be valid, 
find the txnlog
+    // that includes the restore point, OR is a lost log
+    for (int i = oldestLogNeededIndex; i > 0; i--) {
+      BackupFileInfo log = logs.get(i);
+      Range<Long> logRange = log.getRange();
+
+      if (log.getFileType() == BackupFileType.LOSTLOG
+          || logRange.upperEndpoint() >= zxidToRestore) {
+
+        mostRecentLogNeededIndex = i;
+        break;
+      }
+    }
+
+    return validateLogRange();
+  }
+
+  private boolean validateLogRange() {
+    Preconditions.checkState(oldestLogNeededIndex >= 0);
+    Preconditions.checkState(oldestLogNeededIndex < logs.size());
+    Preconditions.checkState(mostRecentLogNeededIndex >= 0);
+    Preconditions.checkState(mostRecentLogNeededIndex < logs.size());
+    Preconditions.checkState(oldestLogNeededIndex >= mostRecentLogNeededIndex);
+    Preconditions.checkState(snapNeededIndex >= 0);
+    Preconditions.checkState(snapNeededIndex < snaps.size());
+
+    BackupFileInfo snap = snaps.get(snapNeededIndex);
+    BackupFileInfo oldestLog = logs.get(oldestLogNeededIndex);
+    BackupFileInfo newestLog = logs.get(mostRecentLogNeededIndex);
+
+    if (oldestLog.getFileType() == BackupFileType.LOSTLOG) {
+      LOG.error("Could not find logs to make the snapshot '" + 
snap.getBackedUpFile()
+          + "' valid. Lost logs at " + 
logs.get(oldestLogNeededIndex).getRange());
+      return false;
+    }
+
+    if (newestLog.getFileType() == BackupFileType.LOSTLOG) {
+      if (zxidToRestore == Long.MAX_VALUE && oldestLogNeededIndex != 
mostRecentLogNeededIndex) {
+        // When restoring to the latest, we can use the last valid log prior 
to lost log
+        // range.
+        mostRecentLogNeededIndex++;
+      } else {
+        LOG.error("Could not find logs to make the snapshot '" + 
snap.getBackedUpFile()
+            + "' valid. Lost logs at " + 
logs.get(mostRecentLogNeededIndex).getRange() + ".");
+        return false;
+      }
+    }
+
+    Range<Long> fullRange = oldestLog.getRange().span(newestLog.getRange());
+
+    if (fullRange.lowerEndpoint() > snap.getRange().lowerEndpoint()) {
+      LOG.error("Could not find logs to make snap '" + snap.getBackedUpFile()
+          + "' valid. Logs start at zxid " + 
ZxidUtils.zxidToString(fullRange.lowerEndpoint())
+          + ".");
+      return false;
+    }
+
+    if (fullRange.upperEndpoint() < snap.getRange().upperEndpoint()) {
+      LOG.error("Could not find logs to make snap '" + snap.getBackedUpFile()
+          + "' valid. Logs end at zxid " + 
ZxidUtils.zxidToString(fullRange.upperEndpoint()) + ".");
+      return false;
+    }
+
+    if (zxidToRestore != Long.MAX_VALUE && fullRange.upperEndpoint() < 
zxidToRestore) {
+      LOG.error("Could not find logs to restore to zxid " + zxidToRestore + ". 
Logs end at zxid "
+          + ZxidUtils.zxidToString(fullRange.upperEndpoint()) + ".");
+      return false;
+    }
+
+    return true;
+  }
+
+  /**
+   * Check if the specified snap dir and data dir already have files inside.
+   * If so, ask user to confirm if they want to overwrite these two 
directories with restored files,
+   * which means to wipe out all existing files and the directories be 
populated with restored files.
+   */
+  private void checkSnapDataDirFileExistence() {
+    File dataDir = snapLog.getDataDir();
+    File snapDir = snapLog.getSnapDir();
+    if (!dataDir.exists() && !dataDir.mkdirs()) {
+      throw new BackupException("Failed to create a data directory at path: " 
+ dataDir.getPath()
+          + " to store restored txn logs.");
+    }
+    if (!snapDir.exists() && !snapDir.mkdirs()) {
+      throw new BackupException("Failed to create a snap directory at path: " 
+ snapDir.getPath()
+          + " to store restored snapshot files.");
+    }
+    String[] dataDirFiles = dataDir.list();
+    String[] snapDirFiles = snapDir.list();
+    if (Objects.requireNonNull(dataDirFiles).length > 0
+        || Objects.requireNonNull(snapDirFiles).length > 0) {
+      if (overwrite) {
+        LOG.warn(
+            "Overwriting the destination directories for restoration, deleting 
all existing files. "
+                + "The files under dataDir: " + dataDir.getPath() + " are: " + 
Arrays
+                .toString(dataDirFiles) + "; and files under snapDir: " + 
snapDir.getPath()
+                + " are: " + Arrays.toString(snapDirFiles) + ".");
+        
Arrays.stream(Objects.requireNonNull(dataDir.listFiles())).forEach(File::delete);
+        
Arrays.stream(Objects.requireNonNull(snapDir.listFiles())).forEach(File::delete);
+      } else {
+        throw new BackupException(
+            "The destination directories are not empty, user chose not to 
overwrite the entire directory, "
+                + "exiting restoration. Please check the destination directory 
dataDir path: "
+                + dataDir.getPath() + ", and snapDir path" + 
snapDir.getPath());
+      }
+    }
+  }
+
+  /**
+   * Copy selected backup files from backup storage to a local restore 
temporary directory for further processing later
+   * @param restoreTempSnapLog A FileTxnSnapLog instance created on the 
specified local temporary directory path
+   * @throws IOException
+   */
+  private void copyBackupFilesToLocalTempDir(FileTxnSnapLog 
restoreTempSnapLog) throws IOException {
+    // Copy backup files to local temp directory
+    for (BackupFileInfo backedupFile : filesToCopy) {
+      String standardFilename = backedupFile.getStandardFile().getName();
+      // Does not matter if we use dataDir or logDir since we make the paths 
of these two directories
+      // same in restoreTempSnapLog object for restoration
+      File localTempDest = new File(restoreTempSnapLog.getDataDir(), 
standardFilename);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V) reads a 
file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621002&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621002&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621002&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621002&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621002&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/BackupStorageUtil.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.backup.storage;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.persistence.Util;
+
+/**
+ * Util methods for backup storage
+ */
+public class BackupStorageUtil {
+  public static final String TMP_FILE_PREFIX = "TMP_";
+  private static final File[] NO_FILE = new File[0];
+
+  /**
+   * Parse the prefix from a file name, also works for temporary file names in 
backup storage
+   * @param fileName The file name to be parsed
+   * @return "log" for ZK transaction log files or "snapshot" for ZK snapshots
+   */
+  public static String getFileTypePrefix(String fileName) {
+    String backupFileName = fileName;
+
+    //Remove the temporary file name prefix in order to determine file type
+    if (fileName.startsWith(TMP_FILE_PREFIX)) {
+      backupFileName = fileName.substring(TMP_FILE_PREFIX.length());
+    }
+
+    String fileTypePrefix;
+    if (backupFileName.startsWith(Util.SNAP_PREFIX)) {
+      fileTypePrefix = Util.SNAP_PREFIX;
+    } else if (backupFileName.startsWith(Util.TXLOG_PREFIX)) {
+      fileTypePrefix = Util.TXLOG_PREFIX;
+    } else {
+      throw new BackupException("No matching base file type found for file " + 
fileName);
+    }
+
+    return fileTypePrefix;
+  }
+
+  /**
+   * Construct the path of a backup file in the backup storage
+   * @param fileName The name of the file
+   * @param parentDir The path to the parent directory of the backup file.
+   * @return The path of the backup file in the format of:
+   * 1. parentDir path is not supplied: {fileName} or {fileName}
+   * 2. parentDir path is provided: {parentDir}/{fileName} or 
{parentDir}/{fileName}
+   */
+  public static String constructBackupFilePath(String fileName, String 
parentDir) {
+    //TODO: store snapshots and Txlogs in different subfolders for better 
organization
+    if (parentDir != null) {
+      return String.valueOf(Paths.get(parentDir, fileName));

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API 
(java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;)
 reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621008&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621008&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621008&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621008&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621008&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/BackupStorageUtil.java:
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.zookeeper.server.backup.storage;
+
+import java.io.File;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Comparator;
+import java.util.stream.Stream;
+
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.persistence.Util;
+
+/**
+ * Util methods for backup storage
+ */
+public class BackupStorageUtil {
+  public static final String TMP_FILE_PREFIX = "TMP_";
+  private static final File[] NO_FILE = new File[0];
+
+  /**
+   * Parse the prefix from a file name, also works for temporary file names in 
backup storage
+   * @param fileName The file name to be parsed
+   * @return "log" for ZK transaction log files or "snapshot" for ZK snapshots
+   */
+  public static String getFileTypePrefix(String fileName) {
+    String backupFileName = fileName;
+
+    //Remove the temporary file name prefix in order to determine file type
+    if (fileName.startsWith(TMP_FILE_PREFIX)) {
+      backupFileName = fileName.substring(TMP_FILE_PREFIX.length());
+    }
+
+    String fileTypePrefix;
+    if (backupFileName.startsWith(Util.SNAP_PREFIX)) {
+      fileTypePrefix = Util.SNAP_PREFIX;
+    } else if (backupFileName.startsWith(Util.TXLOG_PREFIX)) {
+      fileTypePrefix = Util.TXLOG_PREFIX;
+    } else {
+      throw new BackupException("No matching base file type found for file " + 
fileName);
+    }
+
+    return fileTypePrefix;
+  }
+
+  /**
+   * Construct the path of a backup file in the backup storage
+   * @param fileName The name of the file
+   * @param parentDir The path to the parent directory of the backup file.
+   * @return The path of the backup file in the format of:
+   * 1. parentDir path is not supplied: {fileName} or {fileName}
+   * 2. parentDir path is provided: {parentDir}/{fileName} or 
{parentDir}/{fileName}
+   */
+  public static String constructBackupFilePath(String fileName, String 
parentDir) {
+    //TODO: store snapshots and Txlogs in different subfolders for better 
organization
+    if (parentDir != null) {
+      return String.valueOf(Paths.get(parentDir, fileName));
+    }
+    return fileName;
+  }
+
+  /**
+   * Construct temporary file name using backup file name
+   * @param fileName A backup file name: log.lowzxid-highzxid, 
snapshot.lowzxid-highzxid
+   * @return A temporary backup file name: TMP_log.lowzxid-highzxid, 
TMP_snapshot.lowzxid-highzxid
+   */
+  public static String constructTempFileName(String fileName) {
+    return TMP_FILE_PREFIX + fileName;
+  }
+
+  /**
+   * A basic method for streaming data from an input stream to an output stream
+   * @param inputStream The stream to read from
+   * @param outputStream The stream to write to
+   * @throws IOException
+   */
+  public static void streamData(InputStream inputStream, OutputStream 
outputStream)
+      throws IOException {
+    byte[] buffer = new byte[1024];
+    int lengthRead;
+    while ((lengthRead = inputStream.read(buffer)) > 0) {
+      outputStream.write(buffer, 0, lengthRead);
+      outputStream.flush();
+    }
+  }
+
+  /**
+   * Create a new file in a specified path, create the parent directories if 
they do not exist.
+   * @param file The path to create the file.
+   * @param overwriteIfExist If a file already exists in the location,
+   *                         1. true: delete the existing file and retry the 
creation of the new file,
+   *                         or 2. false: keep the existing file.
+   * @throws IOException
+   */
+  public static void createFile(File file, boolean overwriteIfExist) throws 
IOException {
+    file.getParentFile().mkdirs();
+    if (!file.getParentFile().exists()) {
+      throw new BackupException("Failed to create parent directories for file 
" + file.getName());
+    }
+
+    boolean retry = true;
+    while (retry) {
+      retry = overwriteIfExist;
+      if (!file.createNewFile()) {
+        if (file.exists()) {
+          if (retry && !file.delete()) {
+            throw new BackupException("A file with the file path " + 
file.getPath()
+                + " already exists, and failed to be overwritten.");
+          }
+        } else {
+          throw new BackupException("Failed to create a file at path: " + 
file.getPath());
+        }
+      }
+      retry = false;
+    }
+  }
+
+  /**
+   * Get a list of all files whose file name starts with a certain prefix 
under a directory
+   * @param directory The directory to search for the files
+   * @param prefix The prefix of file name
+   * @return
+   */
+  public static File[] getFilesWithPrefix(File directory, String prefix) {
+    if (directory == null) {
+      return NO_FILE;
+    }
+    FilenameFilter fileFilter = (dir, name) -> name.startsWith(prefix);
+    File[] files = directory.listFiles(fileFilter);
+    return files == null ? NO_FILE : files;
+  }
+
+  /**
+   * Delete all the files whose file names starts with temporary file name 
prefix
+   * @param directory The directory to search for temporary files
+   * @throws IOException
+   */
+  public static void cleanUpTempFiles(File directory) throws IOException {
+    File[] tempFiles = getFilesWithPrefix(directory, TMP_FILE_PREFIX);
+    for (File tempFile : tempFiles) {
+      Files.delete(Paths.get(tempFile.getPath()));
+    }
+  }
+
+  /**
+   * Delete a directory and all files inside it
+   * @param directory The path to the directory
+   * @throws IOException
+   */
+  public static void deleteDirectoryRecursively(File directory) throws 
IOException {
+    Stream<Path> files = Files.walk(Paths.get(directory.getPath()));

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API 
(java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;)
 reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621027&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621027&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621027&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621027&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621027&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      return new ArrayList<>();
+    }
+
+    File[] files = BackupStorageUtil.getFilesWithPrefix(backupDir, prefix);
+
+    // Read the file info and add to the list. If an exception is thrown, the 
entire operation will fail
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    for (File file : files) {
+      backupFileInfos.add(getBackupFileInfo(file));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      throw new BackupException(
+          "Backup directory " + filePath + " does not exist, could not get 
directory list.");
+    }
+
+    // Filter out all the files which are directories
+    FilenameFilter fileFilter = (dir, name) -> new File(dir, 
name).isDirectory();
+    File[] dirs = backupDir.listFiles(fileFilter);
+
+    if (dirs == null) {
+      return new ArrayList<>();
+    }
+    return Arrays.asList(dirs);
+  }
+
+  @Override
+  public InputStream open(File path) throws IOException {
+    if (!path.exists() || path.isDirectory()) {
+      throw new BackupException("The file with the file path " + path
+          + " does not exist or is a directory, could not open the file.");
+    }
+    return new FileInputStream(path);
+  }
+
+  @Override
+  public void copyToBackupStorage(File srcFile, File destName) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+    String backupTempFilePath;
+    File backupTempFile;
+
+    sharedLock.lock();
+    try {
+      inputStream = open(srcFile);
+
+      backupTempFilePath = BackupStorageUtil
+          
.constructBackupFilePath(BackupStorageUtil.constructTempFileName(destName.getName()),
+              fileRootPath);
+      backupTempFile = new File(backupTempFilePath);
+
+      BackupStorageUtil.createFile(backupTempFile, true);
+      outputStream = new FileOutputStream(backupTempFile);
+
+      BackupStorageUtil.streamData(inputStream, outputStream);
+
+      Files.move(Paths.get(backupTempFilePath),
+          
Paths.get(BackupStorageUtil.constructBackupFilePath(destName.getName(), 
fileRootPath)),

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API 
(java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;)
 reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226620992&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620992&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620992&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226620992&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226620992&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      return new ArrayList<>();
+    }
+
+    File[] files = BackupStorageUtil.getFilesWithPrefix(backupDir, prefix);
+
+    // Read the file info and add to the list. If an exception is thrown, the 
entire operation will fail
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    for (File file : files) {
+      backupFileInfos.add(getBackupFileInfo(file));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      throw new BackupException(
+          "Backup directory " + filePath + " does not exist, could not get 
directory list.");
+    }
+
+    // Filter out all the files which are directories
+    FilenameFilter fileFilter = (dir, name) -> new File(dir, 
name).isDirectory();

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/io/File;Ljava/lang/String;)V) reads a 
file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621000&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621000&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621000&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621000&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621000&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/RestoreFromBackupTool.java:
##########
@@ -0,0 +1,716 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.InvocationTargetException;
+import java.nio.file.Files;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+import org.apache.commons.cli.CommandLine;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.cli.RestoreCommand;
+import org.apache.zookeeper.common.ConfigException;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.backup.timetable.TimetableUtil;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *  This is a tool to restore, from backup storage, a snapshot and set of 
transaction log files
+ *  that combine to represent a transactionally consistent image of a 
ZooKeeper ensemble at
+ *  some zxid.
+ */
+public class RestoreFromBackupTool {
+  private static final Logger LOG = 
LoggerFactory.getLogger(RestoreFromBackupTool.class);
+
+  private static final int MAX_RETRIES = 10;
+  private static final String HEX_PREFIX = "0x";
+  private static final int CONNECTION_TIMEOUT = 300000;
+
+  BackupStorageProvider storage;
+  FileTxnSnapLog snapLog;
+  long zxidToRestore;
+  boolean dryRun;
+  File restoreTempDir;
+  boolean overwrite = false;
+
+  // Spot restoration
+  boolean isSpotRestoration = false;
+  String znodePathToRestore;
+  String zkServerConnectionStr;
+  boolean restoreRecursively = false;
+  ZooKeeper zk;
+  SpotRestorationTool spotRestorationTool;
+
+  List<BackupFileInfo> logs;
+  List<BackupFileInfo> snaps;
+  List<BackupFileInfo> filesToCopy;
+
+  int mostRecentLogNeededIndex;
+  int snapNeededIndex;
+  int oldestLogNeededIndex;
+
+  public enum BackupStorageOption {
+    
GPFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage"),
+    
NFS("org.apache.zookeeper.server.backup.storage.impl.FileSystemBackupStorage");
+
+    private String storageProviderClassName;
+
+    BackupStorageOption(String className) {
+      this.storageProviderClassName = className;
+    }
+
+    public String getStorageProviderClassName() {
+      return storageProviderClassName;
+    }
+  }
+
+  /**
+   * Default constructor; requires using parseArgs to setup state.
+   */
+  public RestoreFromBackupTool() {
+    this(null, null, -1L, false, null);
+  }
+
+  /**
+   * Constructor
+   * @param storageProvider the backup provider from which to restore the 
backups
+   * @param snapLog the snap and log provider to use
+   * @param zxidToRestore the zxid upto which to restore, or Long.MAX to 
restore to the latest
+   *                      available transactionally consistent zxid.
+   * @param dryRun whether this is a dryrun in which case no files are 
actually copied
+   * @param restoreTempDir a temporary, local (not remote) directory to stage 
the backup files from
+   *                       remote backup storage for the processing stage of 
restoration. Note that
+   *                       this directory and the files in it will be removed 
after restoration.
+   */
+  RestoreFromBackupTool(BackupStorageProvider storageProvider, FileTxnSnapLog 
snapLog,
+      long zxidToRestore, boolean dryRun, File restoreTempDir) {
+
+    filesToCopy = new ArrayList<>();
+    snapNeededIndex = -1;
+    mostRecentLogNeededIndex = -1;
+    oldestLogNeededIndex = -1;
+
+    this.storage = storageProvider;
+    this.zxidToRestore = zxidToRestore;
+    this.snapLog = snapLog;
+    this.dryRun = dryRun;
+    this.restoreTempDir = restoreTempDir;
+  }
+
+  /**
+   * Parse and validate arguments to the tool
+   * @param cl the command line object with user inputs
+   * @return true if the arguments parse correctly; false in all other cases.
+   * @throws IOException if the backup provider cannot be instantiated 
correctly.
+   */
+  public void parseArgs(CommandLine cl) {
+    String backupStoragePath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.BACKUP_STORE);
+    createBackupStorageProvider(backupStoragePath);
+
+    // Read the restore point
+    if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_ZXID)) {
+      parseRestoreZxid(cl);
+    } else if (cl.hasOption(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP)) 
{
+      parseRestoreTimestamp(cl, backupStoragePath);
+    }
+
+    parseAndValidateSpotRestorationArgs(cl);
+
+    parseAndValidateOfflineRestoreDestination(cl);
+
+    parseRestoreTempDir(cl);
+
+    // Check if overwriting the destination directories is allowed
+    if (cl.hasOption(RestoreCommand.OptionShortForm.OVERWRITE)) {
+      overwrite = true;
+    }
+
+    // Check if this is a dry-run
+    if (cl.hasOption(RestoreCommand.OptionShortForm.DRY_RUN)) {
+      dryRun = true;
+    }
+
+    System.out.println("parseArgs successful.");
+  }
+
+  private void createBackupStorageProvider(String backupStoragePath) {
+    String[] backupStorageParams = backupStoragePath.split(":");
+    if (backupStorageParams.length != 4) {
+      System.err.println(
+          "Failed to parse backup storage connection information from the 
backup storage path provided, please check the input.");
+      System.err.println(
+          "For example: the format for a gpfs backup storage path should be 
\"gpfs:<config_path>:<backup_path>:<namespace>\".");
+      System.exit(1);
+    }
+
+    String userProvidedStorageName = backupStorageParams[0].toUpperCase();
+    try {
+      BackupStorageOption storageOption = 
BackupStorageOption.valueOf(userProvidedStorageName);
+      String backupStorageProviderClassName = 
storageOption.getStorageProviderClassName();
+
+      BackupConfig.RestorationConfigBuilder configBuilder =
+          new BackupConfig.RestorationConfigBuilder()
+              .setStorageProviderClassName(backupStorageProviderClassName)
+              
.setBackupStoragePath(backupStorageParams[2]).setNamespace(backupStorageParams[3]);
+      if (!backupStorageParams[1].isEmpty()) {
+        configBuilder = configBuilder.setStorageConfig(new 
File(backupStorageParams[1]));
+      }
+      storage = 
BackupUtil.createStorageProviderImpl(configBuilder.build().get());
+    } catch (IllegalArgumentException e) {
+      System.err.println("Could not find a valid backup storage option based 
on the input: "
+          + userProvidedStorageName + ". Error message: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    } catch (ConfigException e) {
+      System.err.println(
+          "Could not generate a backup config based on the input, error 
message: " + e
+              .getMessage());
+      e.getStackTrace();
+      System.exit(1);
+    } catch (InstantiationException | InvocationTargetException | 
NoSuchMethodException | IllegalAccessException | ClassNotFoundException e) {
+      System.err.println(
+          "Could not generate a backup storage provider based on the input, 
error message: " + e
+              .getMessage());
+      e.printStackTrace();
+      System.exit(1);
+    }
+  }
+
+  private void parseRestoreZxid(CommandLine cl) {
+    String zxidToRestoreStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_ZXID);
+    if (zxidToRestoreStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+      zxidToRestore = Long.MAX_VALUE;
+    } else {
+      int base = 10;
+      String numStr = zxidToRestoreStr;
+
+      if (zxidToRestoreStr.startsWith(HEX_PREFIX)) {
+        numStr = zxidToRestoreStr.substring(2);
+        base = 16;
+      }
+      try {
+        zxidToRestore = Long.parseLong(numStr, base);
+      } catch (NumberFormatException nfe) {
+        System.err
+            .println("Invalid number specified for restore zxid point, the 
input is: " + numStr);
+        System.exit(2);
+      }
+    }
+  }
+
+  private void parseRestoreTimestamp(CommandLine cl, String backupStoragePath) 
{
+    String timestampStr = 
cl.getOptionValue(RestoreCommand.OptionShortForm.RESTORE_TIMESTAMP);
+    String timetableStoragePath = backupStoragePath;
+    if (cl.hasOption(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH)) {
+      timetableStoragePath =
+          
cl.getOptionValue(RestoreCommand.OptionShortForm.TIMETABLE_STORAGE_PATH);
+    }
+    File[] timetableFiles = new File(timetableStoragePath)
+        .listFiles(file -> 
file.getName().startsWith(TimetableBackup.TIMETABLE_PREFIX));
+    if (timetableFiles == null || timetableFiles.length == 0) {
+      System.err.println("Could not find timetable files at the path: " + 
timetableStoragePath);
+      System.exit(2);
+    }
+    Map.Entry<Long, String> restorePoint;
+    String message;
+    try {
+      restorePoint = TimetableUtil.findLastZxidFromTimestamp(timetableFiles, 
timestampStr);
+      zxidToRestore = Long.parseLong(restorePoint.getValue(), 16);
+      String timeToRestore = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss")
+          .format(new java.util.Date(restorePoint.getKey()));
+      if (timestampStr.equalsIgnoreCase(BackupUtil.LATEST)) {
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to latest.";
+      } else {
+        String requestedTimeToRestore = new SimpleDateFormat("MM/dd/yyyy 
HH:mm:ss")
+            .format(new java.util.Date(Long.decode(timestampStr)));
+        message = "Restoring to " + timeToRestore + ", original request was to 
restore to "
+            + requestedTimeToRestore + ".";
+      }
+      System.out.println(message);
+      LOG.info(message);
+    } catch (IllegalArgumentException | BackupException e) {
+      System.err.println(
+          "Could not find a valid zxid from timetable using the timestamp 
provided: " + timestampStr
+              + ". The error message is: " + e.getMessage());
+      e.printStackTrace();
+      System.exit(2);
+    }
+  }
+
+  private void parseAndValidateOfflineRestoreDestination(CommandLine cl) {
+    if (isSpotRestoration) {
+      return;
+    }
+    // Read restore destination: dataDir and logDir
+    try {
+      String snapDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.SNAP_DESTINATION);
+      String logDirPath = 
cl.getOptionValue(RestoreCommand.OptionShortForm.LOG_DESTINATION);
+
+      if (snapDirPath == null || logDirPath == null) {
+        throw new BackupException(
+            "Snap destination path and log destination path are not defined 
for offline restoration. SnapDirPath: "
+                + snapDirPath + ", logDirPath: " + logDirPath);
+      }
+
+      File snapDir = new File(snapDirPath);
+      File logDir = new File(logDirPath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621006&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621006&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621006&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621006&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621006&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      return new ArrayList<>();
+    }
+
+    File[] files = BackupStorageUtil.getFilesWithPrefix(backupDir, prefix);
+
+    // Read the file info and add to the list. If an exception is thrown, the 
entire operation will fail
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    for (File file : files) {
+      backupFileInfos.add(getBackupFileInfo(file));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      throw new BackupException(
+          "Backup directory " + filePath + " does not exist, could not get 
directory list.");
+    }
+
+    // Filter out all the files which are directories
+    FilenameFilter fileFilter = (dir, name) -> new File(dir, 
name).isDirectory();
+    File[] dirs = backupDir.listFiles(fileFilter);
+
+    if (dirs == null) {
+      return new ArrayList<>();
+    }
+    return Arrays.asList(dirs);
+  }
+
+  @Override
+  public InputStream open(File path) throws IOException {
+    if (!path.exists() || path.isDirectory()) {
+      throw new BackupException("The file with the file path " + path
+          + " does not exist or is a directory, could not open the file.");
+    }
+    return new FileInputStream(path);
+  }
+
+  @Override
+  public void copyToBackupStorage(File srcFile, File destName) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+    String backupTempFilePath;
+    File backupTempFile;
+
+    sharedLock.lock();
+    try {
+      inputStream = open(srcFile);
+
+      backupTempFilePath = BackupStorageUtil
+          
.constructBackupFilePath(BackupStorageUtil.constructTempFileName(destName.getName()),
+              fileRootPath);
+      backupTempFile = new File(backupTempFilePath);
+
+      BackupStorageUtil.createFile(backupTempFile, true);
+      outputStream = new FileOutputStream(backupTempFile);
+
+      BackupStorageUtil.streamData(inputStream, outputStream);
+
+      Files.move(Paths.get(backupTempFilePath),
+          
Paths.get(BackupStorageUtil.constructBackupFilePath(destName.getName(), 
fileRootPath)),
+          StandardCopyOption.REPLACE_EXISTING);
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+      if (outputStream != null) {
+        outputStream.close();
+      }
+      sharedLock.unlock();
+    }
+  }
+
+  @Override
+  public void copyToLocalStorage(File srcName, File destFile) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+
+    // Create input stream from the source file in backup storage
+    String backupFilePath =
+        BackupStorageUtil.constructBackupFilePath(srcName.getName(), 
fileRootPath);
+    File backupFile = new File(backupFilePath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621018&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621018&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621018&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621018&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621018&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API 
(java/nio/file/Paths.get(Ljava/lang/String;[Ljava/lang/String;)Ljava/nio/file/Path;)
 reads a file whose location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621042&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621042&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621042&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621042&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621042&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);

Review Comment:
   
*[PATH_TRAVERSAL_IN](https://find-sec-bugs.github.io/bugs.htm#PATH_TRAVERSAL_IN):*
  This API (java/io/File.<init>(Ljava/lang/String;)V) reads a file whose 
location might be specified by user input
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621035&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621035&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621035&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621035&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621035&lift_comment_rating=5)
 ]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@zookeeper.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to