RE: CorrelationId
xavier wrote Perhaps I must free some object, but in debug mode and checking the memory, memory grow up, each time I do pn_messenger_subscribe, pn_messenger_recv, etc. did you try running your binary via valgrind memcheck? -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617738.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
RE: CorrelationId
Hi, Any body have an idea about this? The new version of the lib will fix this, or it's my code? regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617531.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
-bb82-e41e-1236-5c9f77a057c7'}], t arget=@target(41) [address=queue://emc2-ea2a65, durable=0, timeout=0, dynamic=false], initial-delivery-count=0] [006FE360]:0 - @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals e] [006FE360]: - AMQP [006FE360]:0 - @open(16) [container-id=, hostname=, max-frame-size=4294967295, channel-max=32767, properties={:x-opt-anonymous-relay=$relay}] [006FE360]:0 - @begin(17) [remote-channel=0, next-outgoing-id=1, incoming-window=0, outgoing-window=0, handle-max=1024] [006FE360]:0 - @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=1, outgoing-window=0] [006FE360]:0 - @attach(18) [name=queue://emc2-ea2a65, handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address=queu e://emc2-ea2a65, durable=0, expiry-policy=:session-end, timeout=0, dynamic=false, filter={:jms-selector=@string JMSCorrelationID='28a8d7e8-bb8 2-e41e-1236-5c9f77a057c7'}], target=@target(41) [address=queue://emc2-ea2a65], incomplete-unsettled=false, initial-delivery-count=0] [006FE360]:0 - @transfer(20) [handle=0, delivery-id=0, delivery-tag=b, message-format=0, settled=true] (407) \x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1 7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a 8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{header:{manufacturer:com.eaton.pqsoft,uuid:c1b3e315-a5e4-47b 4-8128-51b0bf6284b7,timeStamp:1418208069,providerId:emc4j,flowType:reply,destination:TestSimpleBus.add,contentType:java.lang.Integer },body:18} [02B82D30]:0 - @transfer(20) [handle=0, delivery-id=1, delivery-tag=b\x01\x00\x00\x00\x00\x00\x00\x00, message-format=0, settled=true, more=false] (405) \x00Sp\xd0\x00\x00\x00\x0b\x00\x00\x00\x05BP\x04@BR\x00\x00Ss\xd0\x00\x00\x00\x86\x00\x00\x00\x0d@@\xa1+amqp://localhost:5672/queue://TestSimpl eBus@\xa1\x0bemc2-ea2a65\xa1$3d28790d-1a5d-49d5-36d9-e0bd143a97fe\xa3\x06string@\x83\x00\x00\x00\x00\x00\x00\x00\x00\x83\x00\x00\x00\x00\x00\x00\x00\x 00@R\x00@\x00Sw\xa1\xef{header:{manufacturer:com.eaton.pqsoft,uuid:c3046674-4e25-775f-d043-ecf657cf33ca,timeStamp:1418208081.919,provider Id:emc2-ea2a65,flowType:request,destination:TestSimpleBus.add,bodyType:null},body:{a:10,b:8}} 2014/12/10-11:42:48.581 | AmqpMessageBusTransport | ERROR : Timeout reached on the get synchronous reply, queue: emc2-ea2a65 -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617326.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi Xavier, so 006FE360 is your messenger that is responsible for subscribing and receiving the messages based upon the selector. xavier wrote [006FE360]: - SASL [006FE360]:0 - @sasl-init(65) [mechanism=:PLAIN, initial-response=b\x00emc2\x00emc2] [006FE360]: - SASL [006FE360]:0 - @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]] [006FE360]:0 - @sasl-outcome(68) [code=0] [006FE360]: - AMQP [006FE360]:0 - @open(16) [container-id=6aae955e-cd32-488a-8119-67ec0b715924, hostname=localhost] [006FE360]:0 - @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0] [006FE360]:0 - @attach(18) [name=queue://emc2-ea2a65, handle=0, role=true, snd-settle-mode=1, rcv-settle-mode=0, source=@source(40) [address=queue ://emc2-ea2a65, durable=0, timeout=0, dynamic=false, filter={:jms-selector=@string JMSCorrelationID='28a8d7e8-bb82-e41e-1236-5c9f77a057c7'}], t arget=@target(41) [address=queue://emc2-ea2a65, durable=0, timeout=0, dynamic=false], initial-delivery-count=0] [006FE360]:0 - @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals e] [006FE360]: - AMQP [006FE360]:0 - @open(16) [container-id=, hostname=, max-frame-size=4294967295, channel-max=32767, properties={:x-opt-anonymous-relay=$relay}] [006FE360]:0 - @begin(17) [remote-channel=0, next-outgoing-id=1, incoming-window=0, outgoing-window=0, handle-max=1024] [006FE360]:0 - @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=1, outgoing-window=0] [006FE360]:0 - @attach(18) [name=queue://emc2-ea2a65, handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address=queu e://emc2-ea2a65, durable=0, expiry-policy=:session-end, timeout=0, dynamic=false, filter={:jms-selector=@string JMSCorrelationID='28a8d7e8-bb8 2-e41e-1236-5c9f77a057c7'}], target=@target(41) [address=queue://emc2-ea2a65], incomplete-unsettled=false, initial-delivery-count=0] [006FE360]:0 - @transfer(20) [handle=0, delivery-id=0, delivery-tag=b, message-format=0, settled=true] (407) \x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1 7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a 8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{header:{manufacturer:com.eaton.pqsoft,uuid:c1b3e315-a5e4-47b 4-8128-51b0bf6284b7,timeStamp:1418208069,providerId:emc4j,flowType:reply,destination:TestSimpleBus.add,contentType:java.lang.Integer },body:18} We can see it connect, make the @attach, receive an ACK to that @attach, and then receive its first message via the @transfer. However, after that we no longer see that messenger doing anything? Based upon the earlier discussions, I'd have expected to see an @close for the pn_link_{close,detach} and then a fresh @attach for a link with the new correlation id selector. -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617327.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
RE: CorrelationId
Here the right code //Doing this one time pn_messenger_t* msgConsumer= pn_messenger(NULL); pn_messenger_set_timeout (msgConsumer, 1000); pn_messenger_set_blocking (msgConsumer, true); pn_messenger_set_incoming_window (msgConsumer, 1); // Doing this more time pn_link_t* link; pn_messenger_subscribe(msgConsumer, amqp://127.0.0.1:5672/queue://myqueue); link = pn_messenger_get_link(msgConsumer, (amqp://127.0.0.1:5672/queue://myqueue).c_str(), false); pn_link_open(link); pn_terminus_t* terminus = pn_link_source(link); pn_data_t* data = pn_terminus_filter (terminus); /* Map creation with selector*/ std::string selector = jms-selector; pn_data_put_map(data); pn_data_enter(data);; pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str())); // Described of the JMS_SELECTOR line 1262 std::string filter = JMSCorrelationID='12346789'; pn_data_put_described(data); pn_data_enter(data); pn_data_put_string(data, pn_bytes(6, string)); pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str())); pn_data_exit(data); pn_messenger_recv(msgConsumer, -1); if (pn_messenger_incoming(msgConsumer)) { // The message is arrived pn_message_t* message = pn_message(); pn_messenger_get(msgConsumer, message); . . pn_message_free(message); } pn_link_close(link); -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617333.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
RE: CorrelationId
Dominic, I posted my code for the community. But I progress, and all works, but I saw, with this code, the memory grow up in each time, I did this function: pn_link_t* link; pn_messenger_subscribe(msgConsumer, amqp://127.0.0.1:5672/queue://myqueue); link = pn_messenger_get_link(msgConsumer, (amqp://127.0.0.1:5672/queue://myqueue).c_str(), false); pn_link_open(link); pn_terminus_t* terminus = pn_link_source(link); pn_data_t* data = pn_terminus_filter (terminus); /* Map creation with selector*/ std::string selector = jms-selector; pn_data_put_map(data); pn_data_enter(data);; pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str())); // Described of the JMS_SELECTOR line 1262 std::string filter = JMSCorrelationID='12346789'; pn_data_put_described(data); pn_data_enter(data); pn_data_put_string(data, pn_bytes(6, string)); pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str())); pn_data_exit(data); pn_messenger_recv(msgConsumer, -1); if (pn_messenger_incoming(msgConsumer)) { // The message is arrived pn_message_t* message = pn_message(); pn_messenger_get(msgConsumer, message); . . pn_message_free(message); } pn_link_close(link); The goal is simple, why I implemented this, because I implemented the pattern request/reply, and the reply is in function of the correlationid (setted in the request), so in my application I have the msgConsumer builds one time, and I reuse it each time, that I wait the reply. Perhaps I must free some object, but in debug mode and checking the memory, memory grow up, each time I do pn_messenger_subscribe, pn_messenger_recv, etc. Cheers From: Dominic Evans [via Qpid] [mailto:ml-node+s2158936n7617327...@n2.nabble.com] Sent: mercredi 10 décembre 2014 12:07 To: Millieret, Xavier Subject: Re: CorrelationId Hi Xavier, so 006FE360 is your messenger that is responsible for subscribing and receiving the messages based upon the selector. xavier wrote [006FE360]: - SASL [006FE360]:0 - @sasl-init(65) [mechanism=:PLAIN, initial-response=b\x00emc2\x00emc2] [006FE360]: - SASL [006FE360]:0 - @sasl-mechanisms(64) [sasl-server-mechanisms=@PN_SYMBOL[:ANONYMOUS, :PLAIN]] [006FE360]:0 - @sasl-outcome(68) [code=0] [006FE360]: - AMQP [006FE360]:0 - @open(16) [container-id=6aae955e-cd32-488a-8119-67ec0b715924, hostname=localhost] [006FE360]:0 - @begin(17) [next-outgoing-id=0, incoming-window=2147483647, outgoing-window=0] [006FE360]:0 - @attach(18) [name=queue://emc2-ea2a65, handle=0, role=true, snd-settle-mode=1, rcv-settle-mode=0, source=@source(40) [address=queue ://emc2-ea2a65, durable=0, timeout=0, dynamic=false, filter={:jms-selector=@string JMSCorrelationID='28a8d7e8-bb82-e41e-1236-5c9f77a057c7'}], t arget=@target(41) [address=queue://emc2-ea2a65, durable=0, timeout=0, dynamic=false], initial-delivery-count=0] [006FE360]:0 - @flow(19) [incoming-window=2147483647, next-outgoing-id=0, outgoing-window=0, handle=0, delivery-count=0, link-credit=1024, drain=fals e] [006FE360]: - AMQP [006FE360]:0 - @open(16) [container-id=, hostname=, max-frame-size=4294967295, channel-max=32767, properties={:x-opt-anonymous-relay=$relay}] [006FE360]:0 - @begin(17) [remote-channel=0, next-outgoing-id=1, incoming-window=0, outgoing-window=0, handle-max=1024] [006FE360]:0 - @flow(19) [next-incoming-id=0, incoming-window=2147483647, next-outgoing-id=1, outgoing-window=0] [006FE360]:0 - @attach(18) [name=queue://emc2-ea2a65, handle=0, role=false, snd-settle-mode=2, rcv-settle-mode=0, source=@source(40) [address=queu e://emc2-ea2a65, durable=0, expiry-policy=:session-end, timeout=0, dynamic=false, filter={:jms-selector=@string JMSCorrelationID='28a8d7e8-bb8 2-e41e-1236-5c9f77a057c7'}], target=@target(41) [address=queue://emc2-ea2a65], incomplete-unsettled=false, initial-delivery-count=0] [006FE360]:0 - @transfer(20) [handle=0, delivery-id=0, delivery-tag=b, message-format=0, settled=true] (407) \x00Sp\xc0\x04\x02BP\x04\x00Sr\xc1\x1 7\x02\xa3\x0dx-opt-to-type\xa1\x05queue\x00Ss\xc0\x7f\x0a\xa12ID:GREFRWHP1006921-61781-1418144200447-1:1:383:1:1@\xa1\x13queue://emc2-ea2a65@@\xa1$28a 8d7e8-bb82-e41e-1236-5c9f77a057c7@@@\x83\x00\x00\x01J3\xc9\xc9\x12\x00Sw\xa1\xe9{header:{manufacturer:com.eaton.pqsoft,uuid:c1b3e315-a5e4-47b 4-8128-51b0bf6284b7,timeStamp:1418208069,providerId:emc4j,flowType:reply,destination:TestSimpleBus.add,contentType:java.lang.Integer },body:18} We can see it connect, make the @attach, receive an ACK to that @attach, and then receive its first message via the @transfer. However, after that we no longer see that messenger doing anything? Based upon the earlier discussions, I'd have expected to see an @close for the pn_link_{close,detach} and then a fresh @attach for a link with the new correlation id selector. If you reply to this email, your message will be added to the discussion below: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617327.html To unsubscribe
Re: CorrelationId
Hi Xavier, xavier wrote I posted my solution, it works, but I did some performance test, and it's not very good, I found why. In my solution, I start, and stop for any receiver the messenger, because if I don't do this, with my code (posted) I receive one time, the message with a filter (attached to the link) but for the second message, (I change the filter before) I don't receive the message. I have not the competency on the engine, so I am lost how do this without start and stop messenger on each reception So the behaviour you are seeing is because for the AMQP 1.0 protocol your selector filter is set on the Link at attachment time, and I don't believe there is any action that allows you to modify a filter once the link has been established. Theoretically, rather than doing a full pn_messenger_stop + pn_messenger_start, you should be able to just do a pn_link_detach(link); /* modify the filter */ ; pn_link_open(link); and that might give you *slightly* better performance. But I think if you are going to be doing this on a per-message basis then you would be better of receiving all messages on the Link and just accepting those that you are interested in via correl id -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617293.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi Dominic unfortunately, it 's does not work!! I try your idea, see my code (I move the pn_link_detach() and pn_link_open() in all my code, but .) Here my code: // Init one time to use many time pn_messenger_t* msgConsumer= pn_messenger(NULL); pn_messenger_set_timeout (msgConsumer, 1000); pn_messenger_set_blocking (msgConsumer, true); pn_messenger_set_incoming_window (msgConsumer, 1); // After in a method pn_messenger_subscribe(msgConsumer, amqp://127.0.0.1:5672/queue://myqueue); pn_link_t* link = pn_messenger_get_link(msgConsumer, (amqp://127.0.0.1:5672/queue://myqueue).c_str(), false); *pn_link_detach(link);* pn_terminus_t* terminus = pn_link_source(link); pn_data_t* data = pn_terminus_filter (terminus); /* Map creation with selector*/ std::string selector = jms-selector; pn_data_put_map(data); pn_data_enter(data); pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str())); // Described std::string filter = JMSCorrelationID='12346789'; pn_data_put_described(data); pn_data_enter(data); pn_data_put_string(data, pn_bytes(6, string)); pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str())); pn_data_exit(data); *pn_link_open(link);* pn_messenger_recv(msgConsumer, -1); if (pn_messenger_incoming(msgConsumer)) { // The message is arrived pn_message_t* message = pn_message(); pn_messenger_get(msgConsumer, message); . . pn_message_free(message); } And unfortunately, I have a time out, the message is on the queue, and like before (if I do pn_messenger_start before pn_messenger_recv and pn_messenger_stop after, it's works!!! Your help is very important, I believe, I am not very far, but. So what do you thinks Dominic??? Thanks a lot -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617311.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi all, I found the solution (for ActiveMq, with a connector in AMQP, and a connector with openwire), so I share the solution. I used the idea of Dominic Evans, so If I want to get a message from a filter (for example correlationId ), I did this: pn_messenger_t* msgConsumer= pn_messenger(NULL); pn_messenger_set_timeout (msgConsumer, 1000); pn_messenger_set_blocking (msgConsumer, true); pn_messenger_set_incoming_window (msgConsumer, 1); pn_messenger_subscribe(msgConsumer, amqp://127.0.0.1:5672/queue://myqueue); pn_link_t* link = pn_messenger_get_link(msgConsumer, (amqp://127.0.0.1:5672/queue://myqueue).c_str(), false); pn_terminus_t* terminus = pn_link_source(link); pn_data_t* data = pn_terminus_filter (terminus); /* Map creation with selector*/ std::string selector = jms-selector; pn_data_put_map(data); pn_data_enter(data);; pn_data_put_symbol(data, pn_bytes(selector.size(), selector.c_str())); // Described of the JMS_SELECTOR line 1262 std::string filter = JMSCorrelationID='12346789'; pn_data_put_described(data); pn_data_enter(data); pn_data_put_string(data, pn_bytes(6, string)); pn_data_put_string(data, pn_bytes(filter.size(), filter.c_str())); pn_data_exit(data); pn_messenger_start (msgConsumer); pn_messenger_recv(msgConsumer, -1); if (pn_messenger_incoming(msgConsumer)) { // The message is arrived pn_message_t* message = pn_message(); pn_messenger_get(msgConsumer, message); . . pn_message_free(message); } ... It's works, so thanks to Dominic Evans to give me the begining way. enjoy -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7617208.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi all, I re start to work on this issue!!! and it is not work! So here my last code: // get recv link pn_link_t* link = pn_messenger_get_link (messengerConsumer, myAdress, false); // get the source link, is't it, I saw, before doing that on the the activemq's log, than the filter is: the source=Source{address='queue://myQueue', durable=NONE, filter=null, pn_terminus_t* terminus = pn_link_source(link); // Get the terminus pn_data_t* data = pn_terminus_filter (terminus); // I would like to put a map with jms-selector key (or selector-filter) like the explanation: // https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/apache-filters.xml#type-selector-filter // but I don't know to do that I have just this pn_data_put_map(pn_data_t *data), and how to inset somethings so I try with symbol pn_data_put_symbol(data, pn_bytes(strlen(color='blue'), color='blue')); pn_link_close(link); and on my activeMq I have the following error: AmqpProtocolException: Could not decode AMQP frame The goal is to put mycorrelation id on this filter, of course! Any help will be appreciated, because, in this moment i am lost with this library!! Thanks -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7616320.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi all, pn_messenger_get_link is not in the 0.7 qpid lib, right? Is it in the next release 0.8 ? If yes do you have any date for it, and code sample, with? Thanks a lot regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614826.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
fadams wrote On 08/10/14 18:49, Rafael Schloming wrote: Actually I think it is possible to do this with messenger now that dominic's patches are in. You can access the link associated with any given address using pn_messenger_get_link() and configure it however you want to. --Rafael Oooh now there's a thought Rafi, I was wondering if Dominic's patches might open up that possibility but I didn't really know enough about the link API to be sure. Don't suppose that you have any code snippets that could help out? Presumably the main call is the recently added pn_messenger_get_link(pn_messenger_t *messenger, const char *address, bool sender) I'm not clear at which point you'd need to fiddle with it, so for example I'd normally do a subscribe, so would I first do a subscribe and then immediately do a pn_messenger_get_link using the same address as the source used for the subscription? I'm afraid I'm a bit ignorant in all this 'cause I'd have assumed that you'd need to specify things like filters/selectors before the link attach occurs, but I wasn't clear that you had control of that in Messenger, as I say all I've done has been fairly basic things like subscribe followed by a check on incoming followed by a get. Some code (or even pseudocode) would be really useful. Frase So for me, pn_messenger_get_link was only really intended for interrogating the status of a link (e.g., checking if its ACTIVE or CLOSED) or (post-attach) for modifying the time-to-live before closing it. So e.g., pn_link_t* link = pn_messenger_get_link(messenger, address, false); // get recv link pn_terminus_set_timeout(pn_link_target(link), ttl); pn_link_close(link); Afaik the interpretation of AMQP filters is entirely broker specific? Either way I expect that the broker will expect it be sent as part of the initial @attach. You *could* try modifying the pn_data_t returned by pn_terminus_filter and then doing a pn_link_close immediately followed by a pn_link_open. Alternatively extend pn_messenger_subscribe to allow you to specify a filter up-front before the first attach. -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614830.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
On Thu, Oct 9, 2014 at 8:44 AM, Dominic Evans dominic.ev...@uk.ibm.com wrote: fadams wrote On 08/10/14 18:49, Rafael Schloming wrote: Actually I think it is possible to do this with messenger now that dominic's patches are in. You can access the link associated with any given address using pn_messenger_get_link() and configure it however you want to. --Rafael Oooh now there's a thought Rafi, I was wondering if Dominic's patches might open up that possibility but I didn't really know enough about the link API to be sure. Don't suppose that you have any code snippets that could help out? Presumably the main call is the recently added pn_messenger_get_link(pn_messenger_t *messenger, const char *address, bool sender) I'm not clear at which point you'd need to fiddle with it, so for example I'd normally do a subscribe, so would I first do a subscribe and then immediately do a pn_messenger_get_link using the same address as the source used for the subscription? I'm afraid I'm a bit ignorant in all this 'cause I'd have assumed that you'd need to specify things like filters/selectors before the link attach occurs, but I wasn't clear that you had control of that in Messenger, as I say all I've done has been fairly basic things like subscribe followed by a check on incoming followed by a get. Some code (or even pseudocode) would be really useful. Frase So for me, pn_messenger_get_link was only really intended for interrogating the status of a link (e.g., checking if its ACTIVE or CLOSED) or (post-attach) for modifying the time-to-live before closing it. So e.g., pn_link_t* link = pn_messenger_get_link(messenger, address, false); // get recv link pn_terminus_set_timeout(pn_link_target(link), ttl); pn_link_close(link); Afaik the interpretation of AMQP filters is entirely broker specific? Either way I expect that the broker will expect it be sent as part of the initial @attach. You *could* try modifying the pn_data_t returned by pn_terminus_filter and then doing a pn_link_close immediately followed by a pn_link_open. I haven't tried this, but I think so long as you access and modify the link just after the initial subscribe that creates it, you shouldn't need to close/reopen it because the initial attach won't have happened yet and any filter you put on the link should get sent later on when it does happen. Alternatively extend pn_messenger_subscribe to allow you to specify a filter up-front before the first attach. Yes, or equivalently add a pn_subscription_set_filter to allow the returned subscription to be modified. It should be pretty straightforward to do either one. I haven't had time personally but I would welcome the contribution. ;-) --Rafael -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614830.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
Hi Frase, Thanks for your explanation, but here, my code: In the requester: char * corrId; . . pn_bytes_t bytes = pn_bytes(correlationId.size(), corrId); pn_atom_t id; id.type = PN_STRING; id.u.as_bytes = bytes; pn_message_set_correlation_id(message, id); and after, send it pn_messenger_put(messengerProducer, message); I see on the broker, the correlationId is correctly setted, so no pb. after I wait the answer, but I would like (like JMS) only wake up on an answer at my question (and the correlationId is here to do that) in CMS MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); But with qpid proton and messenger, if I do that: I get the response, and I does not accept it, if the correlationId recieve, is not the good one, but for me, it's not a good practice, because we retrieve the message (network traffic) in right case, but in wrong case too. So I would like to the same things like CMS, but how pn_selectable_t ?? pn_selector_t Thank you Xav -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614771.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
On 08/10/14 08:16, xavier wrote: Hi Frase, Thanks for your explanation, but here, my code: In the requester: char * corrId; . . pn_bytes_t bytes = pn_bytes(correlationId.size(), corrId); pn_atom_t id; id.type = PN_STRING; id.u.as_bytes = bytes; pn_message_set_correlation_id(message, id); and after, send it pn_messenger_put(messengerProducer, message); I see on the broker, the correlationId is correctly setted, so no pb. after I wait the answer, but I would like (like JMS) only wake up on an answer at my question (and the correlationId is here to do that) in CMS MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); But with qpid proton and messenger, if I do that: I get the response, and I does not accept it, if the correlationId recieve, is not the good one, but for me, it's not a good practice, because we retrieve the message (network traffic) in right case, but in wrong case too. So I would like to the same things like CMS, but how pn_selectable_t ?? pn_selector_t Thank you Xav -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614771.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com. Re: MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); That's exactly the bit that I've been trying to explain is the complicated bit, and the bit that Robbie referred to in his previous reply. In JMS and I guess with CMS that syntax means: | MessageConsumer http://docs.oracle.com/javaee/5/api/javax/jms/MessageConsumer.html| |*createConsumer http://docs.oracle.com/javaee/5/api/javax/jms/Session.html#createConsumer%28javax.jms.Destination,%20java.lang.String,%20boolean%29*(Destination http://docs.oracle.com/javaee/5/api/javax/jms/Destination.html destination, String http://java.sun.com/j2se/1.5/docs/api/java/lang/String.html messageSelector, boolean NoLocal)| Creates|MessageConsumer| for the specified destination, using a message selector. At an API level it's nice and simple, but there's a lot going on under the hood and what it's doing is to pass information on the AMQP link attach (I think) that configures a Message Selector (which is and AMQP filter https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/apache-filters.xml#type-selector-filter). Unfortunately (and I'd agree somewhat annoyingly!!) proton Messenger does not yet support this type of sophistication when specifying subscriptions, I wish it did myself, but unfortunately it currently does not :-( The stuff: pn_selectable_t ?? pn_selector_t Relates to something completely different I'm afraid those are actually related to low level socket selectors (i.e. more related to the select/poll/epoll system calls than Message Selectors). Their main use is for replacing Messenger's internal network event loop so it can be more easily integrated with other applications. I can't recall why you said you were using Messenger specifically, If you can use C++ (rather than C) you might want to look at the qpid::messaging API, you can definitely do the sort of thing that you want (e.g. using the broker to perform filtering based on CorrelationID) using qpid::messaging. The syntax is a little different in that I don't believe that there's a particular overloaded createConsumer method call to set a selector and you'd have to do it in the Address String - it'd look *something* like the following (I think, I've not tried it YMMV) BTW if you haven't come across it drain is a little qpid demo application that's really useful for trying out Address Strings. So if you have a queue called queue1 this should filter on messages with a given correlation ID. ./drain -b localhost -f \ queue1; {create: receiver, link: {name: test-link, selector: \correlation_id='mycid'\}} That's the sort of syntax I'd like to see supported for Messenger Subscriptions too (but as I say it currently isn't). I've copied my response to the qpid users mailing list, I tend to recommend that people post questions there as it tends to have a broader readership than the proton one. If you have to use C (as opposed to C++) then as I say Messenger won't do what you want (though clearly you could do your own filtering inside your consumer/server client), you could also get a bit more low-level and use the Engine API, which will allow the sort of thing that you want (qpid::messaging actually uses the proton Engine API under the hood), but I'm afraid that I couldn't help you make any progress using Engine as I've not got round to playing with that myself yet. Sorry I can't give you a simple answer to your question, hopefully my explanation of the reasons why is better than it was previously. In short: * You can't currently do message
Re: CorrelationId
On Wed, Oct 8, 2014 at 1:30 PM, Fraser Adams fraser.ad...@blueyonder.co.uk wrote: On 08/10/14 08:16, xavier wrote: Hi Frase, Thanks for your explanation, but here, my code: In the requester: char * corrId; . . pn_bytes_t bytes = pn_bytes(correlationId.size(), corrId); pn_atom_t id; id.type = PN_STRING; id.u.as_bytes = bytes; pn_message_set_correlation_id(message, id); and after, send it pn_messenger_put(messengerProducer, message); I see on the broker, the correlationId is correctly setted, so no pb. after I wait the answer, but I would like (like JMS) only wake up on an answer at my question (and the correlationId is here to do that) in CMS MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); But with qpid proton and messenger, if I do that: I get the response, and I does not accept it, if the correlationId recieve, is not the good one, but for me, it's not a good practice, because we retrieve the message (network traffic) in right case, but in wrong case too. So I would like to the same things like CMS, but how pn_selectable_t ?? pn_selector_t Thank you Xav -- View this message in context: http://qpid.2158936.n2.nabble. com/CorrelationId-tp7614606p7614771.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com. Re: MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); That's exactly the bit that I've been trying to explain is the complicated bit, and the bit that Robbie referred to in his previous reply. In JMS and I guess with CMS that syntax means: | MessageConsumer http://docs.oracle.com/javaee/5/api/javax/jms/ MessageConsumer.html| |*createConsumer http://docs.oracle.com/javaee/5/api/javax/jms/ Session.html#createConsumer%28javax.jms.Destination,% 20java.lang.String,%20boolean%29*(Destination http://docs.oracle.com/ javaee/5/api/javax/jms/Destination.html destination, String http://java.sun.com/j2se/1.5/ docs/api/java/lang/String.html messageSelector, boolean NoLocal)| Creates|MessageConsumer| for the specified destination, using a message selector. At an API level it's nice and simple, but there's a lot going on under the hood and what it's doing is to pass information on the AMQP link attach (I think) that configures a Message Selector (which is and AMQP filter https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/ apache-filters.xml#type-selector-filter). Unfortunately (and I'd agree somewhat annoyingly!!) proton Messenger does not yet support this type of sophistication when specifying subscriptions, I wish it did myself, but unfortunately it currently does not :-( The stuff: pn_selectable_t ?? pn_selector_t Relates to something completely different I'm afraid those are actually related to low level socket selectors (i.e. more related to the select/poll/epoll system calls than Message Selectors). Their main use is for replacing Messenger's internal network event loop so it can be more easily integrated with other applications. I can't recall why you said you were using Messenger specifically, If you can use C++ (rather than C) you might want to look at the qpid::messaging API, you can definitely do the sort of thing that you want (e.g. using the broker to perform filtering based on CorrelationID) using qpid::messaging. The syntax is a little different in that I don't believe that there's a particular overloaded createConsumer method call to set a selector and you'd have to do it in the Address String - it'd look *something* like the following (I think, I've not tried it YMMV) BTW if you haven't come across it drain is a little qpid demo application that's really useful for trying out Address Strings. So if you have a queue called queue1 this should filter on messages with a given correlation ID. ./drain -b localhost -f \ queue1; {create: receiver, link: {name: test-link, selector: \correlation_id='mycid'\}} That's the sort of syntax I'd like to see supported for Messenger Subscriptions too (but as I say it currently isn't). I've copied my response to the qpid users mailing list, I tend to recommend that people post questions there as it tends to have a broader readership than the proton one. If you have to use C (as opposed to C++) then as I say Messenger won't do what you want (though clearly you could do your own filtering inside your consumer/server client), you could also get a bit more low-level and use the Engine API, which will allow the sort of thing that you want (qpid::messaging actually uses the proton Engine API under the hood), but I'm afraid that I couldn't help you make any progress using Engine as I've not got round to playing with that myself yet. Sorry I can't give you a simple
Re: CorrelationId
On 08/10/14 18:49, Rafael Schloming wrote: On Wed, Oct 8, 2014 at 1:30 PM, Fraser Adams fraser.ad...@blueyonder.co.uk wrote: On 08/10/14 08:16, xavier wrote: Hi Frase, Thanks for your explanation, but here, my code: In the requester: char * corrId; . . pn_bytes_t bytes = pn_bytes(correlationId.size(), corrId); pn_atom_t id; id.type = PN_STRING; id.u.as_bytes = bytes; pn_message_set_correlation_id(message, id); and after, send it pn_messenger_put(messengerProducer, message); I see on the broker, the correlationId is correctly setted, so no pb. after I wait the answer, but I would like (like JMS) only wake up on an answer at my question (and the correlationId is here to do that) in CMS MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); But with qpid proton and messenger, if I do that: I get the response, and I does not accept it, if the correlationId recieve, is not the good one, but for me, it's not a good practice, because we retrieve the message (network traffic) in right case, but in wrong case too. So I would like to the same things like CMS, but how pn_selectable_t ?? pn_selector_t Thank you Xav -- View this message in context: http://qpid.2158936.n2.nabble. com/CorrelationId-tp7614606p7614771.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com. Re: MessageConsumer* consumer = session-createConsumer(destination, JMSCorrelationID=' + correlationId + '); consumer-start(); That's exactly the bit that I've been trying to explain is the complicated bit, and the bit that Robbie referred to in his previous reply. In JMS and I guess with CMS that syntax means: | MessageConsumer http://docs.oracle.com/javaee/5/api/javax/jms/ MessageConsumer.html| |*createConsumer http://docs.oracle.com/javaee/5/api/javax/jms/ Session.html#createConsumer%28javax.jms.Destination,% 20java.lang.String,%20boolean%29*(Destination http://docs.oracle.com/ javaee/5/api/javax/jms/Destination.html destination, String http://java.sun.com/j2se/1.5/ docs/api/java/lang/String.html messageSelector, boolean NoLocal)| Creates|MessageConsumer| for the specified destination, using a message selector. At an API level it's nice and simple, but there's a lot going on under the hood and what it's doing is to pass information on the AMQP link attach (I think) that configures a Message Selector (which is and AMQP filter https://svn.apache.org/repos/asf/qpid/trunk/qpid/specs/ apache-filters.xml#type-selector-filter). Unfortunately (and I'd agree somewhat annoyingly!!) proton Messenger does not yet support this type of sophistication when specifying subscriptions, I wish it did myself, but unfortunately it currently does not :-( The stuff: pn_selectable_t ?? pn_selector_t Relates to something completely different I'm afraid those are actually related to low level socket selectors (i.e. more related to the select/poll/epoll system calls than Message Selectors). Their main use is for replacing Messenger's internal network event loop so it can be more easily integrated with other applications. I can't recall why you said you were using Messenger specifically, If you can use C++ (rather than C) you might want to look at the qpid::messaging API, you can definitely do the sort of thing that you want (e.g. using the broker to perform filtering based on CorrelationID) using qpid::messaging. The syntax is a little different in that I don't believe that there's a particular overloaded createConsumer method call to set a selector and you'd have to do it in the Address String - it'd look *something* like the following (I think, I've not tried it YMMV) BTW if you haven't come across it drain is a little qpid demo application that's really useful for trying out Address Strings. So if you have a queue called queue1 this should filter on messages with a given correlation ID. ./drain -b localhost -f \ queue1; {create: receiver, link: {name: test-link, selector: \correlation_id='mycid'\}} That's the sort of syntax I'd like to see supported for Messenger Subscriptions too (but as I say it currently isn't). I've copied my response to the qpid users mailing list, I tend to recommend that people post questions there as it tends to have a broader readership than the proton one. If you have to use C (as opposed to C++) then as I say Messenger won't do what you want (though clearly you could do your own filtering inside your consumer/server client), you could also get a bit more low-level and use the Engine API, which will allow the sort of thing that you want (qpid::messaging actually uses the proton Engine API under the hood), but I'm afraid that I couldn't help you make any progress using Engine as I've not got round to playing with that myself yet. Sorry I can't give you a simple answer to your question, hopefully my
Re: CorrelationId
Hi all, Thanks you for your reply, help? So if I understand properly, to do that with messenger is not trivial??? But I don't understand why, because with the pattern request/reply in asynchronous mode, and with dedicated queue, we must have this feature. isn't it? So to do that must I take some temporally queue? or using selector (in messenger), but I don't know (with the documentation), how it's works? is there any sample? Thanks a lot for your help Regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614740.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
Re: CorrelationId
On 07/10/14 08:04, xavier wrote: Hi all, Thanks you for your reply, help? So if I understand properly, to do that with messenger is not trivial??? But I don't understand why, because with the pattern request/reply in asynchronous mode, and with dedicated queue, we must have this feature. isn't it? So to do that must I take some temporally queue? or using selector (in messenger), but I don't know (with the documentation), how it's works? is there any sample? Thanks a lot for your help Regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606p7614740.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com. To be clear I didn't say that it wasn't possible to set a correlation ID with messenger, what I said was that there is no mechanism to establish non-trivial link configuration, which means doing things like message selectors, or even doing topic style subscriptions to a topic exchange on a broker isn't possible. If all you want to do is a basic request/response, for example sending a request to a named queue that has an application receiving from it then have the application respond looping back the correlation ID and using the reply-to then that sort of thing is perfectly possible. To set the correlation ID you need to look at the messenger API documentation specifically the method is: pn_message_correlation_id It's slightly convoluted though as that returns a pn_data object that is owned by the pn_message object so then you have to add or retrieve the actual ID to the pn_data object (it's more of a faff than doing something like setting an address on the message for example). so given the pn data representing the correlation ID if you wanted to add an ID that is a ulong you'd do. pn_data_put_ulong(cid, someulong) If you wanted a string correlation id you have to do something like pn_data_put_string(cid, pn_bytes(strlen(text), text)); retrieving the correlation ID is the reverse. I think that in your original question you mentioned filtering using the correlation ID? That was where the assumption about message selectors came in, if you want the infrastructure e.g. the broker to do the filtering then you can't using messenger, for the reasons that I mentioned about not being able to specify things like amqp filters on the link set up, you can of course filter more manually in the application receiving your request messages. I'm afraid that most of my experience with Messenger is actually in the JavaScript binding that I'm writing, that makes things quite easy as it provides nice OO abstractions so I can just do message.setCorrelationID(); and pass in a JavaScript number, string or a UUID or Binary (the latter two are classes that I wrote as part of the binding). Unfortunately that means that I don't have any decent code snippets showing a complete example in C I've done asynchronous request/response using correlation ID - I basically wrote a port of qpid-config in JavaScript, which is all request/response, so as I say that much is *definitely* possible. That example sends messages to a named node - specifically host:port/qmf.default.direct which is actually an exchange on the broker, it sends its responses to host:port/# well that address when used to create a subscription results in a dynamic queue being created on the broker, you have to do pn_subscription_address to get the name of the new queue. HTH, Frase
Re: CorrelationId
ActiveMQ currently uses an AMQP filter to specify the selector string the broker should use for the consumer. I dont believe Messenger currently supports specifying filters, however I have next to no experience with Messenger so perhaps someone with more knowledge can comment. Robbie On 2 October 2014 15:44, xavier xaviermillie...@eaton.com wrote: Hi all, I am new with Qpid Proton So I try to using it with ActiveMq, and migrate my old code based on CMS (JMS for C) ! I would like to get a message filtered on correlationId, like with CMS: session-createConsumer(destination, JMSCorrelationID=' + correlationId + ') But with Qpid proton (proton/messenger.h) how can I do that??? Any help will be appreciate Regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.
CorrelationId
Hi all, I am new with Qpid Proton So I try to using it with ActiveMq, and migrate my old code based on CMS (JMS for C) ! I would like to get a message filtered on correlationId, like with CMS: session-createConsumer(destination, JMSCorrelationID=' + correlationId + ') But with Qpid proton (proton/messenger.h) how can I do that??? Any help will be appreciate Regards -- View this message in context: http://qpid.2158936.n2.nabble.com/CorrelationId-tp7614606.html Sent from the Apache Qpid Proton mailing list archive at Nabble.com.