Repository: asterixdb
Updated Branches:
  refs/heads/master 08dc8597e -> 0a5b641a9


http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm
 
b/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm
new file mode 100644
index 0000000..c0697b7
--- /dev/null
+++ 
b/asterixdb/asterix-server/src/test/resources/integrationts/replication/results/failover/resync_failed_replica/resync_failed_replica.8.adm
@@ -0,0 +1,38 @@
+{
+  "metadata_node" : "asterix_nc1",
+  "partitions" : {
+    "0" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc1",
+      "iodeviceNum" : 0,
+      "nodeId" : "asterix_nc1",
+      "partitionId" : 0,
+      "pendingActivation" : false
+    },
+    "1" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc1",
+      "iodeviceNum" : 1,
+      "nodeId" : "asterix_nc1",
+      "partitionId" : 1,
+      "pendingActivation" : false
+    },
+    "2" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc2",
+      "iodeviceNum" : 0,
+      "nodeId" : "asterix_nc2",
+      "partitionId" : 2,
+      "pendingActivation" : false
+    },
+    "3" : {
+      "active" : true,
+      "activeNodeId" : "asterix_nc2",
+      "iodeviceNum" : 1,
+      "nodeId" : "asterix_nc2",
+      "partitionId" : 3,
+      "pendingActivation" : false
+    }
+  },
+  "state" : "ACTIVE"
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml
 
b/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml
index 0f6b528..df5dbac 100644
--- 
a/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml
+++ 
b/asterixdb/asterix-server/src/test/resources/integrationts/replication/testsuite.xml
@@ -19,25 +19,8 @@
 <test-suite xmlns="urn:xml.testframework.asterix.apache.org" 
ResultOffsetPath="results" QueryOffsetPath="queries" QueryFileExtension=".aql">
   <test-group name="failover">
     <test-case FilePath="failover">
-      <compilation-unit name="bulkload">
-        <output-dir compare="Text">bulkload</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="failover">
-      <compilation-unit name="mem_component_recovery">
-        <output-dir compare="Text">mem_component_recovery</output-dir>
-      </compilation-unit>
-    </test-case>
-    <test-case FilePath="failover">
-      <compilation-unit name="metadata_node">
-        <output-dir compare="Text">metadata_node</output-dir>
-      </compilation-unit>
-    </test-case>
-  </test-group>
-  <test-group name="failback">
-    <test-case FilePath="failback">
-      <compilation-unit name="node_failback">
-        <output-dir compare="Text">node_failback</output-dir>
+      <compilation-unit name="resync_failed_replica">
+        <output-dir compare="Text">resync_failed_replica</output-dir>
       </compilation-unit>
     </test-case>
   </test-group>

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index 601dec3..54d6268 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -18,37 +18,37 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
+import static 
org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
+import java.io.FilenameFilter;
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.Paths;
+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StorageConstants;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.commons.io.FileUtils;
@@ -71,7 +71,8 @@ import com.google.common.cache.CacheBuilder;
 public class PersistentLocalResourceRepository implements 
ILocalResourceRepository {
 
     public static final Predicate<Path> INDEX_COMPONENTS = path -> 
!path.endsWith(StorageConstants.METADATA_FILE_NAME);
-    // Private constants
+    private static final FilenameFilter LSM_INDEX_FILES_FILTER =
+            (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
     private static final int MAX_CACHED_RESOURCES = 1000;
     private static final IOFileFilter METADATA_FILES_FILTER = new 
IOFileFilter() {
         @Override
@@ -100,17 +101,14 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
     // Finals
     private final IIOManager ioManager;
     private final Cache<String, LocalResource> resourceCache;
-    private final Set<Integer> nodeOriginalPartitions;
-    private final Set<Integer> nodeActivePartitions;
     // Mutables
     private boolean isReplicationEnabled = false;
     private Set<String> filesToBeReplicated;
     private IReplicationManager replicationManager;
-    private Set<Integer> nodeInactivePartitions;
     private final Path[] storageRoots;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
-    public PersistentLocalResourceRepository(IIOManager ioManager, String 
nodeId, MetadataProperties metadataProperties,
+    public PersistentLocalResourceRepository(IIOManager ioManager,
             IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
@@ -122,15 +120,6 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         }
         createStorageRoots();
         resourceCache = 
CacheBuilder.newBuilder().maximumSize(MAX_CACHED_RESOURCES).build();
-        ClusterPartition[] nodePartitions = 
metadataProperties.getNodePartitions().get(nodeId);
-        //initially the node active partitions are the same as the original 
partitions
-        nodeOriginalPartitions = new HashSet<>(nodePartitions.length);
-        nodeActivePartitions = new HashSet<>(nodePartitions.length);
-        nodeInactivePartitions = new HashSet<>(nodePartitions.length);
-        for (ClusterPartition partition : nodePartitions) {
-            nodeOriginalPartitions.add(partition.getPartitionId());
-            nodeActivePartitions.add(partition.getPartitionId());
-        }
     }
 
     @Override
@@ -267,7 +256,6 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
 
         if (isReplicationEnabled) {
             filesToBeReplicated = new HashSet<>();
-            nodeInactivePartitions = ConcurrentHashMap.newKeySet();
         }
     }
 
@@ -299,26 +287,13 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         createStorageRoots();
     }
 
-    public Set<Integer> getActivePartitions() {
-        return Collections.unmodifiableSet(nodeActivePartitions);
-    }
-
-    public Set<Integer> getInactivePartitions() {
-        return Collections.unmodifiableSet(nodeInactivePartitions);
-    }
-
-    public synchronized void addActivePartition(int partitonId) {
-        nodeActivePartitions.add(partitonId);
-        nodeInactivePartitions.remove(partitonId);
-    }
-
-    public synchronized void addInactivePartition(int partitonId) {
-        nodeInactivePartitions.add(partitonId);
-        nodeActivePartitions.remove(partitonId);
+    public Set<Integer> getAllPartitions() throws HyracksDataException {
+        return 
loadAndGetAllResources().values().stream().map(LocalResource::getResource)
+                
.map(DatasetLocalResource.class::cast).map(DatasetLocalResource::getPartition)
+                .collect(Collectors.toSet());
     }
 
     public DatasetResourceReference getLocalResourceReference(String 
absoluteFilePath) throws HyracksDataException {
-        //TODO pass relative path
         final String localResourcePath = 
StoragePathUtil.getIndexFileRelativePath(absoluteFilePath);
         final LocalResource lr = get(localResourcePath);
         return DatasetResourceReference.of(lr);
@@ -351,17 +326,18 @@ public class PersistentLocalResourceRepository implements 
ILocalResourceReposito
         });
     }
 
-    /**
-     * Given any index file, an absolute {@link FileReference} is returned 
which points to where the index of
-     * {@code indexFile} is located.
-     *
-     * @param indexFile
-     * @return
-     * @throws HyracksDataException
-     */
-    public FileReference getIndexPath(Path indexFile) throws 
HyracksDataException {
-        final ResourceReference ref = 
ResourceReference.of(indexFile.toString());
-        return ioManager.resolve(ref.getRelativePath().toString());
+    public List<String> getPartitionIndexesFiles(int partition) throws 
HyracksDataException {
+        List<String> partitionFiles = new ArrayList<>();
+        Set<File> partitionIndexes = getPartitionIndexes(partition);
+        for (File indexDir : partitionIndexes) {
+            if (indexDir.isDirectory()) {
+                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+                if (indexFiles != null) {
+                    
Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
+                }
+            }
+        }
+        return partitionFiles;
     }
 
     private void createStorageRoots() {

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
index 33c6260..d15e6ff 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepositoryFactory.java
@@ -18,30 +18,23 @@
  */
 package org.apache.asterix.transaction.management.resource;
 
-import org.apache.asterix.common.config.MetadataProperties;
 import org.apache.asterix.common.storage.IIndexCheckpointManagerProvider;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.io.IIOManager;
 import org.apache.hyracks.storage.common.ILocalResourceRepository;
 import org.apache.hyracks.storage.common.file.ILocalResourceRepositoryFactory;
 
 public class PersistentLocalResourceRepositoryFactory implements 
ILocalResourceRepositoryFactory {
     private final IIOManager ioManager;
-    private final String nodeId;
-    private final MetadataProperties metadataProperties;
     private final IIndexCheckpointManagerProvider 
indexCheckpointManagerProvider;
 
-    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager, 
String nodeId,
-            MetadataProperties metadataProperties, 
IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
+    public PersistentLocalResourceRepositoryFactory(IIOManager ioManager,
+            IIndexCheckpointManagerProvider indexCheckpointManagerProvider) {
         this.ioManager = ioManager;
-        this.nodeId = nodeId;
-        this.metadataProperties = metadataProperties;
         this.indexCheckpointManagerProvider = indexCheckpointManagerProvider;
     }
 
     @Override
-    public ILocalResourceRepository createRepository() throws 
HyracksDataException {
-        return new PersistentLocalResourceRepository(ioManager, nodeId, 
metadataProperties,
-                indexCheckpointManagerProvider);
+    public ILocalResourceRepository createRepository() {
+        return new PersistentLocalResourceRepository(ioManager, 
indexCheckpointManagerProvider);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
index b67da80..011d2a1 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogBuffer.java
@@ -26,10 +26,10 @@ import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.asterix.common.context.PrimaryIndexOperationTracker;
 import org.apache.asterix.common.exceptions.ACIDException;
-import org.apache.asterix.common.replication.IReplicationThread;
 import org.apache.asterix.common.transactions.DatasetId;
 import org.apache.asterix.common.transactions.ILogBuffer;
 import org.apache.asterix.common.transactions.ILogRecord;
+import org.apache.asterix.common.transactions.ILogRequester;
 import org.apache.asterix.common.transactions.ITransactionContext;
 import org.apache.asterix.common.transactions.ITransactionSubsystem;
 import org.apache.asterix.common.transactions.LogRecord;
@@ -318,10 +318,9 @@ public class LogBuffer implements ILogBuffer {
             }
         }
         logRecord.isFlushed(true);
-        IReplicationThread replicationThread = 
logRecord.getReplicationThread();
-
-        if (replicationThread != null) {
-            replicationThread.notifyLogReplicationRequester(logRecord);
+        final ILogRequester logRequester = logRecord.getRequester();
+        if (logRequester != null) {
+            logRequester.notifyFlushed(logRecord);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
index 9b6a4f9..53cd038 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/logging/LogManagerWithReplication.java
@@ -61,7 +61,7 @@ public class LogManagerWithReplication extends LogManager {
                     shouldReplicate = false;
             }
         }
-        logRecord.setReplicated(shouldReplicate);
+        logRecord.setReplicate(shouldReplicate);
 
         //Remote flush logs do not need to be flushed separately since they 
may not trigger local flush
         if (logRecord.getLogType() == LogType.FLUSH && 
logRecord.getLogSource() == LogSource.LOCAL) {
@@ -76,9 +76,9 @@ public class LogManagerWithReplication extends LogManager {
     protected void appendToLogTail(ILogRecord logRecord) {
         syncAppendToLogTail(logRecord);
 
-        if (logRecord.isReplicated()) {
+        if (logRecord.isReplicate()) {
             try {
-                replicationManager.replicateLog(logRecord);
+                replicationManager.replicate(logRecord);
             } catch (InterruptedException e) {
                 Thread.currentThread().interrupt();
                 throw new ACIDException(e);
@@ -92,9 +92,9 @@ public class LogManagerWithReplication extends LogManager {
                         logRecord.wait();
                     }
                     //wait for job Commit/Abort ACK from replicas
-                    if (logRecord.isReplicated() && (logRecord.getLogType() == 
LogType.JOB_COMMIT
+                    if (logRecord.isReplicate() && (logRecord.getLogType() == 
LogType.JOB_COMMIT
                             || logRecord.getLogType() == LogType.ABORT)) {
-                        while 
(!replicationManager.hasBeenReplicated(logRecord)) {
+                        while (!logRecord.isReplicated()) {
                             logRecord.wait();
                         }
                     }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
index 68c5ce1..ca7d37a 100644
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
+++ 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/CheckpointManagerFactory.java
@@ -29,11 +29,7 @@ public class CheckpointManagerFactory {
     }
 
     public static ICheckpointManager create(ITransactionSubsystem txnSubsystem,
-            CheckpointProperties checkpointProperties, boolean 
replicationEnabled) {
-        if (!replicationEnabled) {
-            return new CheckpointManager(txnSubsystem, checkpointProperties);
-        } else {
-            return new ReplicationCheckpointManager(txnSubsystem, 
checkpointProperties);
-        }
+            CheckpointProperties checkpointProperties) {
+        return new CheckpointManager(txnSubsystem, checkpointProperties);
     }
 }

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
----------------------------------------------------------------------
diff --git 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
 
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
deleted file mode 100644
index 4bbcabe..0000000
--- 
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/service/recovery/ReplicationCheckpointManager.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.asterix.transaction.management.service.recovery;
-
-import java.io.IOException;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.asterix.common.api.IApplicationContext;
-import org.apache.asterix.common.api.IDatasetLifecycleManager;
-import org.apache.asterix.common.cluster.ClusterPartition;
-import org.apache.asterix.common.config.MetadataProperties;
-import org.apache.asterix.common.replication.IReplicaResourcesManager;
-import org.apache.asterix.common.replication.IReplicationManager;
-import org.apache.asterix.common.transactions.CheckpointProperties;
-import org.apache.asterix.common.transactions.ICheckpointManager;
-import org.apache.asterix.common.transactions.ITransactionSubsystem;
-import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
-import org.apache.hyracks.api.exceptions.HyracksDataException;
-import org.apache.logging.log4j.LogManager;
-import org.apache.logging.log4j.Logger;
-
-/**
- * An implementation of {@link ICheckpointManager} that defines the logic
- * of checkpoints when replication is enabled..
- */
-public class ReplicationCheckpointManager extends AbstractCheckpointManager {
-
-    private static final Logger LOGGER = LogManager.getLogger();
-
-    public ReplicationCheckpointManager(ITransactionSubsystem txnSubsystem, 
CheckpointProperties checkpointProperties) {
-        super(txnSubsystem, checkpointProperties);
-    }
-
-    /**
-     * Performs a sharp checkpoint. All datasets are flushed and all 
transaction
-     * log files are deleted except the files that are needed for dead 
replicas.
-     */
-    @Override
-    public synchronized void doSharpCheckpoint() throws HyracksDataException {
-        LOGGER.info("Starting sharp checkpoint...");
-        final IDatasetLifecycleManager datasetLifecycleManager =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-        datasetLifecycleManager.flushAllDatasets();
-        long minFirstLSN;
-        // If shutting down, need to check if we need to keep any remote logs 
for dead replicas
-        if 
(txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().isShuttingdown())
 {
-            final Set<String> deadReplicaIds = 
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext()
-                    .getReplicationManager().getDeadReplicasIds();
-            if (deadReplicaIds.isEmpty()) {
-                // No dead replicas => no need to keep any log
-                minFirstLSN = SHARP_CHECKPOINT_LSN;
-            } else {
-                // Get min LSN of dead replicas remote resources
-                minFirstLSN = getDeadReplicasMinFirstLSN(deadReplicaIds);
-            }
-        } else {
-            // Start up complete checkpoint. Avoid deleting remote recovery 
logs.
-            minFirstLSN = txnSubsystem.getRecoveryManager().getMinFirstLSN();
-        }
-        capture(minFirstLSN, true);
-        if (minFirstLSN == SHARP_CHECKPOINT_LSN) {
-            // No need to keep any logs
-            txnSubsystem.getLogManager().renewLogFiles();
-        } else {
-            // Delete only log files with LSNs < any dead replica partition 
minimum LSN
-            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
-        }
-        LOGGER.info("Completed sharp checkpoint.");
-    }
-
-    /***
-     * Attempts to perform a soft checkpoint at the specified {@code 
checkpointTargetLSN}.
-     * If a checkpoint cannot be captured due to datasets having LSN < {@code 
checkpointTargetLSN},
-     * an asynchronous flush is triggered on them. If the checkpoint fails due 
to a replica index,
-     * a request is sent to the primary replica of the index to flush it.
-     * When a checkpoint is successful, all transaction log files that end with
-     * LSN < {@code checkpointTargetLSN} are deleted.
-     */
-    @Override
-    public synchronized long tryCheckpoint(long checkpointTargetLSN) throws 
HyracksDataException {
-        LOGGER.info("Attemping soft checkpoint...");
-        final long minFirstLSN = 
txnSubsystem.getRecoveryManager().getMinFirstLSN();
-        boolean checkpointSucceeded = minFirstLSN >= checkpointTargetLSN;
-        if (!checkpointSucceeded) {
-            // Flush datasets with indexes behind target checkpoint LSN
-            final IDatasetLifecycleManager datasetLifecycleManager =
-                    
txnSubsystem.getAsterixAppRuntimeContextProvider().getDatasetLifecycleManager();
-            
datasetLifecycleManager.scheduleAsyncFlushForLaggingDatasets(checkpointTargetLSN);
-            // Request remote replicas to flush lagging indexes
-            final IReplicationManager replicationManager =
-                    
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicationManager();
-            try {
-                
replicationManager.requestFlushLaggingReplicaIndexes(checkpointTargetLSN);
-            } catch (IOException e) {
-                throw new HyracksDataException(e);
-            }
-        }
-        capture(minFirstLSN, false);
-        if (checkpointSucceeded) {
-            txnSubsystem.getLogManager().deleteOldLogFiles(minFirstLSN);
-            LOGGER.info(String.format("soft checkpoint succeeded with at 
LSN(%s)", minFirstLSN));
-        }
-        return minFirstLSN;
-    }
-
-    private long getDeadReplicasMinFirstLSN(Set<String> deadReplicaIds) throws 
HyracksDataException {
-        final IReplicaResourcesManager remoteResourcesManager =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext().getReplicaResourcesManager();
-        final IApplicationContext propertiesProvider =
-                
txnSubsystem.getAsterixAppRuntimeContextProvider().getAppContext();
-        final MetadataProperties metadataProperties = 
propertiesProvider.getMetadataProperties();
-        final PersistentLocalResourceRepository localResourceRepository =
-                (PersistentLocalResourceRepository) 
txnSubsystem.getAsterixAppRuntimeContextProvider()
-                        .getLocalResourceRepository();
-        // Get partitions of the dead replicas that are not active on this node
-        final Set<Integer> deadReplicasPartitions = new HashSet<>();
-        for (String deadReplicaId : deadReplicaIds) {
-            final ClusterPartition[] nodePartitons = 
metadataProperties.getNodePartitions().get(deadReplicaId);
-            for (ClusterPartition partition : nodePartitons) {
-                if 
(!localResourceRepository.getActivePartitions().contains(partition.getPartitionId()))
 {
-                    deadReplicasPartitions.add(partition.getPartitionId());
-                }
-            }
-        }
-        return 
remoteResourcesManager.getPartitionsMinLSN(deadReplicasPartitions);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/asterixdb/blob/0a5b641a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java
----------------------------------------------------------------------
diff --git 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java
 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java
index cbe6d1a..33c9c96 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-api/src/main/java/org/apache/hyracks/api/replication/IReplicationJob.java
@@ -45,4 +45,8 @@ public interface IReplicationJob {
 
     public Set<String> getJobFiles();
 
+    default String getAnyFile() {
+        return getJobFiles().stream().findAny()
+                .orElseThrow(() -> new IllegalStateException("Replication job 
without any files"));
+    }
 }

Reply via email to