Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery
......................................................................


[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery

- user model changes: no
- storage format changes: no
- interface changes: no

Details:
- Exclude files of non-replicated datasets from
  delta recovery.
- Fix the read buffer used for large replication
  requests.

Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412
Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Reviewed-by: Michael Blow <mb...@apache.org>
---
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
4 files changed, 40 insertions(+), 18 deletions(-)

Approvals:
  Jenkins: Verified; No violations found; ; Verified
  Michael Blow: Looks good to me, approved



diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 54d3a02..b2b1ad1 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,9 +26,10 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
 import 
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 
@@ -48,8 +49,10 @@
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
         localResourceRepository.cleanup(partition);
-        final List<String> partitionResources = 
localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+        final IReplicationStrategy replicationStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
+        final List<String> partitionResources =
+                localResourceRepository.getPartitionReplicatedFiles(partition, 
replicationStrategy).stream()
+                        
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
         final PartitionResourcesListResponse response =
                 new PartitionResourcesListResponse(partition, 
partitionResources);
         ReplicationProtocol.sendTo(worker.getChannel(), response, 
worker.getReusableBuffer());
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 280a2d4..41e7d9e 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -72,7 +72,7 @@
         final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
         // read request
         NetworkingUtil.readBytes(socketChannel, buf, requestSize);
-        return dataBuffer;
+        return buf;
     }
 
     public static ReplicationRequestType getRequestType(SocketChannel 
socketChannel, ByteBuffer byteBuffer)
@@ -135,6 +135,7 @@
             requestBuffer.put(outputStream.getByteArray(), 0, 
outputStream.getLength());
             requestBuffer.flip();
             NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+            channel.socket().getOutputStream().flush();
         } catch (IOException e) {
             throw new ReplicationException(e);
         }
@@ -148,9 +149,9 @@
     public static IReplicationMessage readMessage(ReplicationRequestType type, 
SocketChannel socketChannel,
             ByteBuffer buffer) {
         try {
-            ReplicationProtocol.readRequest(socketChannel, buffer);
+            final ByteBuffer requestBuf = 
ReplicationProtocol.readRequest(socketChannel, buffer);
             final ByteArrayInputStream input =
-                    new ByteArrayInputStream(buffer.array(), 
buffer.position(), buffer.limit());
+                    new ByteArrayInputStream(requestBuf.array(), 
requestBuf.position(), requestBuf.limit());
             try (DataInputStream dis = new DataInputStream(input)) {
                 switch (type) {
                     case PARTITION_RESOURCES_REQUEST:
diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 5658779..fae6ed6 100644
--- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -27,6 +27,7 @@
 import java.util.stream.Collectors;
 
 import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.utils.StoragePathUtil;
 import org.apache.asterix.replication.api.PartitionReplica;
 import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -52,8 +53,10 @@
         final Set<String> replicaFiles = getReplicaFiles(partition);
         final PersistentLocalResourceRepository localResourceRepository =
                 (PersistentLocalResourceRepository) 
appCtx.getLocalResourceRepository();
-        final Set<String> masterFiles = 
localResourceRepository.getPartitionIndexesFiles(partition).stream()
-                
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+        final IReplicationStrategy replicationStrategy = 
appCtx.getReplicationManager().getReplicationStrategy();
+        final Set<String> masterFiles =
+                localResourceRepository.getPartitionReplicatedFiles(partition, 
replicationStrategy).stream()
+                        
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
         // find files on master and not on replica
         final List<String> replicaMissingFiles =
                 masterFiles.stream().filter(file -> 
!replicaFiles.contains(file)).collect(Collectors.toList());
diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ca22a84..7206382 100644
--- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -52,6 +52,7 @@
 import org.apache.asterix.common.dataflow.DatasetLocalResource;
 import org.apache.asterix.common.exceptions.AsterixException;
 import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
 import org.apache.asterix.common.replication.ReplicationJob;
 import org.apache.asterix.common.storage.DatasetResourceReference;
 import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -342,18 +343,32 @@
         });
     }
 
-    public List<String> getPartitionIndexesFiles(int partition) throws 
HyracksDataException {
-        List<String> partitionFiles = new ArrayList<>();
-        Set<File> partitionIndexes = getPartitionIndexes(partition);
-        for (File indexDir : partitionIndexes) {
-            if (indexDir.isDirectory()) {
-                File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
-                if (indexFiles != null) {
-                    
Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
-                }
+    public List<String> getPartitionReplicatedFiles(int partition, 
IReplicationStrategy strategy)
+            throws HyracksDataException {
+        final List<String> partitionReplicatedFiles = new ArrayList<>();
+        final Set<File> replicatedIndexes = new HashSet<>();
+        final Map<Long, LocalResource> partitionResources = 
getPartitionResources(partition);
+        for (LocalResource lr : partitionResources.values()) {
+            DatasetLocalResource datasetLocalResource = (DatasetLocalResource) 
lr.getResource();
+            if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+                
replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
             }
         }
-        return partitionFiles;
+        for (File indexDir : replicatedIndexes) {
+            partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+        }
+        return partitionReplicatedFiles;
+    }
+
+    private List<String> getIndexFiles(File indexDir) {
+        final List<String> indexFiles = new ArrayList<>();
+        if (indexDir.isDirectory()) {
+            File[] indexFilteredFiles = 
indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+            if (indexFilteredFiles != null) {
+                
Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
+            }
+        }
+        return indexFiles;
     }
 
     private void createStorageRoots() {

-- 
To view, visit https://asterix-gerrit.ics.uci.edu/2412
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings

Gerrit-MessageType: merged
Gerrit-Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Gerrit-PatchSet: 2
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <mhub...@apache.org>
Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu>
Gerrit-Reviewer: Michael Blow <mb...@apache.org>
Gerrit-Reviewer: Murtadha Hubail <mhub...@apache.org>

Reply via email to