>From Wail Alkowaileet <[email protected]>:

Wail Alkowaileet has uploaded this change for review. ( 
https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446 )


Change subject: [ASTERIXDB-3453][STO] Incompressible pages are not written as 
full pages
......................................................................

[ASTERIXDB-3453][STO] Incompressible pages are not written as full pages

- user model changes: no
- storage format changes: no
- interface changes: yes

Details:
Incompressible pages must be written entirely as full pages (i.e.,
page + header) to ensure that the write positions of the cloud
files stay in sync with those of the local files.

Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
---
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
M 
hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
M 
asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
5 files changed, 59 insertions(+), 3 deletions(-)



  git pull ssh://asterix-gerrit.ics.uci.edu:29418/asterixdb 
refs/changes/46/18446/1

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
index 91c24e8..208420d 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/AbstractCloudIOManager.java
@@ -268,10 +268,19 @@
         long writtenBytes = localIoManager.doSyncWrite(fHandle, offset, 
dataArray);
         dataArray[0].flip();
         dataArray[1].flip();
+        ensurePosition(fHandle, offset);
         cloudWrite(fHandle, dataArray);
         return writtenBytes;
     }

+    private void ensurePosition(IFileHandle fHandle, long offset) {
+        ICloudWriter cloudWriter = ((CloudFileHandle) 
fHandle).getCloudWriter();
+        if (cloudWriter.position() != offset) {
+            throw new IllegalStateException("Misaligned positions in " + 
fHandle.getFileReference() + ", cloudOffset: "
+                    + cloudWriter.position() + " != writeOffset: " + offset);
+        }
+    }
+
     @Override
     public final int doSyncWrite(IFileHandle fHandle, long offset, ByteBuffer 
data) throws HyracksDataException {
         int writtenBytes = localIoManager.doSyncWrite(fHandle, offset, data);
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
index cace898..403eb90 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/CloudResettableInputStream.java
@@ -36,10 +36,12 @@
     private final IWriteBufferProvider bufferProvider;
     private final ICloudBufferedWriter bufferedWriter;
     private ByteBuffer writeBuffer;
+    private long writtenBytes;

     public CloudResettableInputStream(ICloudBufferedWriter bufferedWriter, 
IWriteBufferProvider bufferProvider) {
         this.bufferedWriter = bufferedWriter;
         this.bufferProvider = bufferProvider;
+        writtenBytes = 0;
     }

     /* ************************************************************
@@ -84,6 +86,7 @@
             uploadAndWait();
         }
         writeBuffer.put((byte) b);
+        writtenBytes += 1;
     }

     @Override
@@ -102,7 +105,7 @@
             // enough to write all
             if (writeBuffer.remaining() > pageRemaining) {
                 writeBuffer.put(b, offset, pageRemaining);
-                return len;
+                break;
             }

             int remaining = writeBuffer.remaining();
@@ -112,10 +115,16 @@
             uploadAndWait();
         }

+        writtenBytes += len;
         return len;
     }

     @Override
+    public long position() {
+        return writtenBytes;
+    }
+
+    @Override
     public int read(byte[] b, int off, int len) throws IOException {
         if (writeBuffer.remaining() == 0) {
             return -1;
@@ -173,6 +182,7 @@
         if (writeBuffer == null) {
             writeBuffer = bufferProvider.getBuffer();
             writeBuffer.clear();
+            writtenBytes = 0;
         }
     }

diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
index 15822c4..920be9c 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/ICloudWriter.java
@@ -61,6 +61,11 @@
     int write(byte[] b, int off, int len) throws HyracksDataException;

     /**
+     * @return the current position of the writer
+     */
+    long position();
+
+    /**
      * Finish the write operation
      * Note: this should be called upon successful write
      */
diff --git 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
index a6dade5..d9119a5 100644
--- 
a/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
+++ 
b/asterixdb/asterix-cloud/src/main/java/org/apache/asterix/cloud/clients/google/gcs/GCSWriter.java
@@ -41,12 +41,14 @@
     private final IRequestProfiler profiler;
     private final Storage gcsClient;
     private WriteChannel writer = null;
+    private long writtenBytes;

     public GCSWriter(String bucket, String path, Storage gcsClient, 
IRequestProfiler profiler) {
         this.bucket = bucket;
         this.path = path;
         this.profiler = profiler;
         this.gcsClient = gcsClient;
+        writtenBytes = 0;
     }

     @Override
@@ -67,17 +69,26 @@
             throw HyracksDataException.create(e);
         }

+        writtenBytes += written;
         return written;
     }

     @Override
     public int write(byte[] b, int off, int len) throws HyracksDataException {
-        return write(ByteBuffer.wrap(b, off, len));
+        int written = write(ByteBuffer.wrap(b, off, len));
+        writtenBytes += written;
+        return written;
+    }
+
+    @Override
+    public long position() {
+        return writtenBytes;
     }

     @Override
     public void write(int b) throws HyracksDataException {
         write(ByteBuffer.wrap(new byte[] { (byte) b }));
+        writtenBytes += 1;
     }

     @Override
@@ -105,6 +116,7 @@
         if (writer == null) {
             writer = gcsClient.writer(BlobInfo.newBuilder(BlobId.of(bucket, 
path)).build());
             writer.setChunkSize(WRITE_BUFFER_SIZE);
+            writtenBytes = 0;
             log("STARTED");
         }
     }
diff --git 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
index 6ad4d27..e1a46e7 100644
--- 
a/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
+++ 
b/hyracks-fullstack/hyracks/hyracks-storage-common/src/main/java/org/apache/hyracks/storage/common/file/CompressedBufferedFileHandle.java
@@ -123,8 +123,10 @@
                 expectedBytesWritten = cBuffer.limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
cBuffer);
             } else {
-                //Compression did not gain any savings
+                // Compression did not gain any savings
                 final ByteBuffer[] buffers = header.prepareWrite(cPage);
+                // Incompressible pages should be written entirely
+                fixBufferPointers(buffers[1], 0);
                 offset = compressedFileManager.writePageInfo(pageId, 
bufferCache.getPageSizeWithHeader());
                 expectedBytesWritten = buffers[0].limit() + (long) 
buffers[1].limit();
                 bytesWritten = context.write(ioManager, handle, offset, 
buffers);

--
To view, visit https://asterix-gerrit.ics.uci.edu/c/asterixdb/+/18446
To unsubscribe, or for help writing mail filters, visit 
https://asterix-gerrit.ics.uci.edu/settings

Gerrit-Project: asterixdb
Gerrit-Branch: master
Gerrit-Change-Id: Iccebe6fcab375d064825ab2e9343b96daf8afbc6
Gerrit-Change-Number: 18446
Gerrit-PatchSet: 1
Gerrit-Owner: Wail Alkowaileet <[email protected]>
Gerrit-MessageType: newchange

Reply via email to