>From Wail Alkowaileet <[email protected]>:
Wail Alkowaileet has uploaded this change for review. (
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 )
Change subject: [ASTERIXDB-3453][STO] Incompressible pages are not written as
full pages
......................................................................
[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages
- user model changes: no
- storage format changes: no
- interface changes: yes
Details:
Incompressible pages must be written as full pages (i.e., as
page + header) entirely to ensure the position of the cloud
files are in-sync with local files.
Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
---
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
M
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
5 files changed, 59 insertions(+), 3 deletions(-)
git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb
refs/changes/46/18446/1
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e8..208420d 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -268,10 +268,19 @@
long writtenBytes = localIoManager.doSyncWrite(fHandle, offset,
dataArray);
dataArray[0].flip();
dataArray[1].flip();
+ ensurePosition(fHandle, offset);
cloudWrite(fHandle, dataArray);
return writtenBytes;
}
+ private void ensurePosition(IFileHandle fHandle, long offset) {
+ ICloudWriter cloudWriter = ((CloudFileHandle)
fHandle).getCloudWriter();
+ if (cloudWriter.position() != offset) {
+ throw new IllegalStateException("Misaligned positions in " +
fHandle.getFileReference() + ", cloudOffset: "
+ + cloudWriter.position() + " != writeOffset: " + offset);
+ }
+ }
+
@Override
public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer
data) throws HyracksDataException {
int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898..403eb90 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@
private final IWriteBufferProvider bufferProvider;
private final ICloudBufferedWriter bufferedWriter;
private ByteBuffer writeBuffer;
+ private long writtenBytes;
public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter,
IWriteBufferProvider bufferProvider) {
this.bufferedWriter = bufferedWriter;
this.bufferProvider = bufferProvider;
+ writtenBytes = 0;
}
/* ************************************************************
@@ -84,6 +86,7 @@
uploadAndWait();
}
writeBuffer.put((byte) b);
+ writtenBytes += 1;
}
@Override
@@ -102,7 +105,7 @@
// enough to write all
if (writeBuffer.remaining() > pageRemaining) {
writeBuffer.put(b, offset, pageRemaining);
- return len;
+ break;
}
int remaining = writeBuffer.remaining();
@@ -112,10 +115,16 @@
uploadAndWait();
}
+ writtenBytes += len;
return len;
}
@Override
+ public long position() {
+ return writtenBytes;
+ }
+
+ @Override
public int read(byte[] b, int off, int len) throws IOException {
if (writeBuffer.remaining() == 0) {
return -1;
@@ -173,6 +182,7 @@
if (writeBuffer == null) {
writeBuffer = bufferProvider.getBuffer();
writeBuffer.clear();
+ writtenBytes = 0;
}
}
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4..920be9c 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -61,6 +61,11 @@
int write(byte[] b, int off, int len) throws HyracksDataException;
/**
+ * @return the current position of the writer
+ */
+ long position();
+
+ /**
* Finish the write operation
* Note: this should be called upon successful write
*/
diff --git
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5..d9119a5 100644
---
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@
private final IRequestProfiler profiler;
private final Storage gcsClient;
private WriteChannel writer = null;
+ private long writtenBytes;
public GCSWriter(String bucket, String path, Storage gcsClient,
IRequestProfiler profiler) {
this.bucket = bucket;
this.path = path;
this.profiler = profiler;
this.gcsClient = gcsClient;
+ writtenBytes = 0;
}
@Override
@@ -67,17 +69,26 @@
throw HyracksDataException.create(e);
}
+ writtenBytes += written;
return written;
}
@Override
public int write(byte[] b, int off, int len) throws HyracksDataException {
- return write(ByteBuffer.wrap(b, off, len));
+ int written = write(ByteBuffer.wrap(b, off, len));
+ writtenBytes += written;
+ return written;
+ }
+
+ @Override
+ public long position() {
+ return writtenBytes;
}
@Override
public void write(int b) throws HyracksDataException {
write(ByteBuffer.wrap(new byte[] { (byte) b }));
+ writtenBytes += 1;
}
@Override
@@ -105,6 +116,7 @@
if (writer == null) {
writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket,
path)).build());
writer.setChunkSize(WRITE_BUFFER_SIZE);
+ writtenBytes = 0;
log("STARTED");
}
}
diff --git
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27..e1a46e7 100644
---
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@
expectedBytesWritten = cBuffer.limit();
bytesWritten = context.write(ioManager, handle, offset,
cBuffer);
} else {
- //Compression did not gain any savings
+ // Compression did not gain any savings
final ByteBuffer[] buffers = header.prepareWrite(cPage);
+ // Incompressible pages should be written entirely
+ fixBufferPointers(buffers[1], 0);
offset = compressedFileManager.writePageInfo(pageId,
bufferCache.getPageSizeWithHeader());
expectedBytesWritten = buffers[0].limit() + (long)
buffers[1].limit();
bytesWritten = context.write(ioManager, handle, offset,
buffers);
--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
To unsubscribe, or for help writing mail filters, visit
https://asterix-gerrit.ics.uci.edu/settings
Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Gerrit-Change-Number: 18446
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange