Murtadha Hubail has submitted this change and it was merged.

Change subject: [NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery
......................................................................
[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery - user model changes: no - storage format changes: no - interface changes: no Details: - Exclude non-replicated datasets files from delta recovery. - Fix used read buffer for large replication requests. Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99 Reviewed-on: https://asterix-gerrit.ics.uci.edu/2412 Sonar-Qube: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Integration-Tests: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Tested-by: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Contrib: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Reviewed-by: Michael Blow <mb...@apache.org> --- M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java M asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java M asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java 4 files changed, 40 insertions(+), 18 deletions(-) Approvals: Jenkins: Verified; No violations found; ; Verified Michael Blow: Looks good to me, approved diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java index 54d3a02..b2b1ad1 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java @@ -26,9 +26,10 @@ import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; -import org.apache.asterix.replication.api.IReplicationWorker; 
+import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.IReplicaTask; +import org.apache.asterix.replication.api.IReplicationWorker; import org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository; import org.apache.hyracks.api.exceptions.HyracksDataException; @@ -48,8 +49,10 @@ final PersistentLocalResourceRepository localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); localResourceRepository.cleanup(partition); - final List<String> partitionResources = localResourceRepository.getPartitionIndexesFiles(partition).stream() - .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); + final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final List<String> partitionResources = + localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList()); final PartitionResourcesListResponse response = new PartitionResourcesListResponse(partition, partitionResources); ReplicationProtocol.sendTo(worker.getChannel(), response, worker.getReusableBuffer()); diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java index 280a2d4..41e7d9e 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java @@ -72,7 +72,7 @@ final ByteBuffer buf = ensureSize(dataBuffer, requestSize); // read request NetworkingUtil.readBytes(socketChannel, buf, requestSize); - return dataBuffer; + return buf; } 
public static ReplicationRequestType getRequestType(SocketChannel socketChannel, ByteBuffer byteBuffer) @@ -135,6 +135,7 @@ requestBuffer.put(outputStream.getByteArray(), 0, outputStream.getLength()); requestBuffer.flip(); NetworkingUtil.transferBufferToChannel(channel, requestBuffer); + channel.socket().getOutputStream().flush(); } catch (IOException e) { throw new ReplicationException(e); } @@ -148,9 +149,9 @@ public static IReplicationMessage readMessage(ReplicationRequestType type, SocketChannel socketChannel, ByteBuffer buffer) { try { - ReplicationProtocol.readRequest(socketChannel, buffer); + final ByteBuffer requestBuf = ReplicationProtocol.readRequest(socketChannel, buffer); final ByteArrayInputStream input = - new ByteArrayInputStream(buffer.array(), buffer.position(), buffer.limit()); + new ByteArrayInputStream(requestBuf.array(), requestBuf.position(), requestBuf.limit()); try (DataInputStream dis = new DataInputStream(input)) { switch (type) { case PARTITION_RESOURCES_REQUEST: diff --git a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java index 5658779..fae6ed6 100644 --- a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java +++ b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java @@ -27,6 +27,7 @@ import java.util.stream.Collectors; import org.apache.asterix.common.api.INcApplicationContext; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.utils.StoragePathUtil; import org.apache.asterix.replication.api.PartitionReplica; import org.apache.asterix.replication.messaging.PartitionResourcesListResponse; @@ -52,8 +53,10 @@ final Set<String> replicaFiles = getReplicaFiles(partition); final PersistentLocalResourceRepository 
localResourceRepository = (PersistentLocalResourceRepository) appCtx.getLocalResourceRepository(); - final Set<String> masterFiles = localResourceRepository.getPartitionIndexesFiles(partition).stream() - .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); + final IReplicationStrategy replicationStrategy = appCtx.getReplicationManager().getReplicationStrategy(); + final Set<String> masterFiles = + localResourceRepository.getPartitionReplicatedFiles(partition, replicationStrategy).stream() + .map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet()); // find files on master and not on replica final List<String> replicaMissingFiles = masterFiles.stream().filter(file -> !replicaFiles.contains(file)).collect(Collectors.toList()); diff --git a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java index ca22a84..7206382 100644 --- a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java +++ b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java @@ -52,6 +52,7 @@ import org.apache.asterix.common.dataflow.DatasetLocalResource; import org.apache.asterix.common.exceptions.AsterixException; import org.apache.asterix.common.replication.IReplicationManager; +import org.apache.asterix.common.replication.IReplicationStrategy; import org.apache.asterix.common.replication.ReplicationJob; import org.apache.asterix.common.storage.DatasetResourceReference; import org.apache.asterix.common.storage.IIndexCheckpointManager; @@ -342,18 +343,32 @@ }); } - public List<String> getPartitionIndexesFiles(int partition) throws HyracksDataException { - List<String> partitionFiles = new 
ArrayList<>(); - Set<File> partitionIndexes = getPartitionIndexes(partition); - for (File indexDir : partitionIndexes) { - if (indexDir.isDirectory()) { - File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); - if (indexFiles != null) { - Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add); - } + public List<String> getPartitionReplicatedFiles(int partition, IReplicationStrategy strategy) + throws HyracksDataException { + final List<String> partitionReplicatedFiles = new ArrayList<>(); + final Set<File> replicatedIndexes = new HashSet<>(); + final Map<Long, LocalResource> partitionResources = getPartitionResources(partition); + for (LocalResource lr : partitionResources.values()) { + DatasetLocalResource datasetLocalResource = (DatasetLocalResource) lr.getResource(); + if (strategy.isMatch(datasetLocalResource.getDatasetId())) { + replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile()); } } - return partitionFiles; + for (File indexDir : replicatedIndexes) { + partitionReplicatedFiles.addAll(getIndexFiles(indexDir)); + } + return partitionReplicatedFiles; + } + + private List<String> getIndexFiles(File indexDir) { + final List<String> indexFiles = new ArrayList<>(); + if (indexDir.isDirectory()) { + File[] indexFilteredFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER); + if (indexFilteredFiles != null) { + Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add); + } + } + return indexFiles; } private void createStorageRoots() { -- To view, visit https://asterix-gerrit.ics.uci.edu/2412 To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings Gerrit-MessageType: merged Gerrit-Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99 Gerrit-PatchSet: 2 Gerrit-Project: asterixdb Gerrit-Branch: master Gerrit-Owner: Murtadha Hubail <mhub...@apache.org> Gerrit-Reviewer: Jenkins <jenk...@fulliautomatix.ics.uci.edu> Gerrit-Reviewer: Michael Blow <mb...@apache.org> Gerrit-Reviewer: Murtadha Hubail 
<mhub...@apache.org>