Hi Fraser,
----- Original Message -----
> From: "Fraser Adams" <[email protected]>
> To: [email protected]
> Sent: Sunday, November 17, 2013 7:04:53 AM
> Subject: using proton messenger *asynchronously*
>
> Hey all,
> I've been messing around trying to get messenger to do something in
> asynchronous mode. The comments here:
> http://qpid.apache.org/components/messenger/book/sending-and-receiving.html
>
> Suggest that it's possible - though the doc there probably needs an edit
> because |pn_messenger_send(messenger) |isn't legal - it needs a second
> parameter, the actual prototype is:
> /*
> * @param[in] messenger the messager
> * @param[in] n the number of messages to send
> *
> * @return an error code or zero on success
> * @see error.h
> */
> PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n);
>
> But that's by the by:
>
>
> When I started playing with this I eventually figured out that I needed
> to have:
> pn_messenger_set_blocking(messenger, false);
>
> After I've created a messenger, as well as doing:
> pn_messenger_send(messenger, 0);
>
>
> But it all seems a lot more cryptic than that. Under the hood there's a
> call to pn_messenger_tsync, which in blocking mode eventually ends up
> blocking on a poll and has a while loop internally. Now that tsync call
> seems to do a whole bunch of things actually leading to sends and
> receives of the sockets, so it seems just calling pn_messenger_send in
> non-blocking mode isn't enough and I need to call
> pn_messenger_work(messenger, 0); too.
>
You're correct here - it appears that the web page you reference hasn't been
updated since Rafi added the non-blocking stuff. You definitely need to call
pn_messenger_work() in order to have data hit the wire.
> I've attached some *really hacky* code. The code is pretty rubbishy, in
> the case of send it's calling pn_messenger_send then calling a main loop
> that does pn_messenger_work(messenger, 0); periodically. The select to
> sleep every second is a massive hack, and I've tried different values of
> sleep - ultimately I'd want it to be a proper asynchronous application.
>
>
> As far as I can see using the send and recv I've attached things get as
> far as completing the AMQP negotiation, but the actual message isn't
> being sent and it *looks like* something to do with credit.
>
> I've hacked around in proton "instrumenting" it - OK I've added a bunch
> of printfs :-)
>
> Hands up who knows that code best? fancy volunteering to document the
> call graph, trying to work my way around that made my *head explode* ;-D
> I'm sure it's easy when you know, but trying to figure the path of the
> actual message.........
>
>
> I *think* that there's some sort of "store" where pn_messenger_put
> actually puts the message and at some point the message should be
> retrieved from there by the transport.
>
> Looking through transport.c I've added printfs to
> "pn_output_write_amqp_header" and "pn_output_write_amqp" and can see
> those getting called and pn_process(transport) seems to be getting
> called too.
>
> Looking at where I'm at; the message is being put into the store
>
> pn_messenger_put
> calling pn_buffer_append 93
> 0 83 112 -48 0 0 0 11 0 0 0 5 66 80 4 64 66 82 0 0 83 115 -48 0 0 0 49 0
> 0 0 13 64 64 -95 14 97 109 113 112 58 47 47 48 46 48 46 48 46 48 64 64
> 64 64 64 -125 0 0 0 0 0 0 0 0 -125 0 0 0 0 0 0 0 0 64 82 0 64 0 83 119
> -95 12 72 101 108 108 111 32 87 111 114 108 100 33
>
> So that's the "Hello World!" at the end of the buffer :-) but when I
> print out the values needed for the test in pn_process_tpwork_sender I
> keep getting:
>
> pn_process_tpwork_sender state->sent = 0 delivery->done = 1 93
> ssn_state->remote_incoming_window = 0
> link_state->link_credit = 0
>
>
> This *seems* to be saying that the message is still in the store but
> remote_incoming_window and link_credit are zero. That basically prints
> out every time I call pn_messenger_work(messenger, 0);
>
>
> There's clearly something I'm missing? There wasn't anything in the
> synchronous send/recv that explicitly set credits though? I've spent
> most of the weekend tearing my hair out on this, so I'd be grateful for
> any ideas.
>
You're correct - it does come down to credit. In your recv.c code, the
main_loop needs to supply a non-zero credit count to the pn_messenger_recv() call:

    // XXX pn_messenger_recv(messenger, 0);
    pn_messenger_recv(messenger, 1);

This will cause the receiver to send a "flow" frame, which is what grants the
sender the link credit it needs before it can transfer the message.
All in all, your code is pretty close - I've made a few tweaks (attached). I
think the big difference is the call to pn_messenger_work() on the sender - you
need to call that until the outgoing message has been sent
(pn_messenger_outgoing() == 0).
And from what I can tell - again, not documented - you need to keep calling
pn_messenger_work() until it returns a non-zero value. This is inconsistent
with the Java variant (which returns 0 when there's no more work to do) - I've
got a proposed fix pending (https://reviews.apache.org/r/15458/diff/#5) for 0.6.
As you've noticed, the docs tend not to keep up with the code changes, and that
makes things difficult to understand. Usually the header files are more up to
date. Personally, I've learned the most from reading the Python unit tests
(tests/python/proton_tests/*).
>
> I was serious about the call graph documentation BTW, being pretty new
> to the code base it was pretty daunting following the various levels of
> indirection and I'm still nowhere near actually understanding how it
> works, how do you guys find your way around when it comes to debugging -
> is it just experience or are you using other debug tools?
>
> Cheers,
> Frase
>
BTW, if you set an environment variable called "PN_TRACE_FRM" to one:

    $ export PN_TRACE_FRM=1

proton will dump a trace of the traffic "on the wire" when you run your
programs. Doing that, I could see that the code was getting "stuck" at the
point where I would think recv should issue a flow frame.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: [email protected]
> For additional commands, e-mail: [email protected]
--
-K
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/select.h>  /* select() and struct timeval for the sleep in main() */
#define check(messenger) \
  { \
    if(pn_messenger_errno(messenger)) \
    { \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    } \
  }

// FA Temporarily make these global
pn_message_t * message;
pn_messenger_t * messenger;
void die(const char *file, int line, const char *message)
{
  fprintf(stderr, "%s:%i: %s\n", file, line, message);
  exit(1);
}
void usage()
{
  printf("Usage: recv [options] <addr>\n");
  printf("-c \tPath to the certificate file.\n");
  printf("-k \tPath to the private key file.\n");
  printf("-p \tPassword for the private key.\n");
  printf("<addr>\tAn address.\n");
  exit(0);
}
void main_loop(void *arg) {
  //pn_messenger_recv(messenger, 1024);
  // XXX pn_messenger_recv(messenger, 0);
  // Ask for one message; with a count of 0 no flow frame was issued.
  pn_messenger_recv(messenger, 1);
  check(messenger);
  // Drain whatever has arrived in the incoming queue.
  while(pn_messenger_incoming(messenger))
  {
    printf("pn_messenger_incoming\n");
    pn_messenger_get(messenger, message);
    check(messenger);
    char buffer[1024];
    size_t buffsize = sizeof(buffer);
    pn_data_t *body = pn_message_body(message);
    pn_data_format(body, buffer, &buffsize);
    printf("Address: %s\n", pn_message_get_address(message));
    const char* subject = pn_message_get_subject(message);
    printf("Subject: %s\n", subject ? subject : "(no subject)");
    printf("Content: %s\n", buffer);
  }
}
int main(int argc, char** argv)
{
  char* certificate = NULL;
  char* privatekey = NULL;
  char* password = NULL;
  char* address = (char *) "amqp://~0.0.0.0";
  int c;
  opterr = 0;
  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
  {
    switch(c)
    {
    case 'h':
      usage();
      break;
    case 'c': certificate = optarg; break;
    case 'k': privatekey = optarg; break;
    case 'p': password = optarg; break;
    case '?':
      if(optopt == 'c' ||
         optopt == 'k' ||
         optopt == 'p')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }
  if (optind < argc)
  {
    address = argv[optind];
  }
  // pn_message_t * message;
  // pn_messenger_t * messenger;
  message = pn_message();
  messenger = pn_messenger(NULL);
  pn_messenger_set_blocking(messenger, false); // FA Addition.
  /* load the various command line options if they're set */
  if(certificate)
  {
    pn_messenger_set_certificate(messenger, certificate);
  }
  if(privatekey)
  {
    pn_messenger_set_private_key(messenger, privatekey);
  }
  if(password)
  {
    pn_messenger_set_password(messenger, password);
  }
  pn_messenger_start(messenger);
  check(messenger);
  pn_messenger_subscribe(messenger, address);
  check(messenger);
  while (1) {
    main_loop(NULL);
    /* Sleep for roughly 1/60 s between polls. */
    struct timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = 16667;
    select(0, NULL, NULL, NULL, &timeout);
  }
  return 0;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#include <unistd.h>
#include <sys/select.h>  /* select() and struct timeval for the sleep in main() */
#define check(messenger) \
  { \
    if(pn_messenger_errno(messenger)) \
    { \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    } \
  }

// FA Temporarily make global
pn_message_t * message;
pn_messenger_t * messenger;
int count = 0;
void die(const char *file, int line, const char *message)
{
  fprintf(stderr, "%s:%i: %s\n", file, line, message);
  exit(1);
}
void usage()
{
  printf("Usage: send [-a addr] [message]\n");
  printf("-a \tThe target address [amqp[s]://domain[/name]]\n");
  printf("message\tA text string to send.\n");
  exit(0);
}
void main_loop(void *arg) {
  printf(" *** main_loop *** \n");
  // Keep calling pn_messenger_work() until it returns a non-zero value.
  int err = pn_messenger_work(messenger, 0);
  while (err == 0)
    err = pn_messenger_work(messenger, 0);
  //count++;
  //if (count % 10 == 0) {
  //exit(1);
  //}
}
int main(int argc, char** argv)
{
  int c;
  opterr = 0;
  char * address = (char *) "amqp://0.0.0.0";
  char * msgtext = (char *) "Hello World!";
  /* Only -h and -a are handled below, so only those are accepted here. */
  while((c = getopt(argc, argv, "ha:")) != -1)
  {
    switch(c)
    {
    case 'a': address = optarg; break;
    case 'h': usage(); break;
    case '?':
      if(optopt == 'a')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }
  if (optind < argc) msgtext = argv[optind];
  // pn_message_t * message;
  // pn_messenger_t * messenger;
  message = pn_message();
  messenger = pn_messenger(NULL);
  pn_messenger_set_blocking(messenger, false); // FA Addition.
  pn_messenger_start(messenger);
  pn_message_set_address(message, address);
  pn_data_t *body = pn_message_body(message);
  pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
  pn_messenger_put(messenger, message);
  check(messenger);
  //pn_messenger_send(messenger, -1);
  pn_messenger_send(messenger, 0); // FA Addition.
  check(messenger);
  /* Keep working until the outgoing queue is empty. */
  while (pn_messenger_outgoing(messenger)) {
    main_loop(NULL);
    struct timeval timeout;
    timeout.tv_sec = 1;
    timeout.tv_usec = 0;
    select(0, NULL, NULL, NULL, &timeout);
  }
  pn_messenger_stop(messenger);
  /* Let the close complete before freeing anything. */
  while (!pn_messenger_work(messenger, 0))
    ;
  pn_messenger_free(messenger);
  pn_message_free(message);
  return 0;
}