Author: adulceanu
Date: Thu Sep 14 07:46:52 2017
New Revision: 1808319
URL: http://svn.apache.org/viewvc?rev=1808319&view=rev
Log:
OAK-6661 - ResponseDecoder should check that the length of the received blob
matches the length of the sent blob.
Added a length check, and a hard failure when attempting to remove an open spool file.
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java?rev=1808319&r1=1808318&r2=1808319&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ChunkedBlobStream.java
Thu Sep 14 07:46:52 2017
@@ -142,14 +142,15 @@ public class ChunkedBlobStream implement
byte mask = createMask(data.length);
Hasher hasher = Hashing.murmur3_32().newHasher();
- long hash = hasher.putByte(mask).putBytes(data).hash().padToLong();
+ long hash =
hasher.putByte(mask).putLong(length).putBytes(data).hash().padToLong();
byte[] blobIdBytes = blobId.getBytes();
ByteBuf out = allocator.buffer();
- out.writeInt(1 + 1 + 4 + blobIdBytes.length + 8 + data.length);
+ out.writeInt(1 + 1 + 8 + 4 + blobIdBytes.length + 8 + data.length);
out.writeByte(Messages.HEADER_BLOB);
out.writeByte(mask);
+ out.writeLong(length);
out.writeInt(blobIdBytes.length);
out.writeBytes(blobIdBytes);
out.writeLong(hash);
Modified:
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
URL:
http://svn.apache.org/viewvc/jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java?rev=1808319&r1=1808318&r2=1808319&view=diff
==============================================================================
---
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
(original)
+++
jackrabbit/oak/trunk/oak-segment-tar/src/main/java/org/apache/jackrabbit/oak/segment/standby/codec/ResponseDecoder.java
Thu Sep 14 07:46:52 2017
@@ -26,6 +26,7 @@ import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
+import java.nio.file.Files;
import java.util.List;
import java.util.UUID;
@@ -123,7 +124,8 @@ public class ResponseDecoder extends Byt
private static void decodeGetBlobResponse(int length, ByteBuf in,
List<Object> out) throws IOException {
byte mask = in.readByte();
-
+ long blobLength = in.readLong();
+
int blobIdLength = in.readInt();
byte[] blobIdBytes = new byte[blobIdLength];
in.readBytes(blobIdBytes);
@@ -134,7 +136,7 @@ public class ResponseDecoder extends Byt
if ((mask & (1 << 0)) != 0) {
if (tempFile.exists()) {
log.debug("Detected previous incomplete transfer for {}.
Cleaning up...", blobId);
- tempFile.delete();
+ Files.delete(tempFile.toPath());
}
}
@@ -144,7 +146,7 @@ public class ResponseDecoder extends Byt
byte[] chunkData = new byte[in.readableBytes()];
in.readBytes(chunkData);
- if (hash(mask, chunkData) != hash) {
+ if (hash(mask, blobLength, chunkData) != hash) {
log.debug("Invalid checksum, discarding current chunk from {}",
blobId);
return;
} else {
@@ -158,8 +160,12 @@ public class ResponseDecoder extends Byt
if ((mask & (1 << 1)) != 0) {
log.debug("Received entire blob {}", blobId);
- FileInputStream fis = new DeleteOnCloseFileInputStream(tempFile);
- out.add(new GetBlobResponse(null, blobId, fis,
fis.getChannel().size()));
+ if (blobLength == tempFile.length()) {
+ FileInputStream fis = new
DeleteOnCloseFileInputStream(tempFile);
+ out.add(new GetBlobResponse(null, blobId, fis,
fis.getChannel().size()));
+ } else {
+ log.debug("Size mismatch for blob {}", blobId);
+ }
}
}
@@ -194,8 +200,8 @@ public class ResponseDecoder extends Byt
return
Hashing.murmur3_32().newHasher().putBytes(data).hash().padToLong();
}
- private static long hash(byte mask, byte[] data) {
- return
Hashing.murmur3_32().newHasher().putByte(mask).putBytes(data).hash().padToLong();
+ private static long hash(byte mask, long blobLength, byte[] data) {
+ return
Hashing.murmur3_32().newHasher().putByte(mask).putLong(blobLength).putBytes(data).hash().padToLong();
}
}