Hi,

On Tue, Feb 20, 2018 at 4:32 PM, Chuck Davis <cjgun...@gmail.com> wrote:
> Simone, please, please tell me how this is done.

The WebSocket.Listener methods return a CompletionStage that the
application completes when it is done processing the parameters
passed to the listener method (in particular the ByteBuffer).
We can leverage this to avoid copying the bytes in the ByteBuffer - we
just store the ByteBuffers in the ByteBufferInputStream for later
use, and return a CompletableFuture that we complete once we
have consumed the ByteBuffer's bytes.

When the listener tells you that you have the LAST (or WHOLE) part,
then you can wrap the ByteBufferInputStream with your
ObjectInputStream.

The code is attached and shows the bare minimum implementation of
ByteBufferInputStream - just implementing `int read()`.
Implementing `int read(byte[], int, int)` is left as an exercise :)
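
For anyone who wants a starting point for that exercise, here is a
minimal sketch of one possible bulk read. It is a standalone variant,
not the attached class: the name BulkByteBufferInputStream and the
main() demo are made up for illustration, and unlike the attached
code it completes each CompletableFuture as soon as its ByteBuffer is
drained rather than on the following read. It is single-threaded, like
the attached code, so treat it as a sketch and not as a finished
implementation.

```java
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical standalone variant of the attached ByteBufferInputStream.
public class BulkByteBufferInputStream extends InputStream
{
    private final List<ByteBuffer> buffers = new ArrayList<>();
    private final List<CompletableFuture<Void>> callbacks = new ArrayList<>();
    private int index;

    public CompletableFuture<Void> offer(ByteBuffer byteBuffer)
    {
        buffers.add(byteBuffer);
        CompletableFuture<Void> result = new CompletableFuture<>();
        callbacks.add(result);
        return result;
    }

    @Override
    public int read()
    {
        // Delegate to the bulk read so the bookkeeping lives in one place.
        byte[] one = new byte[1];
        return read(one, 0, 1) < 0 ? -1 : one[0] & 0xFF;
    }

    @Override
    public int read(byte[] b, int off, int len)
    {
        if (off < 0 || len < 0 || len > b.length - off)
            throw new IndexOutOfBoundsException();
        if (len == 0)
            return 0;
        int total = 0;
        while (total < len && index < buffers.size())
        {
            ByteBuffer byteBuffer = buffers.get(index);
            // The one unavoidable copy: ByteBuffer -> caller's byte[].
            int n = Math.min(len - total, byteBuffer.remaining());
            byteBuffer.get(b, off + total, n);
            total += n;
            if (!byteBuffer.hasRemaining())
            {
                // Assumption of this sketch: complete eagerly once drained.
                callbacks.get(index).complete(null);
                ++index;
            }
        }
        // -1 only when nothing at all could be read (end of stream).
        return total == 0 ? -1 : total;
    }

    public static void main(String[] args)
    {
        BulkByteBufferInputStream in = new BulkByteBufferInputStream();
        CompletableFuture<Void> first = in.offer(ByteBuffer.wrap("hello ".getBytes(StandardCharsets.US_ASCII)));
        CompletableFuture<Void> second = in.offer(ByteBuffer.wrap("world".getBytes(StandardCharsets.US_ASCII)));

        // One read of 8 bytes drains the first buffer and part of the second.
        byte[] chunk = new byte[8];
        int n = in.read(chunk, 0, chunk.length);
        System.err.println(n + " bytes, first done = " + first.isDone() + ", second done = " + second.isDone());
    }
}
```

A single read can span several ByteBuffers, so the loop keeps going
until the caller's array is full or no buffers are left.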

Just to be paranoid, the attached code is dedicated to the public domain as
explained in https://creativecommons.org/publicdomain/zero/1.0/.

Just to be clear, the only copy that happens is when you have to
bridge from the byte[] format used by InputStream to the ByteBuffer
format.
The same copy happens when you use ByteArrayInputStream (that you used
in the "good" code in your first email of this thread), so this
version is on par with your "good" code.

This version has the advantage of guaranteeing that no other copy is
made, not even by the implementation (hopefully, right Pavel?), while
with JDK 8 there may have been hidden copies to provide a full
ByteBuffer.

-- 
Simone Bordet
---
Finally, no matter how good the architecture and design are,
to deliver bug-free software with optimal performance and reliability,
the implementation technique must be flawless.   Victoria Livschitz
/**
 * Licensed under public domain
 * https://creativecommons.org/publicdomain/zero/1.0/
 */

import java.io.InputStream;
import java.io.ObjectInputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import jdk.incubator.http.WebSocket;

public class WebSocketTest
{
    private static class ByteBufferInputStream extends InputStream
    {
        private final List<ByteBuffer> buffers = new ArrayList<>();
        private final List<CompletableFuture<Void>> callbacks = new ArrayList<>();
        private int index;

        public CompletableFuture<Void> offer(ByteBuffer byteBuffer)
        {
            buffers.add(byteBuffer);
            CompletableFuture<Void> result = new CompletableFuture<>();
            callbacks.add(result);
            return result;
        }

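        @Override
        public int read()
        {
            while (index < buffers.size())
            {
                ByteBuffer byteBuffer = buffers.get(index);
                if (byteBuffer.hasRemaining())
                    return byteBuffer.get() & 0xFF;
                // This buffer is fully consumed: complete its callback and move to the next one.
                callbacks.get(index).complete(null);
                ++index;
            }
            // No buffers left: end of stream.
            return -1;
        }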
    }

    private static class MyListener implements WebSocket.Listener
    {
        private final ByteBufferInputStream bbis = new ByteBufferInputStream();

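        @Override
        public CompletionStage<?> onBinary(WebSocket webSocket, ByteBuffer message, WebSocket.MessagePart part)
        {
            CompletableFuture<Void> result = bbis.offer(message);
            if (part == WebSocket.MessagePart.LAST || part == WebSocket.MessagePart.WHOLE)
            {
                // All parts have been offered, so the object can be deserialized now.
                try
                {
                    ObjectInputStream ois = new ObjectInputStream(bbis);
                    System.err.println("ois.readObject() = " + ois.readObject());
                }
                catch (java.io.IOException | ClassNotFoundException x)
                {
                    // onBinary() cannot throw checked exceptions, so just report the failure.
                    x.printStackTrace();
                }
            }
            return result;
        }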
    }
}
