Murtadha Hubail has uploaded a new change for review.
https://asterix-gerrit.ics.uci.edu/2412
Change subject: [NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta
Recovery
......................................................................
[NO ISSUE][REPL] Exclude Non-Replicated Datasets From Delta Recovery
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
- Exclude non-replicated datasets files from
delta recovery.
- Fix used read buffer for large replication
requests.
Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
---
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
M
asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
M
asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
4 files changed, 40 insertions(+), 18 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/12/2412/1
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
index 54d3a02..b2b1ad1 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/PartitionResourcesListTask.java
@@ -26,9 +26,10 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
-import org.apache.asterix.replication.api.IReplicationWorker;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.IReplicaTask;
+import org.apache.asterix.replication.api.IReplicationWorker;
import
org.apache.asterix.transaction.management.resource.PersistentLocalResourceRepository;
import org.apache.hyracks.api.exceptions.HyracksDataException;
@@ -48,8 +49,10 @@
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository();
localResourceRepository.cleanup(partition);
- final List<String> partitionResources =
localResourceRepository.getPartitionIndexesFiles(partition).stream()
-
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
+ final IReplicationStrategy replicationStrategy =
appCtx.getReplicationManager().getReplicationStrategy();
+ final List<String> partitionResources =
+ localResourceRepository.getPartitionReplicatedFiles(partition,
replicationStrategy).stream()
+
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toList());
final PartitionResourcesListResponse response =
new PartitionResourcesListResponse(partition,
partitionResources);
ReplicationProtocol.sendTo(worker.getChannel(), response,
worker.getReusableBuffer());
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
index 280a2d4..41e7d9e 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/messaging/ReplicationProtocol.java
@@ -72,7 +72,7 @@
final ByteBuffer buf = ensureSize(dataBuffer, requestSize);
// read request
NetworkingUtil.readBytes(socketChannel, buf, requestSize);
- return dataBuffer;
+ return buf;
}
public static ReplicationRequestType getRequestType(SocketChannel
socketChannel, ByteBuffer byteBuffer)
@@ -135,6 +135,7 @@
requestBuffer.put(outputStream.getByteArray(), 0,
outputStream.getLength());
requestBuffer.flip();
NetworkingUtil.transferBufferToChannel(channel, requestBuffer);
+ channel.socket().getOutputStream().flush();
} catch (IOException e) {
throw new ReplicationException(e);
}
@@ -148,9 +149,9 @@
public static IReplicationMessage readMessage(ReplicationRequestType type,
SocketChannel socketChannel,
ByteBuffer buffer) {
try {
- ReplicationProtocol.readRequest(socketChannel, buffer);
+ final ByteBuffer requestBuf =
ReplicationProtocol.readRequest(socketChannel, buffer);
final ByteArrayInputStream input =
- new ByteArrayInputStream(buffer.array(),
buffer.position(), buffer.limit());
+ new ByteArrayInputStream(requestBuf.array(),
requestBuf.position(), requestBuf.limit());
try (DataInputStream dis = new DataInputStream(input)) {
switch (type) {
case PARTITION_RESOURCES_REQUEST:
diff --git
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
index 5658779..fae6ed6 100644
---
a/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
+++
b/asterixdb/asterix-replication/src/main/java/org/apache/asterix/replication/sync/ReplicaFilesSynchronizer.java
@@ -27,6 +27,7 @@
import java.util.stream.Collectors;
import org.apache.asterix.common.api.INcApplicationContext;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.utils.StoragePathUtil;
import org.apache.asterix.replication.api.PartitionReplica;
import org.apache.asterix.replication.messaging.PartitionResourcesListResponse;
@@ -52,8 +53,10 @@
final Set<String> replicaFiles = getReplicaFiles(partition);
final PersistentLocalResourceRepository localResourceRepository =
(PersistentLocalResourceRepository)
appCtx.getLocalResourceRepository();
- final Set<String> masterFiles =
localResourceRepository.getPartitionIndexesFiles(partition).stream()
-
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
+ final IReplicationStrategy replicationStrategy =
appCtx.getReplicationManager().getReplicationStrategy();
+ final Set<String> masterFiles =
+ localResourceRepository.getPartitionReplicatedFiles(partition,
replicationStrategy).stream()
+
.map(StoragePathUtil::getFileRelativePath).collect(Collectors.toSet());
// find files on master and not on replica
final List<String> replicaMissingFiles =
masterFiles.stream().filter(file ->
!replicaFiles.contains(file)).collect(Collectors.toList());
diff --git
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
index ca22a84..7206382 100644
---
a/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
+++
b/asterixdb/asterix-transactions/src/main/java/org/apache/asterix/transaction/management/resource/PersistentLocalResourceRepository.java
@@ -52,6 +52,7 @@
import org.apache.asterix.common.dataflow.DatasetLocalResource;
import org.apache.asterix.common.exceptions.AsterixException;
import org.apache.asterix.common.replication.IReplicationManager;
+import org.apache.asterix.common.replication.IReplicationStrategy;
import org.apache.asterix.common.replication.ReplicationJob;
import org.apache.asterix.common.storage.DatasetResourceReference;
import org.apache.asterix.common.storage.IIndexCheckpointManager;
@@ -342,18 +343,32 @@
});
}
- public List<String> getPartitionIndexesFiles(int partition) throws
HyracksDataException {
- List<String> partitionFiles = new ArrayList<>();
- Set<File> partitionIndexes = getPartitionIndexes(partition);
- for (File indexDir : partitionIndexes) {
- if (indexDir.isDirectory()) {
- File[] indexFiles = indexDir.listFiles(LSM_INDEX_FILES_FILTER);
- if (indexFiles != null) {
-
Stream.of(indexFiles).map(File::getAbsolutePath).forEach(partitionFiles::add);
- }
+ public List<String> getPartitionReplicatedFiles(int partition,
IReplicationStrategy strategy)
+ throws HyracksDataException {
+ final List<String> partitionReplicatedFiles = new ArrayList<>();
+ final Set<File> replicatedIndexes = new HashSet<>();
+ final Map<Long, LocalResource> partitionResources =
getPartitionResources(partition);
+ for (LocalResource lr : partitionResources.values()) {
+ DatasetLocalResource datasetLocalResource = (DatasetLocalResource)
lr.getResource();
+ if (strategy.isMatch(datasetLocalResource.getDatasetId())) {
+
replicatedIndexes.add(ioManager.resolve(lr.getPath()).getFile());
}
}
- return partitionFiles;
+ for (File indexDir : replicatedIndexes) {
+ partitionReplicatedFiles.addAll(getIndexFiles(indexDir));
+ }
+ return partitionReplicatedFiles;
+ }
+
+ private List<String> getIndexFiles(File indexDir) {
+ final List<String> indexFiles = new ArrayList<>();
+ if (indexDir.isDirectory()) {
+ File[] indexFilteredFiles =
indexDir.listFiles(LSM_INDEX_FILES_FILTER);
+ if (indexFilteredFiles != null) {
+
Stream.of(indexFilteredFiles).map(File::getAbsolutePath).forEach(indexFiles::add);
+ }
+ }
+ return indexFiles;
}
private void createStorageRoots() {
--
To view, visit https://asterix-gerrit.ics.uci.edu/2412
To unsubscribe, visit https://asterix-gerrit.ics.uci.edu/settings
Gerrit-MessageType: newchange
Gerrit-Change-Id: Ic734af7becf26082e79fae52bd2c01ba567c1c99
Gerrit-PatchSet: 1
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Owner: Murtadha Hubail <[email protected]>