Hi, On Tue, Feb 20, 2018 at 4:32 PM, Chuck Davis <cjgun...@gmail.com> wrote: > Simone, please, please tell me how this is done.
The WebSocket.Listener APIs return a CompletableFuture, indicating when the application is done with the processing of the parameters passed to the listener method (in particular the ByteBuffer). We can leverage this to avoid to copy the bytes in the ByteBuffer - we will just store the ByteBuffers in the ByteBufferInputStream for later use, and we return a CompletableFuture that we will complete when we have consumed the ByteBuffer bytes. When the listener tells you that you have the LAST (or WHOLE) part, then you can wrap the ByteBufferInputStream with your ObjectInputStream. The code is attached and shows the bare minimum implementation of ByteBufferInputStream - just implementing `int read()`. Implementing `int read(byte[], int, int)` is left as exercise :) Just to be paranoid, the attached code license is public domain as explained in https://creativecommons.org/publicdomain/zero/1.0/. Just to be clear, the only copy that happens is when you have to bridge from the byte[] format used by InputStream to the ByteBuffer format. The same copy happens when you use ByteArrayInputStream (that you used in the "good" code in your first email of this thread), so this version is on par with your "good" code. This version has the advantage that guarantees that no other copy is done, not even by the implementation (hopefully, right Pavel ?), while with JDK 8 there may have been hidden copies to provide a full ByteBuffer. -- Simone Bordet --- Finally, no matter how good the architecture and design are, to deliver bug-free software with optimal performance and reliability, the implementation technique must be flawless. Victoria Livschitz
/** * Licensed under public domain * https://creativecommons.org/publicdomain/zero/1.0/ */ import java.io.InputStream; import java.io.ObjectInputStream; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; import jdk.incubator.http.WebSocket; public class WebSocketTest { private static class ByteBufferInputStream extends InputStream { private final List<ByteBuffer> buffers = new ArrayList<>(); private final List<CompletableFuture<Void>> callbacks = new ArrayList<>(); private int index; public CompletableFuture<Void> offer(ByteBuffer byteBuffer) { buffers.add(byteBuffer); CompletableFuture<Void> result = new CompletableFuture<>(); callbacks.add(result); return result; } @Override public int read() { while (index < buffers.size()) { ByteBuffer byteBuffer = buffers.get(index); if (byteBuffer.hasRemaining()) return byteBuffer.get() & 0xFF; callbacks.get(index).complete(null); ++index; } return -1; } } private static class MyListener implements WebSocket.Listener { private final ByteBufferInputStream bbis = new ByteBufferInputStream(); @Override public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer message, WebSocket.MessagePart part) { CompletableFuture<Void> result = bbis.offer(message); if (part == WebSocket.MessagePart.LAST || part == WebSocket.MessagePart.WHOLE) { ObjectInputStream ois = new ObjectInputStream(bbis); System.err.println("ois.readObject() = " + ois.readObject()); } return result; } } }