The engine API consists of the set of interfaces in the
org.apache.qpid.proton.engine package. You can construct these from the
org.apache.qpid.proton.Proton class.

FWIW, the engine API is much lower level than messenger, so there will be a
lot more coding involved just to send a message. Based on your description
I can think of a couple of different ways you could still use messenger if
you were so inclined. For example, if you want one thread processing
messages asynchronously and you want to be able to send messages from any
thread, that should be pretty straightforward to set up, e.g.:

    // messenger thread
    // for outgoing messages:
    ConcurrentLinkedQueue<Message> outgoing = ...;

    while (true) {
        while (true) {
            Message m = outgoing.poll();
            if (m == null) { break; }
            messenger.put(m);
        }
        try {
            // block until incoming messages are available
            // (or we are interrupted)
            messenger.recv();
            Message in = messenger.get();
            // dispatch incoming message
        } catch (InterruptException e) {
            // we were interrupted, just loop and check for outgoing
            // messages
        }
    }

    // threads post messages by putting them on the outgoing queue and
    // interrupting
    outgoing.add(m); // ConcurrentLinkedQueue has add/offer, not put
    messenger.interrupt();

Obviously you'd want to wrap all that up in some convenient facade, but the
basic strategy should work. A variant of this would be to use a separate
messenger for sending and receiving, and of course you have the option to
use a separate messenger per thread also.
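To give a rough idea of what such a facade might look like, here is a minimal sketch. It deliberately avoids depending on proton: the type parameter M stands in for the Message type and the wakeup Runnable stands in for a call such as messenger.interrupt(). The class and method names are made up for illustration and are not part of any proton API.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical facade around the outgoing queue. M stands in for the
// Message type; wakeup stands in for something like messenger.interrupt().
final class OutgoingMailbox<M> {
    private final Queue<M> outgoing = new ConcurrentLinkedQueue<>();
    private final Runnable wakeup;

    OutgoingMailbox(Runnable wakeup) {
        this.wakeup = wakeup;
    }

    // Safe to call from any thread: enqueue first, then wake the
    // messenger thread so it notices the new message.
    void send(M message) {
        outgoing.add(message);
        wakeup.run();
    }

    // Intended for the messenger thread only: returns the next pending
    // message, or null when nothing is queued.
    M poll() {
        return outgoing.poll();
    }
}
```

Sending threads would only ever call send(m), and the messenger thread would drain poll() at the top of its loop before blocking again.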

Note that in all of these cases you may eventually want to think about how
to apply back-pressure, e.g. if your application produces messages at a
greater rate than the network can handle, then you may want to use an
ArrayBlockingQueue with a size bound so that your outgoing queue won't grow
without bound.
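As a sketch of the bounded variant (the helper names and the timeout policy are my own assumptions, not anything proton provides), producers can wait a limited time for space and then decide for themselves what to do when the queue stays full:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helpers for a size-bounded outgoing queue.
final class BoundedOutgoing {
    // The capacity is whatever bound suits the application.
    static <M> BlockingQueue<M> create(int capacity) {
        return new ArrayBlockingQueue<>(capacity);
    }

    // Producer side: wait up to timeoutMillis for free space. Returns
    // false if the queue stayed full (or the thread was interrupted),
    // leaving it to the caller to retry, drop, or report an error.
    static <M> boolean trySend(BlockingQueue<M> queue, M message, long timeoutMillis) {
        try {
            return queue.offer(message, timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt flag
            return false;
        }
    }
}
```

The messenger thread can keep draining with the non-blocking poll(), so only the producers are slowed down when the network can't keep up.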

--Rafael



On Wed, Feb 26, 2014 at 11:11 AM, Piotr Kliczewski <
[email protected]> wrote:

> Rafael,
>
> Thank you for the detailed explanation. I used the Messenger API but it
> is clear to me now that I need to go with the engine API. I looked at
> the Java API reference for the engine API and noticed that it contains
> all the proton-j javadocs (including Messenger). Can you tell me which
> packages/classes are supported as part of the engine API? I want to
> use only the published API, not the internal code which may change
> frequently.
>
> I haven't encountered any stack traces in single threaded usage. I
> want to process messages asynchronously in a single thread from
> different channels, but many threads will send messages, so my use case
> is more complex than single threaded processing.
>
> Piotr
>
> On Wed, Feb 26, 2014 at 4:43 PM, Rafael Schloming <[email protected]>
> wrote:
> > Hi Piotr,
> >
> > I'm afraid I may have led you a bit astray in my initial post. For some
> > reason I thought you were using the engine API rather than Messenger,
> > even though in retrospect it's obvious from the stack trace you posted
> > that you were in fact using Messenger. My comments on not using the
> > Driver were predicated on the misconception that you were using the
> > engine API directly. Messenger uses the driver internally and it is not
> > currently set up to allow for externally driven I/O. The C
> > implementation of messenger will allow for such externally driven I/O
> > in the next release, however I don't know if I'll have time to make the
> > corresponding changes in the Java implementation by then.
> >
> > That said, so long as you only allow one thread at a time to access a
> > given Messenger, you shouldn't have any issues using it with the
> > supplied driver. Did you encounter the original stack trace you posted
> > in such single threaded usage? If so it is definitely a bug and I'd
> > appreciate any help you could offer in reproducing it. If not then I'd
> > recommend keeping your usage of Messenger to a single thread. You can
> > always use concurrent queues to pass messages off to other threads for
> > processing, and it's highly unlikely the I/O load that messenger imposes
> > would ever need to be distributed across multiple threads.
> >
> > --Rafael
> >
> >
> >
> > On Wed, Feb 26, 2014 at 8:21 AM, Piotr Kliczewski <
> > [email protected]> wrote:
> >
> >> Rafael,
> >>
> >> I implemented my own Driver as you suggested and noticed an issue just
> >> after running the createConnector method.
> >>
> >> I want to send a single message and I have a single thread to receive
> >> potential messages. While sending the message, the put(message) method
> >> runs getLink, which delegates connector creation to a driver
> >> implementation. A few steps are done and a new connection is set on
> >> the connector. In the meantime MessageAvailable looks for any incoming
> >> messages by iterating through the connectors taken from the driver.
> >> The unlucky part is that before the connection is set on the connector
> >> in getLink, MessageAvailable tries to assess whether there are
> >> messages, so I end up with:
> >>
> >> Exception in thread "Proton Reactor" java.lang.NullPointerException
> >>     at org.apache.qpid.proton.messenger.impl.MessengerImpl$MessageAvailable.test(MessengerImpl.java:1097)
> >>     at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:872)
> >>     at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:853)
> >>     at org.apache.qpid.proton.messenger.impl.MessengerImpl.recv(MessengerImpl.java:451)
> >>     at org.apache.qpid.proton.messenger.impl.MessengerImpl.recv(MessengerImpl.java:456)
> >>     at org.ovirt.vdsm.jsonrpc.client.reactors.proton.ProtonReactor.run(ProtonReactor.java:96)
> >>
> >> Do you have any suggestions on how to overcome this issue?
> >>
> >> Piotr
> >>
> >>
> >> On Tue, Feb 25, 2014 at 7:27 PM, Rafael Schloming <[email protected]>
> >> wrote:
> >> > To be honest I wouldn't necessarily bother using the Driver for
> >> > proton-j. It is there to allow for testing, but it should be easy
> >> > enough to either do your own I/O or use one of a number of off the
> >> > shelf I/O libraries (getty, MINA, etc). All the driver does is provide
> >> > a very simplistic way to pump bytes between a network socket and the
> >> > Transport interface provided by the engine API. It is really only a
> >> > few lines of code to do this pumping yourself though if you have a
> >> > suitable I/O library available, and if you were to do this you'd get a
> >> > lot more flexibility in terms of how your threading works. The only
> >> > real constraint is that the Connection and Transport object should
> >> > only be accessed by one thread at a time.
> >> >
> >> > --Rafael
> >> >
> >> >
> >> > On Tue, Feb 25, 2014 at 10:49 AM, Piotr Kliczewski <
> >> > [email protected]> wrote:
> >> >
> >> >> Hello,
> >> >>
> >> >> In the java doc for Driver interface I see:
> >> >>
> >> >> "Unless otherwise stated, methods on Driver implementations are not
> >> >> necessarily thread-safe."
> >> >>
> >> >> and during testing I got:
> >> >>
> >> >> Exception in thread "Proton Reactor"
> >> >> java.util.ConcurrentModificationException
> >> >>         at java.util.LinkedList$ListItr.checkForComodification(LinkedList.java:953)
> >> >>         at java.util.LinkedList$ListItr.next(LinkedList.java:886)
> >> >>         at org.apache.qpid.proton.messenger.impl.MessengerImpl$MessageAvailable.test(MessengerImpl.java:1094)
> >> >>         at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:872)
> >> >>         at org.apache.qpid.proton.messenger.impl.MessengerImpl.waitUntil(MessengerImpl.java:853)
> >> >>         at org.apache.qpid.proton.messenger.impl.MessengerImpl.recv(MessengerImpl.java:451)
> >> >>         at org.apache.qpid.proton.messenger.impl.MessengerImpl.recv(MessengerImpl.java:456)
> >> >>
> >> >> What is the best practice for threading when using proton-j?
> >> >>
> >> >> Piotr
> >> >>
> >> >> ---------------------------------------------------------------------
> >> >> To unsubscribe, e-mail: [email protected]
> >> >> For additional commands, e-mail: [email protected]
> >> >>
> >> >>
> >>
>
