This is an automated email from the ASF dual-hosted git repository.

baunsgaard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/systemds.git


The following commit(s) were added to refs/heads/main by this push:
     new 08ce6bc1f5 [MINOR] Double Buffering longer than buffer arrays
08ce6bc1f5 is described below

commit 08ce6bc1f5da755b7c0d1bb6dce347ba28711263
Author: Sebastian Baunsgaard <baunsga...@apache.org>
AuthorDate: Tue Apr 16 10:51:51 2024 +0200

    [MINOR] Double Buffering longer than buffer arrays
    
    This commit fixes the double buffering of byte arrays
    to handle cases where a given byte array is larger than the size
    of the buffer.
    Prior to this commit, such arrays made the buffer crash; with
    this commit they are forwarded directly to the underlying stream.
    Also included is some documentation in FastBufferedDataOutputStream.
    
    Closes 2019
---
 .../runtime/util/DoubleBufferingOutputStream.java  | 67 ++++++++++------------
 .../runtime/util/FastBufferedDataOutputStream.java | 32 +++++++----
 .../apache/sysds/runtime/util/LocalFileUtils.java  | 21 +++----
 3 files changed, 62 insertions(+), 58 deletions(-)

diff --git 
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java 
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
index 16504e64ee..8d3dd7e994 100644
--- 
a/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
+++ 
b/src/main/java/org/apache/sysds/runtime/util/DoubleBufferingOutputStream.java
@@ -16,13 +16,12 @@
  * specific language governing permissions and limitations
  * under the License.
  */
- 
+
 package org.apache.sysds.runtime.util;
 
 import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
-import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -34,7 +33,7 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream {
        protected Future<?>[] _locks;
        protected byte[][] _buff;
        private int _pos;
-       
+
        public DoubleBufferingOutputStream(OutputStream out) {
                this(out, 2, 8192);
        }
@@ -43,42 +42,52 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream {
                super(out);
                if(size <= 0)
                        throw new IllegalArgumentException("Buffer size <= 0.");
-               if( size%8 != 0 )
+               if(size % 8 != 0)
                        throw new IllegalArgumentException("Buffer size not a 
multiple of 8.");
                _buff = new byte[num][size];
                _locks = new Future<?>[num];
-               for(int i=0; i<num; i++)
+               for(int i = 0; i < num; i++) // fill the futures to avoid null 
pointers.
                        _locks[i] = ConcurrentUtils.constantFuture(null);
        }
 
        @Override
        public void write(int b) throws IOException {
-               throw new IOException("Not supported"); 
+               throw new IOException("Not supported");
        }
 
        @Override
-       public void write(byte[] b, int off, int len) 
-               throws IOException 
-       {
+       public void write(byte[] b, int off, int len) throws IOException {
                try {
                        synchronized(_buff) {
-                               //block until buffer is free to use
+                               final byte[] b_pos = _buff[_pos];
+                               // block until buffer is free to use
                                _locks[_pos].get();
-                               
-                               //copy for asynchronous write because b is 
reused higher up
-                               System.arraycopy(b, off, _buff[_pos], 0, len);
-                               
-                               //submit write request 
-                               _locks[_pos] = _pool.submit(new 
WriteTask(_buff[_pos], len));
-                               _pos = (_pos+1) % _buff.length;
+                               if(b_pos.length >= len) {
+                                       // copy the block into the buffer.
+                                       System.arraycopy(b, off, b_pos, 0, len);
+                                       // submit write request guaranteed to 
be sequential since it is using a single thread.
+                                       _locks[_pos] = _pool.submit(() -> 
writeBuffer(b_pos, 0, len));
+                                       // copy for asynchronous write because 
b is reused higher up
+                               }
+                               else {
+                                       // The given byte array is longer than 
the buffer.
+                                       // This means that the async buffer 
would overflow and therefore not work.
+                                       // To avoid this we simply write the 
given byte array without a buffer.
+                                       // This approach only works if the 
caller does not modify the given byte array
+                                       _locks[_pos] = _pool.submit(() -> 
writeBuffer(b, off, len));
+                                       // wait for the task to finish 
(blocking the current thread) 
+                                       // to avoid race conditions with 
callers that reuse the array.
+                                       _locks[_pos].get(); 
+                               }
+                               _pos = (_pos + 1) % _buff.length;
                        }
                }
                catch(Exception ex) {
                        throw new IOException(ex);
                }
        }
-       
-       public void writeBuffer(byte[] b, int off, int len) {
+
+       private void writeBuffer(byte[] b, int off, int len) {
                try {
                        out.write(b, off, len);
                }
@@ -91,14 +100,14 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream {
        public void flush() throws IOException {
                try {
                        synchronized(_buff) {
-                               for(int i=0; i<_buff.length; i++)
+                               for(int i = 0; i < _buff.length; i++)
                                        _locks[i].get();
                        }
+                       out.flush();
                }
                catch(Exception ex) {
                        throw new IOException(ex);
                }
-               out.flush();
        }
 
        @Override
@@ -106,20 +115,4 @@ public class DoubleBufferingOutputStream extends 
FilterOutputStream {
                _pool.shutdown();
                super.close();
        }
-       
-       private class WriteTask implements Callable<Object> {
-               private final byte[] _b;
-               private final int _len;
-               
-               protected WriteTask(byte[] buff, int len) {
-                       _b = buff;
-                       _len = len;
-               }
-               
-               @Override
-               public Object call() {
-                       writeBuffer(_b, 0, _len);
-                       return null;
-               }
-       }
 }
diff --git 
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java 
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
index 1804bc78e0..da9d380aa7 100644
--- 
a/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
+++ 
b/src/main/java/org/apache/sysds/runtime/util/FastBufferedDataOutputStream.java
@@ -42,8 +42,11 @@ import 
org.apache.sysds.runtime.matrix.data.MatrixBlockDataOutput;
  */
 public class FastBufferedDataOutputStream extends FilterOutputStream 
implements DataOutput, MatrixBlockDataOutput
 {
+       /** The buffer to copy bytes into before writing out */
        protected byte[] _buff;
+       /** The maximum size of the buffer */
        protected int _bufflen;
+       /** The current fill amount of the buffer */
        protected int _count;
 
        public FastBufferedDataOutputStream(OutputStream out) {
@@ -54,7 +57,7 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
                super(out);
                if(size <= 0)
                        throw new IllegalArgumentException("Buffer size <= 0.");
-               if( size%8 != 0 )
+               if(size % 8 != 0)
                        throw new IllegalArgumentException("Buffer size not a 
multiple of 8.");
                _buff = new byte[size];
                _bufflen = size;
@@ -68,19 +71,21 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
        }
 
        @Override
-       public void write(byte[] b, int off, int len) 
-               throws IOException 
-       {
-               if (len >= _bufflen) {
-                       flushBuffer();
+       public void write(byte[] b, int off, int len) throws IOException {
+               if(len > _bufflen) {
+                       // If we write a byte array that is larger than the 
buffer
+                       flushBuffer(); // flush the buffer first and
+                       // forward the array directly
                        out.write(b, off, len);
-                       return;
                }
-               if (len > _bufflen - _count) {
-                       flushBuffer();
+               else{
+                       if (len > _bufflen - _count) 
+                               // if the write is larger than what is left in 
the buffer.
+                               flushBuffer();
+                       
+                       System.arraycopy(b, off, _buff, _count, len);
+                       _count += len;
                }
-               System.arraycopy(b, off, _buff, _count, len);
-               _count += len;
        }
 
        @Override
@@ -89,6 +94,11 @@ public class FastBufferedDataOutputStream extends 
FilterOutputStream implements
                out.flush();
        }
 
+       /**
+        * Flush the buffer to empty the current content and reset the counting 
pointer to 0
+        * 
+        * @throws IOException Throws an IOException on errors.
+        */
        private void flushBuffer() throws IOException {
                if(_count > 0) {
                        out.write(_buff, 0, _count);
diff --git a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java 
b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
index 5ab5a76e22..2b4f56f3e9 100644
--- a/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
+++ b/src/main/java/org/apache/sysds/runtime/util/LocalFileUtils.java
@@ -233,23 +233,24 @@ public class LocalFileUtils
         * @param doubleBuffering overlay serialization and I/O
         * @throws IOException if IOException occurs
         */
-       public static void writeWritableToLocal(String fname, Writable mb, 
boolean doubleBuffering)
-               throws IOException
-       {
-               OutputStream fos = new FileOutputStream( fname );
-               if( doubleBuffering )
-                       fos = new DoubleBufferingOutputStream(fos, 2, 
BUFFER_SIZE);
-               FastBufferedDataOutputStream out = new 
FastBufferedDataOutputStream(fos, BUFFER_SIZE);
-               
+       public static void writeWritableToLocal(String fname, Writable mb, 
boolean doubleBuffering) throws IOException {
+               final FastBufferedDataOutputStream out = //
+                       new FastBufferedDataOutputStream(getOut(fname, 
doubleBuffering), BUFFER_SIZE);
                try {
                        mb.write(out);
                }
                finally {
-                       IOUtilFunctions.closeSilently(out); //incl double 
buffering
-                       IOUtilFunctions.closeSilently(fos);
+                       IOUtilFunctions.closeSilently(out);
                }
        }
 
+       private static OutputStream getOut(String fname, boolean 
doubleBuffering) throws IOException{
+               if(doubleBuffering)
+                       return new DoubleBufferingOutputStream(new 
FileOutputStream(fname), 2, BUFFER_SIZE);
+               else
+                       return new FileOutputStream(fname);
+       }
+
        public static void writeByteArrayToLocal( String fname, byte[] data )
                throws IOException
        {       

Reply via email to