This is an automated email from the ASF dual-hosted git repository.

mhubail pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new bf22dbc  [NO ISSUE][REP] Persist master last valid seq on index 
checkpoint
bf22dbc is described below

commit bf22dbcb42fbb252dd2506b41ade61af1b3fa63c
Author: Murtadha Hubail <murtadha.hub...@couchbase.com>
AuthorDate: Wed Aug 25 03:20:05 2021 +0300

    [NO ISSUE][REP] Persist master last valid seq on index checkpoint
    
    - user model changes: no
    - storage format changes: no
    - interface changes: yes
    
    Details:
    
    - When a partition owner replicates a component to a replica,
      the replica persists the last component sequence received from the
      master. This ensures that a component that was generated on the master
      but not replicated before the master failed will not be used when
      the master is re-synced (recovered) from a promoted replica.
    
    Change-Id: I102947712daa07c83b32103b3c58fad46de2dc6d
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/12966
    Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
    Reviewed-by: Murtadha Hubail <mhub...@apache.org>
    Reviewed-by: Ali Alsuliman <ali.al.solai...@gmail.com>
---
 .../asterix/app/nc/IndexCheckpointManager.java     |  30 +++--
 .../org/apache/asterix/app/nc/ReplicaManager.java  |  18 +++
 .../common/storage/IIndexCheckpointManager.java    |  20 +++-
 .../asterix/common/storage/IReplicaManager.java    |   8 ++
 .../asterix/common/storage/IndexCheckpoint.java    |  24 +++-
 .../asterix/common/storage/ResourceReference.java  |   4 +
 .../asterix/common/utils/StorageConstants.java     |   1 +
 .../messaging/CheckpointPartitionIndexesTask.java  |  20 +++-
 .../messaging/MarkComponentValidTask.java          |  15 ++-
 .../messaging/PartitionResourcesListResponse.java  |  43 +++++--
 .../messaging/PartitionResourcesListTask.java      |  11 +-
 .../replication/messaging/ReplicateFileTask.java   |  15 ++-
 .../asterix/replication/sync/FileSynchronizer.java |   4 +-
 .../replication/sync/IndexSynchronizer.java        |   6 +-
 .../replication/sync/ReplicaFilesSynchronizer.java | 123 ++++++++++++++++++---
 .../replication/sync/ReplicaSynchronizer.java      |   4 +-
 .../PersistentLocalResourceRepository.java         |  20 +++-
 17 files changed, 308 insertions(+), 58 deletions(-)

diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
index 4acc6d3..2d40ec8 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/IndexCheckpointManager.java
@@ -56,7 +56,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
     }
 
     @Override
-    public synchronized void init(long validComponentSequence, long lsn, long 
validComponentId)
+    public synchronized void init(long validComponentSequence, long lsn, long 
validComponentId, String masterNodeId)
             throws HyracksDataException {
         List<IndexCheckpoint> checkpoints;
         try {
@@ -68,34 +68,42 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
             LOGGER.warn(() -> "Checkpoints found on initializing: " + 
indexPath);
             delete();
         }
-        IndexCheckpoint firstCheckpoint = 
IndexCheckpoint.first(validComponentSequence, lsn, validComponentId);
+        IndexCheckpoint firstCheckpoint =
+                IndexCheckpoint.first(validComponentSequence, lsn, 
validComponentId, masterNodeId);
         persist(firstCheckpoint);
     }
 
     @Override
-    public synchronized void replicated(long componentSequence, long 
masterLsn, long componentId)
+    public synchronized void replicated(long componentSequence, long 
masterLsn, long componentId, String masterNodeId)
             throws HyracksDataException {
         final Long localLsn = 
getLatest().getMasterNodeFlushMap().get(masterLsn);
         if (localLsn == null) {
-            throw new IllegalStateException("Component flushed before lsn 
mapping was received");
+            throw new IllegalStateException("Component replicated before lsn 
mapping was received");
         }
-        flushed(componentSequence, localLsn, componentId);
+        flushed(componentSequence, localLsn, componentId, masterNodeId);
     }
 
     @Override
-    public synchronized void flushed(long componentSequence, long lsn, long 
componentId) throws HyracksDataException {
+    public synchronized void flushed(long componentSequence, long lsn, long 
componentId, String masterNodeId)
+            throws HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        IndexCheckpoint nextCheckpoint = IndexCheckpoint.next(latest, lsn, 
componentSequence, componentId);
+        IndexCheckpoint nextCheckpoint =
+                IndexCheckpoint.next(latest, lsn, componentSequence, 
componentId, masterNodeId);
         persist(nextCheckpoint);
         deleteHistory(nextCheckpoint.getId(), HISTORY_CHECKPOINTS);
     }
 
     @Override
+    public synchronized void flushed(long componentSequence, long lsn, long 
componentId) throws HyracksDataException {
+        flushed(componentSequence, lsn, componentId, null);
+    }
+
+    @Override
     public synchronized void masterFlush(long masterLsn, long localLsn) throws 
HyracksDataException {
         final IndexCheckpoint latest = getLatest();
         latest.getMasterNodeFlushMap().put(masterLsn, localLsn);
         final IndexCheckpoint next = IndexCheckpoint.next(latest, 
latest.getLowWatermark(),
-                latest.getValidComponentSequence(), 
latest.getLastComponentId());
+                latest.getValidComponentSequence(), 
latest.getLastComponentId(), null);
         persist(next);
         notifyAll();
     }
@@ -155,8 +163,8 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
     @Override
     public synchronized void setLastComponentId(long componentId) throws 
HyracksDataException {
         final IndexCheckpoint latest = getLatest();
-        final IndexCheckpoint next =
-                IndexCheckpoint.next(latest, latest.getLowWatermark(), 
latest.getValidComponentSequence(), componentId);
+        final IndexCheckpoint next = IndexCheckpoint.next(latest, 
latest.getLowWatermark(),
+                latest.getValidComponentSequence(), componentId, null);
         persist(next);
     }
 
@@ -165,7 +173,7 @@ public class IndexCheckpointManager implements 
IIndexCheckpointManager {
         final IndexCheckpoint latest = getLatest();
         if (componentSequence > latest.getValidComponentSequence()) {
             final IndexCheckpoint next = IndexCheckpoint.next(latest, 
latest.getLowWatermark(), componentSequence,
-                    latest.getLastComponentId());
+                    latest.getLastComponentId(), null);
             persist(next);
         }
     }
diff --git 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
index 66a69ef..f6de92d 100644
--- 
a/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
+++ 
b/asterixdb/asterix-app/src/main/java/org/apache/asterix/app/nc/ReplicaManager.java
@@ -19,6 +19,7 @@
 package org.apache.asterix.app.nc;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -30,6 +31,7 @@ import java.util.stream.Stream;
 
 import org.apache.asterix.common.api.IDatasetLifecycleManager;
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.cluster.ClusterPartition;
 import org.apache.asterix.common.replication.IPartitionReplica;
 import org.apache.asterix.common.storage.IReplicaManager;
 import org.apache.asterix.common.storage.ReplicaIdentifier;
@@ -58,10 +60,12 @@ public class ReplicaManager implements IReplicaManager {
      */
     private final Map<ReplicaIdentifier, PartitionReplica> replicas = new 
HashMap<>();
     private final Object replicaSyncLock = new Object();
+    private final Set<Integer> nodeOwnedPartitions = new HashSet<>();
 
     public ReplicaManager(INcApplicationContext appCtx, Set<Integer> 
partitions) {
         this.appCtx = appCtx;
         this.partitions.addAll(partitions);
+        setNodeOwnedPartitions(appCtx);
     }
 
     @Override
@@ -154,6 +158,11 @@ public class ReplicaManager implements IReplicaManager {
         return new ArrayList<>(replicas.values());
     }
 
+    @Override
+    public boolean isPartitionOwner(int partition) {
+        return nodeOwnedPartitions.contains(partition);
+    }
+
     public void closePartitionResources(int partition) throws 
HyracksDataException {
         final IDatasetLifecycleManager datasetLifecycleManager = 
appCtx.getDatasetLifecycleManager();
         //TODO(mhubail) we can flush only datasets of the requested partition
@@ -171,4 +180,13 @@ public class ReplicaManager implements IReplicaManager {
         String nodeId = appCtx.getServiceContext().getNodeId();
         return id.getNodeId().equals(nodeId);
     }
+
+    private void setNodeOwnedPartitions(INcApplicationContext appCtx) {
+        ClusterPartition[] clusterPartitions =
+                
appCtx.getMetadataProperties().getNodePartitions().get(appCtx.getServiceContext().getNodeId());
+        if (clusterPartitions != null) {
+            
nodeOwnedPartitions.addAll(Arrays.stream(clusterPartitions).map(ClusterPartition::getPartitionId)
+                    .collect(Collectors.toList()));
+        }
+    }
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
index 801ca0b..f9230dd 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IIndexCheckpointManager.java
@@ -28,9 +28,23 @@ public interface IIndexCheckpointManager {
      * @param validComponentSequence
      * @param lsn
      * @param validComponentId
+     * @param masterNodeId
      * @throws HyracksDataException
      */
-    void init(long validComponentSequence, long lsn, long validComponentId) 
throws HyracksDataException;
+    void init(long validComponentSequence, long lsn, long validComponentId, 
String masterNodeId)
+            throws HyracksDataException;
+
+    /**
+     * Called when a new LSM disk component is flushed due to a replicated 
component.
+     * When called, the index checkpoint is updated with the latest valid 
{@code componentSequence}
+     * and low watermark {@code lsn}
+     *
+     * @param componentSequence
+     * @param lsn
+     * @param masterNodeId
+     * @throws HyracksDataException
+     */
+    void flushed(long componentSequence, long lsn, long componentId, String 
masterNodeId) throws HyracksDataException;
 
     /**
      * Called when a new LSM disk component is flushed. When called, the index 
checkpoint is updated
@@ -50,9 +64,11 @@ public interface IIndexCheckpointManager {
      * @param componentSequence
      * @param masterLsn
      * @param componentId
+     * @param masterNodeId
      * @throws HyracksDataException
      */
-    void replicated(long componentSequence, long masterLsn, long componentId) 
throws HyracksDataException;
+    void replicated(long componentSequence, long masterLsn, long componentId, 
String masterNodeId)
+            throws HyracksDataException;
 
     /**
      * Called when a flush log is received and replicated from master. The 
mapping between
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
index 4f46227..88d3113 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IReplicaManager.java
@@ -100,4 +100,12 @@ public interface IReplicaManager {
      * @return the list of replicas
      */
     List<IPartitionReplica> getReplicas();
+
+    /**
+     * Returns true if {@code partition} is owned by this node, otherwise 
false.
+     *
+     * @param partition
+     * @return true if the partition is owned by this node, otherwise false.
+     */
+    boolean isPartitionOwner(int partition);
 }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
index 24b9ae6..a2cf531 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/IndexCheckpoint.java
@@ -44,8 +44,11 @@ public class IndexCheckpoint {
     private long lowWatermark;
     private long lastComponentId;
     private Map<Long, Long> masterNodeFlushMap;
+    private String masterNodeId;
+    private long masterValidSeq;
 
-    public static IndexCheckpoint first(long lastComponentSequence, long 
lowWatermark, long validComponentId) {
+    public static IndexCheckpoint first(long lastComponentSequence, long 
lowWatermark, long validComponentId,
+            String masterNodeId) {
         IndexCheckpoint firstCheckpoint = new IndexCheckpoint();
         firstCheckpoint.id = INITIAL_CHECKPOINT_ID;
         firstCheckpoint.lowWatermark = lowWatermark;
@@ -53,11 +56,13 @@ public class IndexCheckpoint {
         firstCheckpoint.lastComponentId = validComponentId;
         firstCheckpoint.masterNodeFlushMap = new HashMap<>();
         firstCheckpoint.masterNodeFlushMap.put(HAS_NULL_MISSING_VALUES_FIX, 
HAS_NULL_MISSING_VALUES_FIX);
+        firstCheckpoint.masterNodeId = masterNodeId;
+        firstCheckpoint.masterValidSeq = lastComponentSequence;
         return firstCheckpoint;
     }
 
     public static IndexCheckpoint next(IndexCheckpoint latest, long 
lowWatermark, long validComponentSequence,
-            long lastComponentId) {
+            long lastComponentId, String masterNodeId) {
         if (lowWatermark < latest.getLowWatermark()) {
             if (LOGGER.isErrorEnabled()) {
                 LOGGER.error("low watermark {} less than the latest checkpoint 
low watermark {}", lowWatermark, latest);
@@ -70,6 +75,13 @@ public class IndexCheckpoint {
         next.lastComponentId = lastComponentId;
         next.validComponentSequence = validComponentSequence;
         next.masterNodeFlushMap = latest.getMasterNodeFlushMap();
+        if (masterNodeId != null) {
+            next.masterNodeId = masterNodeId;
+            next.masterValidSeq = validComponentSequence;
+        } else {
+            next.masterNodeId = latest.getMasterNodeId();
+            next.masterValidSeq = latest.getMasterValidSeq();
+        }
         // remove any lsn from the map that wont be used anymore
         next.masterNodeFlushMap.values().removeIf(lsn -> lsn < lowWatermark && 
lsn != HAS_NULL_MISSING_VALUES_FIX);
         return next;
@@ -111,6 +123,14 @@ public class IndexCheckpoint {
         }
     }
 
+    public String getMasterNodeId() {
+        return masterNodeId;
+    }
+
+    public long getMasterValidSeq() {
+        return masterValidSeq;
+    }
+
     public static IndexCheckpoint fromJson(String json) throws 
HyracksDataException {
         try {
             return OBJECT_MAPPER.readValue(json, IndexCheckpoint.class);
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
index ebf212b..7065767 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/storage/ResourceReference.java
@@ -147,6 +147,10 @@ public class ResourceReference {
         return 
ResourceReference.ofIndex(relativePath.getParent().resolve(dataset).toFile().getPath());
     }
 
+    public boolean isMetadataResource() {
+        return getName().equals(StorageConstants.METADATA_FILE_NAME);
+    }
+
     public Path getFileRelativePath() {
         return relativePath.resolve(name);
     }
diff --git 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
index 6a080bd..2d231d3 100644
--- 
a/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
+++ 
b/asterixdb/asterix-common/src/main/java/org/apache/asterix/common/utils/StorageConstants.java
@@ -37,6 +37,7 @@ public class StorageConstants {
      * begin with ".". Otherwise {@link AbstractLSMIndexFileManager} will try 
to
      * use them as index files.
      */
+    public static final String INDEX_NON_DATA_FILES_PREFIX = ".";
     public static final String INDEX_CHECKPOINT_FILE_PREFIX = 
".idx_checkpoint_";
     public static final String METADATA_FILE_NAME = ".metadata";
     public static final String MASK_FILE_PREFIX = ".mask_";
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
index dac4a70..97b6556 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/CheckpointPartitionIndexesTask.java
@@ -48,10 +48,12 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
 
     private final int partition;
     private final long maxComponentId;
+    private final String masterNodeId;
 
-    public CheckpointPartitionIndexesTask(int partition, long maxComponentId) {
+    public CheckpointPartitionIndexesTask(int partition, long maxComponentId, 
String masterNodeId) {
         this.partition = partition;
         this.maxComponentId = maxComponentId;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -66,7 +68,6 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
         for (LocalResource ls : partitionResources) {
             DatasetResourceReference ref = DatasetResourceReference.of(ls);
             final IIndexCheckpointManager indexCheckpointManager = 
indexCheckpointManagerProvider.get(ref);
-            indexCheckpointManager.delete();
             // Get most recent sequence of existing files to avoid deletion
             Path indexPath = StoragePathUtil.getIndexPath(ioManager, ref);
             String[] files = 
indexPath.toFile().list(AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER);
@@ -79,7 +80,11 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
                 maxComponentSequence =
                         Math.max(maxComponentSequence, 
IndexComponentFileReference.of(file).getSequenceEnd());
             }
-            indexCheckpointManager.init(maxComponentSequence, currentLSN, 
maxComponentId);
+            if (indexCheckpointManager.getCheckpointCount() > 0) {
+                indexCheckpointManager.flushed(maxComponentSequence, 
currentLSN, maxComponentId, masterNodeId);
+            } else {
+                indexCheckpointManager.init(maxComponentSequence, currentLSN, 
maxComponentId, masterNodeId);
+            }
         }
         ReplicationProtocol.sendAck(worker.getChannel(), 
worker.getReusableBuffer());
     }
@@ -95,6 +100,11 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
             dos.writeLong(maxComponentId);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -104,7 +114,9 @@ public class CheckpointPartitionIndexesTask implements 
IReplicaTask {
         try {
             int partition = input.readInt();
             long maxComponentId = input.readLong();
-            return new CheckpointPartitionIndexesTask(partition, 
maxComponentId);
+            final boolean hasMaster = input.readBoolean();
+            final String masterNodeId = hasMaster ? input.readUTF() : null;
+            return new CheckpointPartitionIndexesTask(partition, 
maxComponentId, masterNodeId);
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
index b8f61d0..c92a527 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/MarkComponentValidTask.java
@@ -46,11 +46,13 @@ public class MarkComponentValidTask implements IReplicaTask 
{
     private final long masterLsn;
     private final long lastComponentId;
     private final String file;
+    private final String masterNodeId;
 
-    public MarkComponentValidTask(String file, long masterLsn, long 
lastComponentId) {
+    public MarkComponentValidTask(String file, long masterLsn, long 
lastComponentId, String masterNodeId) {
         this.file = file;
         this.lastComponentId = lastComponentId;
         this.masterLsn = masterLsn;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -95,7 +97,7 @@ public class MarkComponentValidTask implements IReplicaTask {
                 replicationTimeOut -= 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime);
             }
             final long componentSequence = 
IndexComponentFileReference.of(indexRef.getName()).getSequenceEnd();
-            indexCheckpointManager.replicated(componentSequence, masterLsn, 
lastComponentId);
+            indexCheckpointManager.replicated(componentSequence, masterLsn, 
lastComponentId, masterNodeId);
         }
     }
 
@@ -111,6 +113,11 @@ public class MarkComponentValidTask implements 
IReplicaTask {
             dos.writeUTF(file);
             dos.writeLong(masterLsn);
             dos.writeLong(lastComponentId);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -120,6 +127,8 @@ public class MarkComponentValidTask implements IReplicaTask 
{
         final String indexFile = input.readUTF();
         final long lsn = input.readLong();
         final long lastComponentId = input.readLong();
-        return new MarkComponentValidTask(indexFile, lsn, lastComponentId);
+        final boolean hasMaster = input.readBoolean();
+        final String masterNodeId = hasMaster ? input.readUTF() : null;
+        return new MarkComponentValidTask(indexFile, lsn, lastComponentId, 
masterNodeId);
     }
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
index 9c3902e..a9921c6 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListResponse.java
@@ -23,7 +23,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.asterix.replication.api.IReplicationMessage;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -31,11 +33,16 @@ import 
org.apache.hyracks.api.exceptions.HyracksDataException;
 public class PartitionResourcesListResponse implements IReplicationMessage {
 
     private final int partition;
-    private final List<String> resources;
+    private Map<String, Long> partitionReplicatedResources;
+    private final List<String> files;
+    private final boolean owner;
 
-    public PartitionResourcesListResponse(int partition, List<String> 
resources) {
+    public PartitionResourcesListResponse(int partition, Map<String, Long> 
partitionReplicatedResources,
+            List<String> files, boolean owner) {
         this.partition = partition;
-        this.resources = resources;
+        this.partitionReplicatedResources = partitionReplicatedResources;
+        this.files = files;
+        this.owner = owner;
     }
 
     @Override
@@ -48,17 +55,23 @@ public class PartitionResourcesListResponse implements 
IReplicationMessage {
         try {
             DataOutputStream dos = new DataOutputStream(out);
             dos.writeInt(partition);
-            dos.writeInt(resources.size());
-            for (String file : resources) {
+            dos.writeInt(files.size());
+            for (String file : files) {
                 dos.writeUTF(file);
             }
+            dos.writeBoolean(owner);
+            dos.writeInt(partitionReplicatedResources.size());
+            for (Map.Entry<String, Long> stringLongEntry : 
partitionReplicatedResources.entrySet()) {
+                dos.writeUTF(stringLongEntry.getKey());
+                dos.writeLong(stringLongEntry.getValue());
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
     }
 
-    public List<String> getResources() {
-        return resources;
+    public List<String> getFiles() {
+        return files;
     }
 
     public static PartitionResourcesListResponse create(DataInput input) 
throws IOException {
@@ -68,6 +81,20 @@ public class PartitionResourcesListResponse implements 
IReplicationMessage {
         for (int i = 0; i < size; i++) {
             resources.add(input.readUTF());
         }
-        return new PartitionResourcesListResponse(partition, resources);
+        boolean owner = input.readBoolean();
+        int resourceSize = input.readInt();
+        Map<String, Long> partitionReplicatedResources = new HashMap<>();
+        for (int i = 0; i < resourceSize; i++) {
+            partitionReplicatedResources.put(input.readUTF(), 
input.readLong());
+        }
+        return new PartitionResourcesListResponse(partition, 
partitionReplicatedResources, resources, owner);
+    }
+
+    public boolean isOwner() {
+        return owner;
+    }
+
+    public Map<String, Long> getPartitionReplicatedResources() {
+        return partitionReplicatedResources;
     }
 }
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index cff12de..3ea252f 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
@@ -50,11 +51,15 @@ public class PartitionResourcesListTask implements 
IReplicaTask {
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
         final IReplicationStrategy replicationStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
-        final List<String> partitionResources =
+        // .metadata file -> resource id
+        Map<String, Long> partitionReplicatedResources =
+                
localResourceRepository.getPartitionReplicatedResources(partition, 
replicationStrategy);
+        // all data files in partitions + .metadata files
+        final List<String> partitionFiles =
                 localResourceRepository.getPartitionReplicatedFiles(partition, 
replicationStrategy).stream()
                         
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
-        final PartitionResourcesListResponse response =
-                new PartitionResourcesListResponse(partition, 
partitionResources);
+        final PartitionResourcesListResponse response = new 
PartitionResourcesListResponse(partition,
+                partitionReplicatedResources, partitionFiles, 
appCtx.getReplicaManager().isPartitionOwner(partition));
         ReplicationProtocol.sendTo(worker.getChannel(), response, 
worker.getReusableBuffer());
     }
 
diff --git 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
index 44c7bea..3ee3094 100644
--- 
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
+++ 
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicateFileTask.java
@@ -55,11 +55,13 @@ public class ReplicateFileTask implements IReplicaTask {
     private final String file;
     private final long size;
     private final boolean indexMetadata;
+    private final String masterNodeId;
 
-    public ReplicateFileTask(String file, long size, boolean indexMetadata) {
+    public ReplicateFileTask(String file, long size, boolean indexMetadata, 
String masterNodeId) {
         this.file = file;
         this.size = size;
         this.indexMetadata = indexMetadata;
+        this.masterNodeId = masterNodeId;
     }
 
     @Override
@@ -103,7 +105,7 @@ public class ReplicateFileTask implements IReplicaTask {
         final long currentLSN = 
appCtx.getTransactionSubsystem().getLogManager().getAppendLSN();
         indexCheckpointManager.delete();
         indexCheckpointManager.init(UNINITIALIZED_COMPONENT_SEQ, currentLSN,
-                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
+                LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), 
masterNodeId);
         LOGGER.info(() -> "Checkpoint index: " + indexRef);
     }
 
@@ -119,6 +121,11 @@ public class ReplicateFileTask implements IReplicaTask {
             dos.writeUTF(file);
             dos.writeLong(size);
             dos.writeBoolean(indexMetadata);
+            boolean hasMaster = masterNodeId != null;
+            dos.writeBoolean(hasMaster);
+            if (hasMaster) {
+                dos.writeUTF(masterNodeId);
+            }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
         }
@@ -128,7 +135,9 @@ public class ReplicateFileTask implements IReplicaTask {
         final String s = input.readUTF();
         final long i = input.readLong();
         final boolean isMetadata = input.readBoolean();
-        return new ReplicateFileTask(s, i, isMetadata);
+        final boolean hasMaster = input.readBoolean();
+        final String masterNodeId = hasMaster ? input.readUTF() : null;
+        return new ReplicateFileTask(s, i, isMetadata, masterNodeId);
     }
 
     @Override
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
index 7bb2858..813b293 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/FileSynchronizer.java
@@ -55,7 +55,9 @@ public class FileSynchronizer {
             final IIOManager ioManager = appCtx.getIoManager();
             final ISocketChannel channel = replica.getChannel();
             final FileReference filePath = ioManager.resolve(file);
-            ReplicateFileTask task = new ReplicateFileTask(file, 
filePath.getFile().length(), metadata);
+            String masterNode = 
appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+                    ? appCtx.getServiceContext().getNodeId() : null;
+            ReplicateFileTask task = new ReplicateFileTask(file, 
filePath.getFile().length(), metadata, masterNode);
             LOGGER.info("attempting to replicate {} to replica {}", task, 
replica);
             ReplicationProtocol.sendTo(replica, task);
             // send the file itself
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
index 47f872c..48ad5a5 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/IndexSynchronizer.java
@@ -94,8 +94,10 @@ public class IndexSynchronizer {
         final FileSynchronizer fileSynchronizer = new FileSynchronizer(appCtx, 
replica);
         
job.getJobFiles().stream().map(StoragePathUtil::getFileRelativePath).forEach(fileSynchronizer::replicate);
         // send mark component valid
-        MarkComponentValidTask markValidTask =
-                new MarkComponentValidTask(indexFile, 
getReplicatedComponentLsn(), getReplicatedComponentId());
+        String masterNode = 
appCtx.getReplicaManager().isPartitionOwner(replica.getIdentifier().getPartition())
+                ? appCtx.getServiceContext().getNodeId() : null;
+        MarkComponentValidTask markValidTask = new 
MarkComponentValidTask(indexFile, getReplicatedComponentLsn(),
+                getReplicatedComponentId(), masterNode);
         ReplicationProtocol.sendTo(replica, markValidTask);
         ReplicationProtocol.waitForAck(replica);
         LOGGER.debug("Replicated component ({}) to replica {}", indexFile, 
replica);
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index b47fd39..3c93d17 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -20,26 +20,39 @@ package org.apache.asterix.replication.sync;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
 import org.apache.asterix.common.replication.IReplicationStrategy;
+import org.apache.asterix.common.storage.IIndexCheckpointManager;
+import org.apache.asterix.common.storage.IndexCheckpoint;
+import org.apache.asterix.common.storage.ResourceReference;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
 import org.apache.asterix.replication.messaging.PartitionResourcesListTask;
 import org.apache.asterix.replication.messaging.ReplicationProtocol;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
+import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.network.ISocketChannel;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager;
+import 
org.apache.hyracks.storage.am.lsm.common.impls.IndexComponentFileReference;
+import org.apache.hyracks.storage.common.LocalResource;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
 
 /**
  * Ensures that the files between master and a replica are synchronized
  */
 public class ReplicaFilesSynchronizer {
 
+    private static final Logger LOGGER = LogManager.getLogger();
     private final PartitionReplica replica;
     private final INcApplicationContext appCtx;
 
@@ -50,31 +63,42 @@ public class ReplicaFilesSynchronizer {
 
     public void sync() throws IOException {
         final int partition = replica.getIdentifier().getPartition();
-        final Set<String> replicaFiles = getReplicaFiles(partition);
+        PartitionResourcesListResponse replicaResourceResponse = 
getReplicaFiles(partition);
+        Map<ResourceReference, Long> resourceReferenceLongMap = 
getValidReplicaResources(
+                replicaResourceResponse.getPartitionReplicatedResources(), 
replicaResourceResponse.isOwner());
+        // clean up files for invalid resources (deleted or recreated while 
the replica was down)
+        Set<String> deletedReplicaFiles =
+                cleanupReplicaInvalidResources(replicaResourceResponse, 
resourceReferenceLongMap);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         final IReplicationStrategy replicationStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
         final Set<String> masterFiles =
                 localResourceRepository.getPartitionReplicatedFiles(partition, 
replicationStrategy).stream()
                         
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
-        // find files on master and not on replica
-        final List<String> replicaMissingFiles =
-                masterFiles.stream().filter(file -> 
!replicaFiles.contains(file)).collect(Collectors.toList());
-        replicateMissingFiles(replicaMissingFiles);
-        // find files on replica and not on master
+        // exclude from the replica files the list of invalid deleted files
+        final Set<String> replicaFiles = new 
HashSet<>(replicaResourceResponse.getFiles());
+        replicaFiles.removeAll(deletedReplicaFiles);
+        syncMissingFiles(replicaFiles, masterFiles);
+        deleteReplicaExtraFiles(replicaFiles, masterFiles);
+    }
+
+    private void deleteReplicaExtraFiles(Set<String> replicaFiles, Set<String> 
masterFiles) {
         final List<String> replicaInvalidFiles =
                 replicaFiles.stream().filter(file -> 
!masterFiles.contains(file)).collect(Collectors.toList());
-        deleteInvalidFiles(replicaInvalidFiles);
+        if (!replicaInvalidFiles.isEmpty()) {
+            LOGGER.debug("deleting files not on current master {} on replica 
{}", replicaInvalidFiles,
+                    replica.getIdentifier());
+            deleteInvalidFiles(replicaInvalidFiles);
+        }
     }
 
-    private Set<String> getReplicaFiles(int partition) throws IOException {
-        final PartitionResourcesListTask replicaFilesRequest = new 
PartitionResourcesListTask(partition);
-        final ISocketChannel channel = replica.getChannel();
-        final ByteBuffer reusableBuffer = replica.getReusableBuffer();
-        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
-        final PartitionResourcesListResponse response =
-                (PartitionResourcesListResponse) 
ReplicationProtocol.read(channel, reusableBuffer);
-        return new HashSet<>(response.getResources());
+    private void syncMissingFiles(Set<String> replicaFiles, Set<String> 
masterFiles) {
+        final List<String> replicaMissingFiles =
+                masterFiles.stream().filter(file -> 
!replicaFiles.contains(file)).collect(Collectors.toList());
+        if (!replicaMissingFiles.isEmpty()) {
+            LOGGER.debug("replicating missing files {} on replica {}", 
replicaMissingFiles, replica.getIdentifier());
+            replicateMissingFiles(replicaMissingFiles);
+        }
     }
 
     private void replicateMissingFiles(List<String> files) {
@@ -88,4 +112,73 @@ public class ReplicaFilesSynchronizer {
         final FileSynchronizer sync = new FileSynchronizer(appCtx, replica);
         files.forEach(sync::delete);
     }
+
+    private long getResourceMasterValidSeq(ResourceReference rr) throws 
HyracksDataException {
+        IIndexCheckpointManager iIndexCheckpointManager = 
appCtx.getIndexCheckpointManagerProvider().get(rr);
+        int checkpointCount = iIndexCheckpointManager.getCheckpointCount();
+        if (checkpointCount > 0) {
+            IndexCheckpoint latest = iIndexCheckpointManager.getLatest();
+            long masterValidSeq = latest.getMasterValidSeq();
+            LOGGER.info("setting resource {} valid component seq to {}", rr, 
masterValidSeq);
+            return masterValidSeq;
+        }
+        return AbstractLSMIndexFileManager.UNINITIALIZED_COMPONENT_SEQ;
+    }
+
+    private Set<String> 
cleanupReplicaInvalidResources(PartitionResourcesListResponse 
replicaResourceResponse,
+            Map<ResourceReference, Long> validReplicaResources) {
+        Set<String> invalidFiles = new HashSet<>();
+        for (String replicaResPath : replicaResourceResponse.getFiles()) {
+            ResourceReference replicaRes = 
ResourceReference.of(replicaResPath);
+            if (!validReplicaResources.containsKey(replicaRes)) {
+                LOGGER.debug("replica invalid file {} to be deleted", 
replicaRes.getFileRelativePath());
+                invalidFiles.add(replicaResPath);
+            } else if (replicaResourceResponse.isOwner() && 
!replicaRes.isMetadataResource()) {
+                // find files where the owner generated and failed before 
replicating
+                Long masterValidSeq = validReplicaResources.get(replicaRes);
+                IndexComponentFileReference componentFileReference =
+                        IndexComponentFileReference.of(replicaRes.getName());
+                if (componentFileReference.getSequenceStart() > masterValidSeq
+                        || componentFileReference.getSequenceEnd() > 
masterValidSeq) {
+                    LOGGER.debug("will ask replica {} to delete file {} based 
on valid master valid seq {}",
+                            replica.getIdentifier(), replicaResPath, 
masterValidSeq);
+                    invalidFiles.add(replicaResPath);
+                }
+            }
+        }
+        if (!invalidFiles.isEmpty()) {
+            LOGGER.info("will delete the following files from replica {}", 
invalidFiles);
+            deleteInvalidFiles(new ArrayList<>(invalidFiles));
+        }
+        return invalidFiles;
+    }
+
+    private PartitionResourcesListResponse getReplicaFiles(int partition) 
throws IOException {
+        final PartitionResourcesListTask replicaFilesRequest = new 
PartitionResourcesListTask(partition);
+        final ISocketChannel channel = replica.getChannel();
+        final ByteBuffer reusableBuffer = replica.getReusableBuffer();
+        ReplicationProtocol.sendTo(replica, replicaFilesRequest);
+        return (PartitionResourcesListResponse) 
ReplicationProtocol.read(channel, reusableBuffer);
+    }
+
+    private Map<ResourceReference, Long> getValidReplicaResources(Map<String, 
Long> partitionReplicatedResources,
+            boolean owner) throws HyracksDataException {
+        Map<ResourceReference, Long> resource2ValidSeqMap = new HashMap<>();
+        for (Map.Entry<String, Long> resourceEntry : 
partitionReplicatedResources.entrySet()) {
+            ResourceReference rr = 
ResourceReference.of(resourceEntry.getKey());
+            final PersistentLocalResourceRepository localResourceRepository =
+                    (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
+            LocalResource localResource = 
localResourceRepository.get(rr.getRelativePath().toString());
+            if (localResource != null) {
+                if (localResource.getId() != resourceEntry.getValue()) {
+                    LOGGER.info("replica has resource {} but with different 
resource id; ours {}, theirs {}", rr,
+                            localResource.getId(), resourceEntry.getValue());
+                } else {
+                    long resourceMasterValidSeq = owner ? 
getResourceMasterValidSeq(rr) : Integer.MAX_VALUE;
+                    resource2ValidSeqMap.put(rr, resourceMasterValidSeq);
+                }
+            }
+        }
+        return resource2ValidSeqMap;
+    }
 }
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
index 3209a98..6030245 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaSynchronizer.java
@@ -71,8 +71,10 @@ public class ReplicaSynchronizer {
 
     private void checkpointReplicaIndexes() throws IOException {
         final int partition = replica.getIdentifier().getPartition();
+        String masterNode =
+                appCtx.getReplicaManager().isPartitionOwner(partition) ? 
appCtx.getServiceContext().getNodeId() : null;
         CheckpointPartitionIndexesTask task =
-                new CheckpointPartitionIndexesTask(partition, 
getPartitionMaxComponentId(partition));
+                new CheckpointPartitionIndexesTask(partition, 
getPartitionMaxComponentId(partition), masterNode);
         ReplicationProtocol.sendTo(replica, task);
         ReplicationProtocol.waitForAck(replica);
     }
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index a73b71a..2494893 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -19,7 +19,7 @@
 package org.apache.asterix.transaction.management.resource;
 
 import static 
org.apache.asterix.common.storage.ResourceReference.getComponentSequence;
-import static 
org.apache.asterix.common.utils.StorageConstants.INDEX_CHECKPOINT_FILE_PREFIX;
+import static 
org.apache.asterix.common.utils.StorageConstants.INDEX_NON_DATA_FILES_PREFIX;
 import static 
org.apache.asterix.common.utils.StorageConstants.METADATA_FILE_NAME;
 import static org.apache.hyracks.api.exceptions.ErrorCode.CANNOT_CREATE_FILE;
 import static 
org.apache.hyracks.storage.am.lsm.common.impls.AbstractLSMIndexFileManager.COMPONENT_FILES_FILTER;
@@ -90,7 +90,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
     private static final String METADATA_FILE_MASK_NAME =
             StorageConstants.MASK_FILE_PREFIX + 
StorageConstants.METADATA_FILE_NAME;
     private static final FilenameFilter LSM_INDEX_FILES_FILTER =
-            (dir, name) -> !name.startsWith(INDEX_CHECKPOINT_FILE_PREFIX);
+            (dir, name) -> name.startsWith(METADATA_FILE_NAME) || 
!name.startsWith(INDEX_NON_DATA_FILES_PREFIX);
     private static final FilenameFilter MASK_FILES_FILTER =
             (dir, name) -> name.startsWith(StorageConstants.MASK_FILE_PREFIX);
     private static final int MAX_CACHED_RESOURCES = 1000;
@@ -200,7 +200,7 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
             byte[] bytes = 
OBJECT_MAPPER.writeValueAsBytes(resource.toJson(persistedResourceRegistry));
             FileUtil.writeAndForce(Paths.get(resourceFile.getAbsolutePath()), 
bytes);
             
indexCheckpointManagerProvider.get(DatasetResourceReference.of(resource)).init(UNINITIALIZED_COMPONENT_SEQ,
-                    0, 
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId());
+                    0, 
LSMComponentId.EMPTY_INDEX_LAST_COMPONENT_ID.getMaxId(), null);
             deleteResourceFileMask(resourceFile);
         } catch (Exception e) {
             cleanup(resourceFile);
@@ -400,6 +400,20 @@ public class PersistentLocalResourceRepository implements ILocalResourceReposito
         });
     }
 
+    public Map<String, Long> getPartitionReplicatedResources(int partition, 
IReplicationStrategy strategy)
+            throws HyracksDataException {
+        final Map<String, Long> partitionReplicatedResources = new HashMap<>();
+        final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                DatasetResourceReference drr = DatasetResourceReference.of(lr);
+                
partitionReplicatedResources.put(drr.getFileRelativePath().toString(), 
lr.getId());
+            }
+        }
+        return partitionReplicatedResources;
+    }
+
     public List<String> getPartitionReplicatedFiles(int partition, 
IReplicationStrategy strategy)
             throws HyracksDataException {
         final List<String> partitionReplicatedFiles = new ArrayList<>();

Reply via email to