SammyVimes commented on code in PR #2326:
URL: https://github.com/apache/ignite-3/pull/2326#discussion_r1270595756


##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -129,6 +137,18 @@ public class DirectByteBufferStreamImplV1 implements 
DirectByteBufferStream {
 
     private Iterator<?> it;
 
+    private BufferedInputStream fileReader;
+
+    private int fileSate = 0;

Review Comment:
   ```suggestion
       private int fileState = 0;
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -129,6 +137,18 @@ public class DirectByteBufferStreamImplV1 implements 
DirectByteBufferStream {
 
     private Iterator<?> it;
 
+    private BufferedInputStream fileReader;
+
+    private int fileSate = 0;
+
+    private Path filePath;
+
+    private BufferedOutputStream fileWriter;
+
+    private long bytesWritten = -1;

Review Comment:
   ```suggestion
       private long fileBytesWritten = -1;
   ```



##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -1662,12 +1682,167 @@ public <C extends Set<?>> C 
readSet(MessageCollectionItemType itemType, MessageR
         return map0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void writeFile(File file) {
+        try {
+            if (file == null) {
+                writeInt(-1);
+                return;
+            }
+
+            switch (fileSate) {
+                case 0:
+                    writeLong(file.length());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 1:
+                    writeString(file.getName());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 2:
+                    lastFinished = false;
+
+                    if (fileReader == null) {
+                        fileReader = new BufferedInputStream(new 
FileInputStream(file));
+                    }
+
+                    int pos = buf.position();
+                    int toWrite = Math.min(buf.remaining(), 
fileReader.available());
+
+                    if (toWrite > 0) {
+                        byte[] arr = new byte[toWrite];
+                        fileReader.read(arr, 0, toWrite);
+                        GridUnsafe.copyMemory(arr, BYTE_ARR_OFF, heapArr, 
baseOff + pos, toWrite);
+                        buf.position(pos + toWrite);
+                    }
+
+                    if (fileReader.available() == 0) {
+                        fileReader.close();
+                        fileReader = null;
+
+                        lastFinished = true;
+
+                        fileSate = 0;
+                    }
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid file state: " + 
fileSate);
+            }
+        } catch (IOException e) {
+            if (fileReader != null) {
+                try {
+                    fileReader.close();
+                    fileReader = null;
+                } catch (IOException ex) {
+                    RuntimeException exception = new RuntimeException(ex);
+                    exception.addSuppressed(e);
+                    throw exception;
+                }
+            }
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public File readFile() {
+        try {
+            switch (fileSate) {
+                case 0:
+                    fileSize = readLong();
+                    if (!lastFinished) {
+                        return null;
+                    } else if (fileSize < 0) {
+                        return null;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 1:
+                    String fileName = readString();
+                    if (!lastFinished) {
+                        return null;
+                    }
+
+                    filePath = Files.createTempDirectory("").resolve(fileName);
+                    fileWriter = new BufferedOutputStream(new 
FileOutputStream(filePath.toFile(), true));
+
+                    fileSate++;
+                    bytesWritten = 0;
+
+                    //noinspection fallthrough
+                case 2:
+                    lastFinished = false;
+
+                    int pos = buf.position();
+                    int remaining = buf.remaining();
+                    int toRead = (int) Math.min(remaining, fileSize - 
bytesWritten);
+
+                    byte[] arr = new byte[toRead];

Review Comment:
   Same here: we should write from the byte buffer to the writer directly,
   without allocating an intermediate array.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -1662,12 +1682,167 @@ public <C extends Set<?>> C 
readSet(MessageCollectionItemType itemType, MessageR
         return map0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void writeFile(File file) {
+        try {
+            if (file == null) {
+                writeInt(-1);
+                return;
+            }
+
+            switch (fileSate) {
+                case 0:
+                    writeLong(file.length());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 1:
+                    writeString(file.getName());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 2:
+                    lastFinished = false;
+
+                    if (fileReader == null) {
+                        fileReader = new BufferedInputStream(new 
FileInputStream(file));
+                    }
+
+                    int pos = buf.position();
+                    int toWrite = Math.min(buf.remaining(), 
fileReader.available());
+
+                    if (toWrite > 0) {
+                        byte[] arr = new byte[toWrite];
+                        fileReader.read(arr, 0, toWrite);
+                        GridUnsafe.copyMemory(arr, BYTE_ARR_OFF, heapArr, 
baseOff + pos, toWrite);
+                        buf.position(pos + toWrite);
+                    }
+
+                    if (fileReader.available() == 0) {
+                        fileReader.close();
+                        fileReader = null;
+
+                        lastFinished = true;
+
+                        fileSate = 0;
+                    }
+                    break;
+
+                default:
+                    throw new IllegalStateException("Invalid file state: " + 
fileSate);
+            }
+        } catch (IOException e) {
+            if (fileReader != null) {
+                try {
+                    fileReader.close();
+                    fileReader = null;
+                } catch (IOException ex) {
+                    RuntimeException exception = new RuntimeException(ex);
+                    exception.addSuppressed(e);
+                    throw exception;
+                }
+            }
+            throw new RuntimeException(e);
+        }
+
+    }
+
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public File readFile() {
+        try {
+            switch (fileSate) {
+                case 0:
+                    fileSize = readLong();
+                    if (!lastFinished) {
+                        return null;
+                    } else if (fileSize < 0) {
+                        return null;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 1:
+                    String fileName = readString();
+                    if (!lastFinished) {
+                        return null;
+                    }
+
+                    filePath = Files.createTempDirectory("").resolve(fileName);
+                    fileWriter = new BufferedOutputStream(new 
FileOutputStream(filePath.toFile(), true));

Review Comment:
   I presume this doesn't retain file attributes? I'm not sure whether they
   are needed, though.



##########
modules/network/src/test/resources/files/string.json:
##########
@@ -0,0 +1,5 @@
+{

Review Comment:
   Not sure if we need these files; it's easier and cleaner to generate them.



##########
modules/network/src/main/java/org/apache/ignite/internal/network/direct/stream/DirectByteBufferStreamImplV1.java:
##########
@@ -1662,12 +1682,167 @@ public <C extends Set<?>> C 
readSet(MessageCollectionItemType itemType, MessageR
         return map0;
     }
 
+    /**
+     * {@inheritDoc}
+     */
+    @Override
+    public void writeFile(File file) {
+        try {
+            if (file == null) {
+                writeInt(-1);
+                return;
+            }
+
+            switch (fileSate) {
+                case 0:
+                    writeLong(file.length());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 1:
+                    writeString(file.getName());
+
+                    if (!lastFinished) {
+                        return;
+                    }
+
+                    fileSate++;
+
+                    //noinspection fallthrough
+                case 2:
+                    lastFinished = false;
+
+                    if (fileReader == null) {
+                        fileReader = new BufferedInputStream(new 
FileInputStream(file));
+                    }
+
+                    int pos = buf.position();
+                    int toWrite = Math.min(buf.remaining(), 
fileReader.available());
+
+                    if (toWrite > 0) {
+                        byte[] arr = new byte[toWrite];

Review Comment:
   I think it is possible to read the file directly into the output buffer to
   avoid allocations.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to