sonatype-lift[bot] commented on code in PR #1883:
URL: https://github.com/apache/zookeeper/pull/1883#discussion_r878546062


##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/storage/impl/FileSystemBackupStorage.java:
##########
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.storage.impl;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.FilenameFilter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.nio.file.StandardCopyOption;
+import java.nio.file.attribute.BasicFileAttributes;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import org.apache.zookeeper.server.backup.BackupConfig;
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.storage.BackupStorageUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation for backup storage provider for file systems that store 
files in a tree (hierarchical) structure
+ * To use this class for different file systems, use appropriate address for 
backupStoragePath in BackupConfig
+ * For example:
+ * 1. hard disk drive & solid-state drive: /mountPoint/relativePathToMountPoint
+ * 2. NFS: /nfsClientMountPoint/relativePathToMountPoint
+ * 3. local disk: an absolute path to a directory
+ */
+public class FileSystemBackupStorage implements BackupStorageProvider {
+  private static final Logger LOG = 
LoggerFactory.getLogger(FileSystemBackupStorage.class);
+  private final String fileRootPath;
+  private final ReadWriteLock rwLock;
+  private final Lock sharedLock;
+  private final Lock exclusiveLock;
+
+  /**
+   * Constructor using BackupConfig to get backup storage info
+   * @param backupConfig The information and settings about backup storage, to 
be set as a part of ZooKeeper server config
+   */
+  public FileSystemBackupStorage(BackupConfig backupConfig) {
+    if (!new File(backupConfig.getBackupStoragePath()).exists()) {
+      throw new BackupException(
+          "The backup storage is not ready, please check the path: " + 
backupConfig
+              .getBackupStoragePath());
+    }
+    fileRootPath = String
+        .join(File.separator, backupConfig.getBackupStoragePath(), 
backupConfig.getNamespace());
+    rwLock = new ReentrantReadWriteLock();
+    sharedLock = rwLock.readLock();
+    exclusiveLock = rwLock.writeLock();
+  }
+
+  @Override
+  public BackupFileInfo getBackupFileInfo(File file) throws IOException {
+    String backupFilePath = 
BackupStorageUtil.constructBackupFilePath(file.getName(), fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    if (!backupFile.exists()) {
+      return new BackupFileInfo(backupFile, BackupFileInfo.NOT_SET, 
BackupFileInfo.NOT_SET);
+    }
+
+    BasicFileAttributes fileAttributes =
+        Files.readAttributes(Paths.get(backupFilePath), 
BasicFileAttributes.class);
+    return new BackupFileInfo(backupFile, 
fileAttributes.lastModifiedTime().toMillis(),
+        fileAttributes.size());
+  }
+
+  @Override
+  public List<BackupFileInfo> getBackupFileInfos(File path, String prefix) 
throws IOException {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = Paths.get(fileRootPath, filePath).toString();
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      return new ArrayList<>();
+    }
+
+    File[] files = BackupStorageUtil.getFilesWithPrefix(backupDir, prefix);
+
+    // Read the file info and add to the list. If an exception is thrown, the 
entire operation will fail
+    List<BackupFileInfo> backupFileInfos = new ArrayList<>();
+    for (File file : files) {
+      backupFileInfos.add(getBackupFileInfo(file));
+    }
+    return backupFileInfos;
+  }
+
+  @Override
+  public List<File> getDirectories(File path) {
+    String filePath = path == null ? "" : path.getPath();
+    String backupDirPath = BackupStorageUtil.constructBackupFilePath(filePath, 
fileRootPath);
+    File backupDir = new File(backupDirPath);
+
+    if (!backupDir.exists()) {
+      throw new BackupException(
+          "Backup directory " + filePath + " does not exist, could not get 
directory list.");
+    }
+
+    // Filter out all the files which are directories
+    FilenameFilter fileFilter = (dir, name) -> new File(dir, 
name).isDirectory();
+    File[] dirs = backupDir.listFiles(fileFilter);
+
+    if (dirs == null) {
+      return new ArrayList<>();
+    }
+    return Arrays.asList(dirs);
+  }
+
+  @Override
+  public InputStream open(File path) throws IOException {
+    if (!path.exists() || path.isDirectory()) {
+      throw new BackupException("The file with the file path " + path
+          + " does not exist or is a directory, could not open the file.");
+    }
+    return new FileInputStream(path);
+  }
+
+  @Override
+  public void copyToBackupStorage(File srcFile, File destName) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+    String backupTempFilePath;
+    File backupTempFile;
+
+    sharedLock.lock();
+    try {
+      inputStream = open(srcFile);
+
+      backupTempFilePath = BackupStorageUtil
+          
.constructBackupFilePath(BackupStorageUtil.constructTempFileName(destName.getName()),
+              fileRootPath);
+      backupTempFile = new File(backupTempFilePath);
+
+      BackupStorageUtil.createFile(backupTempFile, true);
+      outputStream = new FileOutputStream(backupTempFile);
+
+      BackupStorageUtil.streamData(inputStream, outputStream);
+
+      Files.move(Paths.get(backupTempFilePath),
+          
Paths.get(BackupStorageUtil.constructBackupFilePath(destName.getName(), 
fileRootPath)),
+          StandardCopyOption.REPLACE_EXISTING);
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+      if (outputStream != null) {
+        outputStream.close();
+      }
+      sharedLock.unlock();
+    }
+  }
+
+  @Override
+  public void copyToLocalStorage(File srcName, File destFile) throws 
IOException {
+    InputStream inputStream = null;
+    OutputStream outputStream = null;
+
+    // Create input stream from the source file in backup storage
+    String backupFilePath =
+        BackupStorageUtil.constructBackupFilePath(srcName.getName(), 
fileRootPath);
+    File backupFile = new File(backupFilePath);
+
+    try {
+      inputStream = open(backupFile);
+
+      BackupStorageUtil.createFile(destFile, true);
+      outputStream = new FileOutputStream(destFile);
+
+      BackupStorageUtil.streamData(inputStream, outputStream);
+    } finally {
+      if (inputStream != null) {
+        inputStream.close();
+      }
+      if (outputStream != null) {
+        outputStream.close();
+      }
+    }
+  }

Review Comment:
   *RESOURCE_LEAK:*  resource of type `java.io.FileOutputStream` acquired by 
call to `FileOutputStream(...)` at line 193 is not released after line 204.
   **Note**: potential exception at line 198
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621613&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621613&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621613&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621613&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621613&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/timetable/TimetableBackup.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.timetable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.BackupManager;
+import org.apache.zookeeper.server.backup.BackupPoint;
+import org.apache.zookeeper.server.backup.BackupProcess;
+import org.apache.zookeeper.server.backup.BackupStatus;
+import org.apache.zookeeper.server.backup.BackupUtil;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements Timetable-specific logic for BackupProcess.
+ *
+ * Backup timetable encodes data of the format <timestamp>:<zxid>. This is 
used to locate the
+ * closest zxid backup point given the timestamp. The use case is for users 
who wish to restore
+ * from backup at a specific time recorded in the backup timetable.
+ */
+public class TimetableBackup extends BackupProcess {
+  public static final String TIMETABLE_PREFIX = "timetable";
+  // Use an ordered map of <timestamp>:<zxid>
+  private final TreeMap<Long, String> timetableRecordMap = new TreeMap<>();
+  private final File tmpDir;
+  // Lock is used to keep access to timetableRecordMap exclusive
+  private final Lock lock = new ReentrantLock(true);
+
+  // Candidate files to be backed up each iteration sorted by name (using 
SortedSet)
+  private final SortedSet<BackupManager.BackupFile> 
candidateTimetableBackupFiles =
+      new TreeSet<>(Comparator.comparing(o -> o.getFile().getName()));
+  // BackupStatus is used here to keep track of timetable backup status in 
case of a crash/restart
+  // BackupStatus is written to a file. After a crash (e.g. JVM crash) or a 
restart, the
+  // locally-stored zkBackupStatus file will be read back to restore a 
BackupPoint
+  private final BackupStatus backupStatus;
+  private final BackupPoint backupPoint;
+  private final TimetableBackupStats backupStats; // Metrics
+
+  /**
+   * Create an instance of TimetableBackup.
+   * @param snapLog
+   * @param tmpDir
+   * @param backupStorageProvider
+   * @param backupIntervalInMilliseconds
+   * @param timetableBackupIntervalInMs
+   * @param backupStatus
+   */
+  public TimetableBackup(FileTxnSnapLog snapLog, File tmpDir,
+      BackupStorageProvider backupStorageProvider, long 
backupIntervalInMilliseconds,
+      long timetableBackupIntervalInMs, BackupStatus backupStatus, BackupPoint 
backupPoint,
+      TimetableBackupStats backupStats) {
+    super(LoggerFactory.getLogger(TimetableBackup.class), 
backupStorageProvider,
+        backupIntervalInMilliseconds);
+    this.tmpDir = tmpDir;
+    this.backupStatus = backupStatus;
+    this.backupPoint = backupPoint;
+    this.backupStats = backupStats;
+    // Start creating records
+    (new Thread(new TimetableRecorder(snapLog, 
timetableBackupIntervalInMs))).start();
+    logger.info("TimetableBackup::Starting TimetableBackup Process with backup 
interval: "
+        + backupIntervalInMilliseconds + " ms and timetable backup interval: "
+        + timetableBackupIntervalInMs + " ms.");
+  }
+
+  @Override
+  protected void initialize() throws IOException {
+    // Get the latest timetable backup file from backup storage
+    BackupFileInfo latest = BackupUtil.getLatest(backupStorage, 
BackupUtil.BackupFileType.TIMETABLE,
+        BackupUtil.IntervalEndpoint.END);
+
+    long latestTimestampBackedUp = latest == null ? 
BackupUtil.INVALID_TIMESTAMP
+        : latest.getIntervalEndpoint(BackupUtil.IntervalEndpoint.END);
+
+    logger.info(
+        "TimetableBackup::initialize(): latest timestamp from storage: {}, 
from BackupStatus: {}",
+        latestTimestampBackedUp, backupPoint.getTimestamp());
+
+    if (latestTimestampBackedUp != backupPoint.getTimestamp()) {
+      synchronized (backupStatus) {
+        backupPoint.setTimestamp(latestTimestampBackedUp);
+        backupStatus.update(backupPoint);
+      }
+    }
+  }
+
+  @Override
+  protected void startIteration() throws IOException {
+    backupStats.setTimetableBackupIterationStart();
+
+    // It's possible that the last iteration failed and left some tmp backup 
files behind - add
+    // them back to candidate backup files so the I/O write will happen again
+    getAllTmpBackupFiles();
+
+    // Create a timetable backup file from the cached records 
(timetableRecordMap)
+    lock.lock();
+    try {
+      if (!timetableRecordMap.isEmpty()) {
+        // Create a temp timetable backup file
+        // Make the backup file temporary so that it will be deleted after the 
iteration provided
+        // the I/O write to the backup storage succeeds. Otherwise, this 
temporary backup file will
+        // continue to exist in tmpDir locally until it's successfully written 
to the backup storage
+        BackupManager.BackupFile timetableBackupFromThisIteration =
+            new BackupManager.BackupFile(createTimetableBackupFile(), true, 0, 
0);
+        candidateTimetableBackupFiles.add(timetableBackupFromThisIteration);
+        timetableRecordMap.clear(); // Clear the map
+      }
+    } catch (Exception e) {
+      logger.error("TimetableBackup::startIteration(): failed to create 
timetable file to back up!",
+          e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  protected void endIteration(boolean errorFree) {
+    // timetableRecordMap is cleared in startIteration() already, so we do not 
clear it here
+    backupStats.setTimetableBackupIterationDone(errorFree);
+  }
+
+  @Override
+  protected BackupManager.BackupFile getNextFileToBackup() throws IOException {
+    BackupManager.BackupFile nextFile = null;
+    if (!candidateTimetableBackupFiles.isEmpty()) {

Review Comment:
   *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method 
`TimetableBackup.getNextFileToBackup()` reads without synchronization from 
container `this.candidateTimetableBackupFiles` via call to 
`SortedSet.isEmpty()`. Potentially races with write in method 
`TimetableBackup.backupComplete(...)`.
    Reporting because another access to the same memory occurs on a background 
thread, although this access may not.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621617&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621617&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621617&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621617&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621617&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.BackupBean;
+import org.apache.zookeeper.server.backup.monitoring.BackupStats;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupBean;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.persistence.ZxidRange;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the backing up of txnlog and snap files to remote
+ * storage for longer term and durable retention than is possible on
+ * an ensemble server
+ */
+public class BackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupManager.class);
+
+  private final Logger logger;
+  private final File snapDir;
+  private final File dataLogDir;
+  private final File tmpDir;
+  private final BackupConfig backupConfig;
+  private final long backupIntervalInMilliseconds;
+  private BackupProcess logBackup = null;
+  private BackupProcess snapBackup = null;
+
+  // backupStatus, backupLogZxid and backedupSnapZxid need to be access while 
synchronized
+  // on backupStatus.
+  private final BackupStatus backupStatus;
+  private BackupPoint backupPoint;
+
+  private final BackupStorageProvider backupStorage;
+  private final long serverId;
+  private final String namespace;
+
+  @VisibleForTesting
+  protected BackupBean backupBean = null;
+  protected TimetableBackupBean timetableBackupBean = null;
+
+  private BackupStats backupStats = null;
+
+  // Optional timetable backup
+  private BackupProcess timetableBackup = null;
+
+  /**
+   * Tracks a file that needs to be backed up, including temporary copies of 
the file
+   */
+  public static class BackupFile {
+    private final File file;
+    private final boolean isTemporary;
+    private final ZxidRange zxidRange;
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param fileMinZxid the min zxid associated with this file
+     * @param fileMaxZxid the max zxid associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, long 
fileMinZxid, long fileMaxZxid) {
+      this(backupFile, isTemporaryFile, new ZxidRange(fileMinZxid, 
fileMaxZxid));
+    }
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param zxidRange the zxid range associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, ZxidRange 
zxidRange) {
+      Preconditions.checkNotNull(zxidRange);
+
+      if (!zxidRange.isHighPresent()) {
+        throw new IllegalArgumentException("ZxidRange must have a high value");
+      }
+
+      this.file = backupFile;
+      this.isTemporary = isTemporaryFile;
+      this.zxidRange = zxidRange;
+    }
+
+    /**
+     * Perform cleanup including deleting temporary files.
+     */
+    public void cleanup() {
+      if (isTemporary && exists()) {
+        file.delete();
+      }
+    }
+
+    /**
+     * Whether the file representing the zxids exists
+     * @return whether the file represented exists
+     */
+    public boolean exists() {
+      return file != null && file.exists();
+    }
+
+    /**
+     * Get the current file (topmost on the stack)
+     * @return the current file
+     */
+    public File getFile() { return file; }
+
+    /**
+     * Get the zxid range associated with this file
+     * @return the zxid range
+     */
+    public ZxidRange getZxidRange() {
+      return zxidRange;
+    }
+
+    /**
+     * Get the min zxid associated with this file
+     * @return the min associated zxid
+     */
+    public long getMinZxid() { return zxidRange.getLow(); }
+
+    /**
+     * Get the max zxid associated with this file
+     * @return the max associated zxid
+     */
+    public long getMaxZxid() { return zxidRange.getHigh(); }
+
+    @Override
+    public String toString() {
+      return String.format("%s : %s : %d - %d",
+          file == null ? "[empty]" : file.getPath(),
+          isTemporary ? "temp" : "perm",
+          zxidRange.getLow(),
+          zxidRange.getHigh());
+    }
+  }
+
+  /**
+   * Implements txnlog specific logic for BackupProcess
+   */
+  protected class TxnLogBackup extends BackupProcess {
+    private long iterationEndPoint;
+    private final FileTxnSnapLog snapLog;
+
+    /**
+     * Constructor for TxnLogBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public TxnLogBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(TxnLogBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.TXNLOG, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_LOG_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.END);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+
+      if (rZxid != backupPoint.getLogZxid()) {
+        synchronized (backupStatus) {
+          backupPoint.setLogZxid(rZxid);
+          backupStatus.update(backupPoint);
+        }
+      }
+    }
+
+    protected void startIteration() {
+      // Store the current last logged zxid.  This becomes the stopping point
+      // for the current iteration so we don't keep chasing our own tail as
+      // new transactions get written.
+      iterationEndPoint = snapLog.getLastLoggedZxid();
+      backupStats.setTxnLogBackupIterationStart();
+    }
+
+    protected void endIteration(boolean errorFree) {
+      backupStats.setTxnLogBackupIterationDone(errorFree);
+      iterationEndPoint = 0L;
+    }
+
+    /**
+     * Gets the next txnlog file to backup.  This is a temporary file created 
by copying
+     * all transaction from the previous backup point until the end zxid for 
this iteration, or
+     * a file indicating that some log records were lost.
+     * @return the file that needs to be backed-up.  The minZxid is the first
+     *      zxid contained in the file.  The maxZxid is the last zxid that is 
contained in the
+     *      file.
+     * @throws IOException
+     */
+    protected BackupFile getNextFileToBackup() throws IOException {
+      long startingZxid = backupStatus.read().getLogZxid() + 1;
+
+      // Don't keep chasing the tail so stop if past the last zxid at the time
+      // this iteration started.
+      if (startingZxid > iterationEndPoint) {
+        return null;
+      }
+
+      TxnLog.TxnIterator iter = null;
+      FileTxnLog newFile = null;
+      long lastZxid = -1;
+      int txnCopied = 0;
+      BackupFile ret;
+
+      logger.info("Creating backup file from zxid {}.", 
ZxidUtils.zxidToString(startingZxid));
+
+      try {
+        iter = snapLog.readTxnLog(startingZxid, true);
+
+        // Use a temp directory to avoid conflicts with live txnlog files
+        newFile = new FileTxnLog(tmpDir);
+
+        // TODO: Ideally, we should have have a way to prevent lost TxLogs
+        // Check for lost txnlogs; <=1 indicates that no backups have been 
done before so
+        // nothing can be considered lost.
+        // If a lost sequence is found then return a file whose name encodes 
the lost
+        // sequence and back that up so the backup store has a record of the 
lost sequence
+        if (startingZxid > 1 &&
+            iter.getHeader() != null &&
+            iter.getHeader().getZxid() > startingZxid) {
+
+          logger.error("TxnLog backups lost.  Required starting zxid={}  First 
available zxid={}",
+              ZxidUtils.zxidToString(startingZxid),
+              ZxidUtils.zxidToString(iter.getHeader().getZxid()));
+
+          String fileName = String.format("%s.%s",
+              BackupUtil.LOST_LOG_PREFIX,
+              Long.toHexString(startingZxid));
+          File lostZxidFile = new File(tmpDir, fileName);
+          lostZxidFile.createNewFile();
+
+          return new BackupFile(lostZxidFile, true, startingZxid, 
iter.getHeader().getZxid() - 1);
+        }
+
+        while (iter.getHeader() != null) {
+          TxnHeader hdr = iter.getHeader();
+
+          if (hdr.getZxid() > iterationEndPoint) {
+            break;
+          }
+
+          newFile.append(hdr, iter.getTxn());
+
+          // update position and count only AFTER the record has been 
successfully
+          // copied
+          lastZxid = hdr.getZxid();
+          txnCopied++;
+
+          iter.next();
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+
+        if (ret != null) {
+          logger.info("Copied {} records starting at {} and ending at zxid 
{}.",
+              txnCopied,
+              ZxidUtils.zxidToString(ret.getMinZxid()),
+              ZxidUtils.zxidToString(ret.getMaxZxid()));
+        }
+
+      } catch (IOException e) {
+        logger.warn("Hit exception after {} records.  Exception: {} ", 
txnCopied, e.fillInStackTrace());
+
+        // If any records were copied return those and ignore the error.  
Otherwise
+        // rethrow the error to be handled by the caller as a failed backup 
iteration.
+        if (txnCopied <= 0) {
+          throw e;
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+      } finally {
+        if (iter != null) {
+          iter.close();
+        }
+
+        if (newFile != null) {
+          newFile.close();
+        }
+      }
+
+      return ret;
+    }
+
+    private BackupFile makeBackupFileFromCopiedLog(FileTxnLog backupTxnLog, 
long lastZxid) {
+
+      if (backupTxnLog == null) {
+        return null;
+      }
+
+      // The logFile gets initialized with the first transaction's zxid
+      File logFile = backupTxnLog.getCurrentFile();
+
+      if (logFile == null) {
+        return null;
+      }
+
+      long firstZxid = Util.getZxidFromName(logFile.getName(), 
Util.TXLOG_PREFIX);
+
+      if (lastZxid == -1) {
+        lastZxid = firstZxid;
+      }
+
+      return new BackupFile(logFile, true, new ZxidRange(firstZxid, lastZxid));
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      synchronized (backupStatus) {
+        backupPoint.setLogZxid(file.getMaxZxid());
+        backupStatus.update(backupPoint);
+      }
+
+      logger.info("Updated backedup tnxlog zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+    }
+  }
+
+  /**
+   * Implements snapshot specific logic for BackupProcess
+   */
+  protected class SnapBackup extends BackupProcess {
+    private final FileTxnSnapLog snapLog;
+    private final List<BackupFile> filesToBackup = new ArrayList<>();
+
+    /**
+     * Constructor for SnapBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public SnapBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(SnapBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.SNAPSHOT, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_SNAP_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.START);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getSnapZxid()));

Review Comment:
   *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method 
`BackupManager$SnapBackup.initialize()` indirectly reads without 
synchronization from `this.this$0.backupPoint.snapZxid`. Potentially races with 
write in method `BackupManager$SnapBackup.backupComplete(...)`.
    Reporting because this access may occur on a background thread.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621718&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621718&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621718&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621718&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621718&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/timetable/TimetableBackup.java:
##########
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup.timetable;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.util.Comparator;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.zookeeper.server.backup.BackupFileInfo;
+import org.apache.zookeeper.server.backup.BackupManager;
+import org.apache.zookeeper.server.backup.BackupPoint;
+import org.apache.zookeeper.server.backup.BackupProcess;
+import org.apache.zookeeper.server.backup.BackupStatus;
+import org.apache.zookeeper.server.backup.BackupUtil;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implements Timetable-specific logic for BackupProcess.
+ *
+ * Backup timetable encodes data of the format <timestamp>:<zxid>. This is 
used to locate the
+ * closest zxid backup point given the timestamp. The use case is for users 
who wish to restore
+ * from backup at a specific time recorded in the backup timetable.
+ */
+public class TimetableBackup extends BackupProcess {
+  public static final String TIMETABLE_PREFIX = "timetable";
+  // Use an ordered map of <timestamp>:<zxid>
+  private final TreeMap<Long, String> timetableRecordMap = new TreeMap<>();
+  private final File tmpDir;
+  // Lock is used to keep access to timetableRecordMap exclusive
+  private final Lock lock = new ReentrantLock(true);
+
+  // Candidate files to be backed up each iteration sorted by name (using 
SortedSet)
+  private final SortedSet<BackupManager.BackupFile> 
candidateTimetableBackupFiles =
+      new TreeSet<>(Comparator.comparing(o -> o.getFile().getName()));
+  // BackupStatus is used here to keep track of timetable backup status in 
case of a crash/restart
+  // BackupStatus is written to a file. After a crash (e.g. JVM crash) or a 
restart, the
+  // locally-stored zkBackupStatus file will be read back to restore a 
BackupPoint
+  private final BackupStatus backupStatus;
+  private final BackupPoint backupPoint;
+  private final TimetableBackupStats backupStats; // Metrics
+
+  /**
+   * Create an instance of TimetableBackup.
+   * @param snapLog
+   * @param tmpDir
+   * @param backupStorageProvider
+   * @param backupIntervalInMilliseconds
+   * @param timetableBackupIntervalInMs
+   * @param backupStatus
+   */
+  public TimetableBackup(FileTxnSnapLog snapLog, File tmpDir,
+      BackupStorageProvider backupStorageProvider, long 
backupIntervalInMilliseconds,
+      long timetableBackupIntervalInMs, BackupStatus backupStatus, BackupPoint 
backupPoint,
+      TimetableBackupStats backupStats) {
+    super(LoggerFactory.getLogger(TimetableBackup.class), 
backupStorageProvider,
+        backupIntervalInMilliseconds);
+    this.tmpDir = tmpDir;
+    this.backupStatus = backupStatus;
+    this.backupPoint = backupPoint;
+    this.backupStats = backupStats;
+    // Start creating records
+    (new Thread(new TimetableRecorder(snapLog, 
timetableBackupIntervalInMs))).start();
+    logger.info("TimetableBackup::Starting TimetableBackup Process with backup 
interval: "
+        + backupIntervalInMilliseconds + " ms and timetable backup interval: "
+        + timetableBackupIntervalInMs + " ms.");
+  }
+
+  @Override
+  protected void initialize() throws IOException {
+    // Get the latest timetable backup file from backup storage
+    BackupFileInfo latest = BackupUtil.getLatest(backupStorage, 
BackupUtil.BackupFileType.TIMETABLE,
+        BackupUtil.IntervalEndpoint.END);
+
+    long latestTimestampBackedUp = latest == null ? 
BackupUtil.INVALID_TIMESTAMP
+        : latest.getIntervalEndpoint(BackupUtil.IntervalEndpoint.END);
+
+    logger.info(
+        "TimetableBackup::initialize(): latest timestamp from storage: {}, 
from BackupStatus: {}",
+        latestTimestampBackedUp, backupPoint.getTimestamp());
+
+    if (latestTimestampBackedUp != backupPoint.getTimestamp()) {
+      synchronized (backupStatus) {
+        backupPoint.setTimestamp(latestTimestampBackedUp);
+        backupStatus.update(backupPoint);
+      }
+    }
+  }
+
+  @Override
+  protected void startIteration() throws IOException {
+    backupStats.setTimetableBackupIterationStart();
+
+    // It's possible that the last iteration failed and left some tmp backup 
files behind - add
+    // them back to candidate backup files so the I/O write will happen again
+    getAllTmpBackupFiles();
+
+    // Create a timetable backup file from the cached records 
(timetableRecordMap)
+    lock.lock();
+    try {
+      if (!timetableRecordMap.isEmpty()) {
+        // Create a temp timetable backup file
+        // Make the backup file temporary so that it will be deleted after the 
iteration provided
+        // the I/O write to the backup storage succeeds. Otherwise, this 
temporary backup file will
+        // continue to exist in tmpDir locally until it's successfully written 
to the backup storage
+        BackupManager.BackupFile timetableBackupFromThisIteration =
+            new BackupManager.BackupFile(createTimetableBackupFile(), true, 0, 
0);
+        candidateTimetableBackupFiles.add(timetableBackupFromThisIteration);
+        timetableRecordMap.clear(); // Clear the map
+      }
+    } catch (Exception e) {
+      logger.error("TimetableBackup::startIteration(): failed to create 
timetable file to back up!",
+          e);
+    } finally {
+      lock.unlock();
+    }
+  }
+
+  @Override
+  protected void endIteration(boolean errorFree) {
+    // timetableRecordMap is cleared in startIteration() already, so we do not 
clear it here
+    backupStats.setTimetableBackupIterationDone(errorFree);
+  }
+
+  @Override
+  protected BackupManager.BackupFile getNextFileToBackup() throws IOException {
+    BackupManager.BackupFile nextFile = null;
+    if (!candidateTimetableBackupFiles.isEmpty()) {
+      nextFile = candidateTimetableBackupFiles.first();
+    }
+    return nextFile;
+  }
+
+  @Override
+  protected void backupComplete(BackupManager.BackupFile file) throws 
IOException {
+    // Remove from the candidate set because it has been copied to the backup 
storage successfully
+    candidateTimetableBackupFiles.remove(file);
+    synchronized (backupStatus) {
+      // Update the latest timestamp to which the timetable backup was 
successful
+      
backupPoint.setTimestamp(Long.parseLong(file.getFile().getName().split("-")[1]));
+      backupStatus.update(backupPoint);
+    }
+    logger.info(
+        "TimetableBackup::backupComplete(): backup complete for file: {}. 
Updated backed up "
+            + "timestamp to {}", file.getFile().getName(), 
backupPoint.getTimestamp());
+  }
+
+  /**
+   * Create a file name for timetable backup.
+   * E.g.) timetable.<lowest timestamp>-<highest timestamp>
+   * @return
+   */
+  private String makeTimetableBackupFileName() {
+    return String.format("%s.%d-%d", TIMETABLE_PREFIX, 
timetableRecordMap.firstKey(),
+        timetableRecordMap.lastKey());
+  }
+
+  /**
+   * Write the content of timetableRecordMap to a file using a tmpDir.
+   * @return
+   * @throws IOException
+   */
+  private File createTimetableBackupFile() throws IOException {
+    File tempTimetableBackupFile = new File(tmpDir, 
makeTimetableBackupFileName());
+    logger.info(
+        "TimetableBackup::createTimetableBackupFile(): created temporary 
timetable backup file with"
+            + " name: " + tempTimetableBackupFile.getName());
+    FileOutputStream fos = new FileOutputStream(tempTimetableBackupFile);
+    ObjectOutputStream oos = new ObjectOutputStream(fos);
+    oos.writeObject(timetableRecordMap);
+    oos.flush();
+    oos.close();
+    fos.close();
+    logger.info(
+        "TimetableBackup::createTimetableBackupFile(): successfully wrote 
cached timetable data to "
+            + "temporary timetable backup file with name: " + 
tempTimetableBackupFile.getName());
+    return tempTimetableBackupFile;

Review Comment:
   *RESOURCE_LEAK:*  resource of type `java.io.FileOutputStream` acquired to 
`fos` by call to `FileOutputStream(...)` at line 195 is not released after line 
204.
   **Note**: potential exception at line 196
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621738&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621738&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621738&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621738&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621738&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.BackupBean;
+import org.apache.zookeeper.server.backup.monitoring.BackupStats;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupBean;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.persistence.ZxidRange;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the backing up of txnlog and snap files to remote
+ * storage for longer term and durable retention than is possible on
+ * an ensemble server
+ */
+public class BackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupManager.class);
+
+  private final Logger logger;
+  private final File snapDir;
+  private final File dataLogDir;
+  private final File tmpDir;
+  private final BackupConfig backupConfig;
+  private final long backupIntervalInMilliseconds;
+  private BackupProcess logBackup = null;
+  private BackupProcess snapBackup = null;
+
+  // backupStatus, backupLogZxid and backedupSnapZxid need to be access while 
synchronized
+  // on backupStatus.
+  private final BackupStatus backupStatus;
+  private BackupPoint backupPoint;
+
+  private final BackupStorageProvider backupStorage;
+  private final long serverId;
+  private final String namespace;
+
+  @VisibleForTesting
+  protected BackupBean backupBean = null;
+  protected TimetableBackupBean timetableBackupBean = null;
+
+  private BackupStats backupStats = null;
+
+  // Optional timetable backup
+  private BackupProcess timetableBackup = null;
+
+  /**
+   * Tracks a file that needs to be backed up, including temporary copies of 
the file
+   */
+  public static class BackupFile {
+    private final File file;
+    private final boolean isTemporary;
+    private final ZxidRange zxidRange;
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param fileMinZxid the min zxid associated with this file
+     * @param fileMaxZxid the max zxid associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, long 
fileMinZxid, long fileMaxZxid) {
+      this(backupFile, isTemporaryFile, new ZxidRange(fileMinZxid, 
fileMaxZxid));
+    }
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param zxidRange the zxid range associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, ZxidRange 
zxidRange) {
+      Preconditions.checkNotNull(zxidRange);
+
+      if (!zxidRange.isHighPresent()) {
+        throw new IllegalArgumentException("ZxidRange must have a high value");
+      }
+
+      this.file = backupFile;
+      this.isTemporary = isTemporaryFile;
+      this.zxidRange = zxidRange;
+    }
+
+    /**
+     * Perform cleanup including deleting temporary files.
+     */
+    public void cleanup() {
+      if (isTemporary && exists()) {
+        file.delete();
+      }
+    }
+
+    /**
+     * Whether the file representing the zxids exists
+     * @return whether the file represented exists
+     */
+    public boolean exists() {
+      return file != null && file.exists();
+    }
+
+    /**
+     * Get the current file (topmost on the stack)
+     * @return the current file
+     */
+    public File getFile() { return file; }
+
+    /**
+     * Get the zxid range associated with this file
+     * @return the zxid range
+     */
+    public ZxidRange getZxidRange() {
+      return zxidRange;
+    }
+
+    /**
+     * Get the min zxid associated with this file
+     * @return the min associated zxid
+     */
+    public long getMinZxid() { return zxidRange.getLow(); }
+
+    /**
+     * Get the max zxid associated with this file
+     * @return the max associated zxid
+     */
+    public long getMaxZxid() { return zxidRange.getHigh(); }
+
+    @Override
+    public String toString() {
+      return String.format("%s : %s : %d - %d",
+          file == null ? "[empty]" : file.getPath(),
+          isTemporary ? "temp" : "perm",
+          zxidRange.getLow(),
+          zxidRange.getHigh());
+    }
+  }
+
+  /**
+   * Implements txnlog specific logic for BackupProcess
+   */
+  protected class TxnLogBackup extends BackupProcess {
+    private long iterationEndPoint;
+    private final FileTxnSnapLog snapLog;
+
+    /**
+     * Constructor for TxnLogBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public TxnLogBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(TxnLogBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.TXNLOG, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_LOG_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.END);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+
+      if (rZxid != backupPoint.getLogZxid()) {
+        synchronized (backupStatus) {
+          backupPoint.setLogZxid(rZxid);
+          backupStatus.update(backupPoint);
+        }
+      }
+    }
+
+    protected void startIteration() {
+      // Store the current last logged zxid.  This becomes the stopping point
+      // for the current iteration so we don't keep chasing our own tail as
+      // new transactions get written.
+      iterationEndPoint = snapLog.getLastLoggedZxid();
+      backupStats.setTxnLogBackupIterationStart();
+    }
+
+    protected void endIteration(boolean errorFree) {
+      backupStats.setTxnLogBackupIterationDone(errorFree);
+      iterationEndPoint = 0L;
+    }
+
+    /**
+     * Gets the next txnlog file to backup.  This is a temporary file created 
by copying
+     * all transaction from the previous backup point until the end zxid for 
this iteration, or
+     * a file indicating that some log records were lost.
+     * @return the file that needs to be backed-up.  The minZxid is the first
+     *      zxid contained in the file.  The maxZxid is the last zxid that is 
contained in the
+     *      file.
+     * @throws IOException
+     */
+    protected BackupFile getNextFileToBackup() throws IOException {
+      long startingZxid = backupStatus.read().getLogZxid() + 1;
+
+      // Don't keep chasing the tail so stop if past the last zxid at the time
+      // this iteration started.
+      if (startingZxid > iterationEndPoint) {
+        return null;
+      }
+
+      TxnLog.TxnIterator iter = null;
+      FileTxnLog newFile = null;
+      long lastZxid = -1;
+      int txnCopied = 0;
+      BackupFile ret;
+
+      logger.info("Creating backup file from zxid {}.", 
ZxidUtils.zxidToString(startingZxid));
+
+      try {
+        iter = snapLog.readTxnLog(startingZxid, true);
+
+        // Use a temp directory to avoid conflicts with live txnlog files
+        newFile = new FileTxnLog(tmpDir);
+
+        // TODO: Ideally, we should have have a way to prevent lost TxLogs
+        // Check for lost txnlogs; <=1 indicates that no backups have been 
done before so
+        // nothing can be considered lost.
+        // If a lost sequence is found then return a file whose name encodes 
the lost
+        // sequence and back that up so the backup store has a record of the 
lost sequence
+        if (startingZxid > 1 &&
+            iter.getHeader() != null &&
+            iter.getHeader().getZxid() > startingZxid) {
+
+          logger.error("TxnLog backups lost.  Required starting zxid={}  First 
available zxid={}",
+              ZxidUtils.zxidToString(startingZxid),
+              ZxidUtils.zxidToString(iter.getHeader().getZxid()));
+
+          String fileName = String.format("%s.%s",
+              BackupUtil.LOST_LOG_PREFIX,
+              Long.toHexString(startingZxid));
+          File lostZxidFile = new File(tmpDir, fileName);
+          lostZxidFile.createNewFile();
+
+          return new BackupFile(lostZxidFile, true, startingZxid, 
iter.getHeader().getZxid() - 1);
+        }
+
+        while (iter.getHeader() != null) {
+          TxnHeader hdr = iter.getHeader();
+
+          if (hdr.getZxid() > iterationEndPoint) {
+            break;
+          }
+
+          newFile.append(hdr, iter.getTxn());
+
+          // update position and count only AFTER the record has been 
successfully
+          // copied
+          lastZxid = hdr.getZxid();
+          txnCopied++;
+
+          iter.next();
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+
+        if (ret != null) {
+          logger.info("Copied {} records starting at {} and ending at zxid 
{}.",
+              txnCopied,
+              ZxidUtils.zxidToString(ret.getMinZxid()),
+              ZxidUtils.zxidToString(ret.getMaxZxid()));
+        }
+
+      } catch (IOException e) {
+        logger.warn("Hit exception after {} records.  Exception: {} ", 
txnCopied, e.fillInStackTrace());
+
+        // If any records were copied return those and ignore the error.  
Otherwise
+        // rethrow the error to be handled by the caller as a failed backup 
iteration.
+        if (txnCopied <= 0) {
+          throw e;
+        }
+
+        ret = makeBackupFileFromCopiedLog(newFile, lastZxid);
+      } finally {
+        if (iter != null) {
+          iter.close();
+        }
+
+        if (newFile != null) {
+          newFile.close();
+        }
+      }
+
+      return ret;
+    }
+
+    private BackupFile makeBackupFileFromCopiedLog(FileTxnLog backupTxnLog, 
long lastZxid) {
+
+      if (backupTxnLog == null) {
+        return null;
+      }
+
+      // The logFile gets initialized with the first transaction's zxid
+      File logFile = backupTxnLog.getCurrentFile();
+
+      if (logFile == null) {
+        return null;
+      }
+
+      long firstZxid = Util.getZxidFromName(logFile.getName(), 
Util.TXLOG_PREFIX);
+
+      if (lastZxid == -1) {
+        lastZxid = firstZxid;
+      }
+
+      return new BackupFile(logFile, true, new ZxidRange(firstZxid, lastZxid));
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      synchronized (backupStatus) {
+        backupPoint.setLogZxid(file.getMaxZxid());
+        backupStatus.update(backupPoint);
+      }
+
+      logger.info("Updated backedup tnxlog zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getLogZxid()));
+    }
+  }
+
+  /**
+   * Implements snapshot specific logic for BackupProcess
+   */
+  protected class SnapBackup extends BackupProcess {
+    private final FileTxnSnapLog snapLog;
+    private final List<BackupFile> filesToBackup = new ArrayList<>();
+
+    /**
+     * Constructor for SnapBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public SnapBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(SnapBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.SNAPSHOT, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_SNAP_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.START);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getSnapZxid()));
+
+      if (rZxid != backupPoint.getSnapZxid()) {
+        synchronized (backupStatus) {
+          backupPoint.setSnapZxid(rZxid);
+          backupStatus.update(backupPoint);
+        }
+      }
+    }
+
+    /**
+     * Prepares snapshot files to back up and populate filesToBackup.
+     * Note that this implementation persists all snapshots instead of only 
persisting the
+     * latest snapshot.
+     * @throws IOException
+     */
+    protected void startIteration() throws IOException {
+      backupStats.setSnapshotBackupIterationStart();
+      filesToBackup.clear();
+
+      // Get all available snapshots excluding the ones whose 
lastProcessedZxid falls into the
+      // zxid range [0, backedupSnapZxid]
+      List<File> candidateSnapshots = snapLog.findValidSnapshots(0, 
backupPoint.getSnapZxid());
+      // Sort candidateSnapshots from oldest to newest
+      Collections.reverse(candidateSnapshots);
+
+      if (candidateSnapshots.size() == 0) {
+        // Either no snapshots or no newer snapshots to back up, so return
+        return;
+      }
+
+      for (int i = 0; i < candidateSnapshots.size(); i++) {
+        File f = candidateSnapshots.get(i);
+        ZxidRange zxidRange = Util.getZxidRangeFromName(f.getName(), 
Util.SNAP_PREFIX);
+
+        if (i == candidateSnapshots.size() - 1) {
+          // This is the most recent snapshot
+
+          // Handle backwards compatibility for snapshots that use old style 
naming where
+          // only the starting zxid is included.
+          // TODO: Can be removed after all snapshots being produced have 
ending zxid
+          if (!zxidRange.isHighPresent()) {
+            // Use the last logged zxid for the zxidRange for the latest 
snapshot as a best effort
+            // approach
+            // TODO: Because this is the best effort approach, the zxidRange 
will not be accurate
+            // TODO: Consider rewriting these latest snapshots to backup 
storage if necessary
+            // TODO: when we know the high zxid when we get a newer snapshot
+            long latestZxid = snapLog.getLastLoggedZxid();
+            long consistentAt = latestZxid == -1 ? zxidRange.getLow() : 
latestZxid;
+            zxidRange = new ZxidRange(zxidRange.getLow(), consistentAt);
+          }
+        } else {
+          // All newer snapshots that are not the most recent snapshot
+
+          // Handle backwards compatibility for snapshots that use old style 
naming where
+          // only the starting zxid is included.
+          // TODO: Can be removed after all snapshots being produced have 
ending zxid
+          if (!zxidRange.isHighPresent()) {
+            // ZxidRange will be [low, high] where high will be the zxid right 
before the next
+            // snapshot's lastProcessedZxid
+            long nextSnapshotStartZxid =
+                Util.getZxidFromName(candidateSnapshots.get(i + 1).getName(), 
Util.SNAP_PREFIX);
+            zxidRange = new ZxidRange(zxidRange.getLow(), 
nextSnapshotStartZxid - 1);
+          }
+        }
+
+        filesToBackup.add(new BackupFile(f, false, zxidRange));
+      }
+    }
+
+    protected void endIteration(boolean errorFree) {
+      backupStats.setSnapshotBackupIterationDone(errorFree);
+      filesToBackup.clear();
+    }
+
+    protected BackupFile getNextFileToBackup() {
+      if (filesToBackup.isEmpty()) {
+        return null;
+      }
+
+      return filesToBackup.remove(0);
+    }
+
+    protected void backupComplete(BackupFile file) throws IOException {
+      synchronized (backupStatus) {
+        backupPoint.setSnapZxid(file.getMinZxid());
+        backupStatus.update(backupPoint);
+      }
+      backupStats.incrementNumberOfSnapshotFilesBackedUpThisIteration();
+
+      logger.info("Updated backedup snap zxid to {}",
+          ZxidUtils.zxidToString(backupPoint.getSnapZxid()));

Review Comment:
   *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method 
`BackupManager$SnapBackup.backupComplete(...)` indirectly reads without 
synchronization from `this.this$0.backupPoint.snapZxid`. Potentially races with 
write in method `BackupManager$SnapBackup.backupComplete(...)`.
    Reporting because this access may occur on a background thread.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621774&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621774&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621774&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621774&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621774&lift_comment_rating=5)
 ]



##########
zookeeper-server/src/main/java/org/apache/zookeeper/server/backup/BackupManager.java:
##########
@@ -0,0 +1,661 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.zookeeper.server.backup;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import javax.management.JMException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import org.apache.zookeeper.jmx.MBeanRegistry;
+import org.apache.zookeeper.server.backup.BackupUtil.BackupFileType;
+import org.apache.zookeeper.server.backup.BackupUtil.IntervalEndpoint;
+import org.apache.zookeeper.server.backup.exception.BackupException;
+import org.apache.zookeeper.server.backup.monitoring.BackupBean;
+import org.apache.zookeeper.server.backup.monitoring.BackupStats;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupBean;
+import org.apache.zookeeper.server.backup.monitoring.TimetableBackupStats;
+import org.apache.zookeeper.server.backup.storage.BackupStorageProvider;
+import org.apache.zookeeper.server.backup.timetable.TimetableBackup;
+import org.apache.zookeeper.server.persistence.FileTxnLog;
+import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
+import org.apache.zookeeper.server.persistence.TxnLog;
+import org.apache.zookeeper.server.persistence.Util;
+import org.apache.zookeeper.server.persistence.ZxidRange;
+import org.apache.zookeeper.server.util.ZxidUtils;
+import org.apache.zookeeper.txn.TxnHeader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This class manages the backing up of txnlog and snap files to remote
+ * storage for longer term and durable retention than is possible on
+ * an ensemble server
+ */
+public class BackupManager {
+  private static final Logger LOG = 
LoggerFactory.getLogger(BackupManager.class);
+
+  private final Logger logger;
+  private final File snapDir;
+  private final File dataLogDir;
+  private final File tmpDir;
+  private final BackupConfig backupConfig;
+  private final long backupIntervalInMilliseconds;
+  private BackupProcess logBackup = null;
+  private BackupProcess snapBackup = null;
+
+  // backupStatus, backupLogZxid and backedupSnapZxid need to be access while 
synchronized
+  // on backupStatus.
+  private final BackupStatus backupStatus;
+  private BackupPoint backupPoint;
+
+  private final BackupStorageProvider backupStorage;
+  private final long serverId;
+  private final String namespace;
+
+  @VisibleForTesting
+  protected BackupBean backupBean = null;
+  protected TimetableBackupBean timetableBackupBean = null;
+
+  private BackupStats backupStats = null;
+
+  // Optional timetable backup
+  private BackupProcess timetableBackup = null;
+
+  /**
+   * Tracks a file that needs to be backed up, including temporary copies of 
the file
+   */
+  public static class BackupFile {
+    private final File file;
+    private final boolean isTemporary;
+    private final ZxidRange zxidRange;
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param fileMinZxid the min zxid associated with this file
+     * @param fileMaxZxid the max zxid associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, long 
fileMinZxid, long fileMaxZxid) {
+      this(backupFile, isTemporaryFile, new ZxidRange(fileMinZxid, 
fileMaxZxid));
+    }
+
+    /**
+     * Create an instance of a BackupFile for the given initial file and zxid 
range
+     * @param backupFile the initial/original file
+     * @param isTemporaryFile whether the file is a temporary file
+     * @param zxidRange the zxid range associated with this file
+     */
+    public BackupFile(File backupFile, boolean isTemporaryFile, ZxidRange 
zxidRange) {
+      Preconditions.checkNotNull(zxidRange);
+
+      if (!zxidRange.isHighPresent()) {
+        throw new IllegalArgumentException("ZxidRange must have a high value");
+      }
+
+      this.file = backupFile;
+      this.isTemporary = isTemporaryFile;
+      this.zxidRange = zxidRange;
+    }
+
+    /**
+     * Perform cleanup including deleting temporary files.
+     */
+    public void cleanup() {
+      if (isTemporary && exists()) {
+        file.delete();
+      }
+    }
+
+    /**
+     * Whether the file representing the zxids exists
+     * @return whether the file represented exists
+     */
+    public boolean exists() {
+      return file != null && file.exists();
+    }
+
+    /**
+     * Get the current file (topmost on the stack)
+     * @return the current file
+     */
+    public File getFile() { return file; }
+
+    /**
+     * Get the zxid range associated with this file
+     * @return the zxid range
+     */
+    public ZxidRange getZxidRange() {
+      return zxidRange;
+    }
+
+    /**
+     * Get the min zxid associated with this file
+     * @return the min associated zxid
+     */
+    public long getMinZxid() { return zxidRange.getLow(); }
+
+    /**
+     * Get the max zxid associated with this file
+     * @return the max associated zxid
+     */
+    public long getMaxZxid() { return zxidRange.getHigh(); }
+
+    @Override
+    public String toString() {
+      return String.format("%s : %s : %d - %d",
+          file == null ? "[empty]" : file.getPath(),
+          isTemporary ? "temp" : "perm",
+          zxidRange.getLow(),
+          zxidRange.getHigh());
+    }
+  }
+
+  /**
+   * Implements txnlog specific logic for BackupProcess
+   */
+  protected class TxnLogBackup extends BackupProcess {
+    private long iterationEndPoint;
+    private final FileTxnSnapLog snapLog;
+
+    /**
+     * Constructor for TxnLogBackup
+     * @param snapLog the FileTxnSnapLog object to use
+     */
+    public TxnLogBackup(FileTxnSnapLog snapLog) {
+      super(LoggerFactory.getLogger(TxnLogBackup.class), 
BackupManager.this.backupStorage,
+          backupIntervalInMilliseconds);
+      this.snapLog = snapLog;
+    }
+
+    protected void initialize() throws IOException {
+      backupStorage.cleanupInvalidFiles(null);
+
+      BackupFileInfo latest =
+          BackupUtil.getLatest(backupStorage, BackupFileType.TXNLOG, 
IntervalEndpoint.START);
+
+      long rZxid = latest == null
+          ? BackupUtil.INVALID_LOG_ZXID
+          : latest.getIntervalEndpoint(IntervalEndpoint.END);
+
+      logger.info("Latest Zxid from storage: {}  from status: {}",
+          ZxidUtils.zxidToString(rZxid), 
ZxidUtils.zxidToString(backupPoint.getLogZxid()));

Review Comment:
   *THREAD_SAFETY_VIOLATION:*  Read/Write race. Non-private method 
`BackupManager$TxnLogBackup.initialize()` indirectly reads without 
synchronization from `this.this$0.backupPoint.logZxid`. Potentially races with 
write in method `BackupManager$TxnLogBackup.backupComplete(...)`.
    Reporting because this access may occur on a background thread.
   
   (at-me [in a reply](https://help.sonatype.com/lift/talking-to-lift) with 
`help` or `ignore`)
   
   ---
   
   Was this a good recommendation?
   [ [🙁 Not 
relevant](https://www.sonatype.com/lift-comment-rating?comment=226621788&lift_comment_rating=1)
 ] - [ [😕 Won't 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621788&lift_comment_rating=2)
 ] - [ [😑 Not critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621788&lift_comment_rating=3)
 ] - [ [🙂 Critical, will 
fix](https://www.sonatype.com/lift-comment-rating?comment=226621788&lift_comment_rating=4)
 ] - [ [😊 Critical, fixing 
now](https://www.sonatype.com/lift-comment-rating?comment=226621788&lift_comment_rating=5)
 ]



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscr...@zookeeper.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to