Hey all,
I've been messing around trying to get messenger to do something in
asynchronous mode. The comments here:
http://qpid.apache.org/components/messenger/book/sending-and-receiving.html
Suggest that it's possible - though the doc there probably needs an edit
because |pn_messenger_send(messenger) |isn't legal - it needs a second
parameter, the actual prototype is:
/*
* @param[in] messenger the messager
* @param[in] n the number of messages to send
*
* @return an error code or zero on success
* @see error.h
*/
PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n);
But that's by the by:
When I started playing with this I eventually figured out that I needed
to have:
pn_messenger_set_blocking(messenger, false);
After I've created a messenger, as well as doing:
pn_messenger_send(messenger, 0);
But it all seems a lot more cryptic than that. Under the hood there's a
call to pn_messenger_tsync, which in blocking mode eventually ends up
blocking on a poll and has a while loop internally. Now that tsync call
seems to do a whole bunch of things actually leading to sends and
receives of the sockets, so it seems just calling pn_messenger_send in
non-blocking mode isn't enough and I need to call
pn_messenger_work(messenger, 0); too.
I've attached some *really hacky* code. The code is pretty rubbishy, in
the case of send it's calling pn_messenger_send then calling a main loop
that does pn_messenger_work(messenger, 0); periodically. The select to
sleep every second is a massive hack, and I've tried different values of
sleep - ultimately I'd want it to be a proper asynchronous application.
As far as I can see using the send and recv I've attached things get as
far as completing the AMQP negotiation, but the actual message isn't
being sent and it *looks like* something to do with credit.
I've hacked around in proton "instrumenting" it - OK I've added a bunch
of printfs :-)
Hands up who knows that code best? fancy volunteering to document the
call graph, trying to work my way around that made my *head explode* ;-D
I'm sure it's easy when you know, but trying to figure the path of the
actual message.........
I *think* that there's some sort of "store" where pn_messenger_put
actually puts the message and at some point the message should be
retrieved from there by the transport.
Looking through transport.c I've added printfs to
"pn_output_write_amqp_header" and "pn_output_write_amqp" and can see
those getting called and pn_process(transport) seems to be getting
called too.
Looking at where I'm at; the message is being put into the store
pn_messenger_put
calling pn_buffer_append 93
0 83 112 -48 0 0 0 11 0 0 0 5 66 80 4 64 66 82 0 0 83 115 -48 0 0 0 49 0
0 0 13 64 64 -95 14 97 109 113 112 58 47 47 48 46 48 46 48 46 48 64 64
64 64 64 -125 0 0 0 0 0 0 0 0 -125 0 0 0 0 0 0 0 0 64 82 0 64 0 83 119
-95 12 72 101 108 108 111 32 87 111 114 108 100 33
So that's the "Hello World!" at the end of the buffer :-) but when I
print out the values needed for the test in pn_process_tpwork_sender I
keep getting:
pn_process_tpwork_sender state->sent = 0 delivery->done = 1 93
ssn_state->remote_incoming_window = 0
link_state->link_credit = 0
This *seems* to be saying that the message is still in the store but
remote_incoming_window and link_credit are zero. That basically prints
out every time I call pn_messenger_work(messenger, 0);
There's clearly something I'm missing? There wasn't anything in the
synchronous send/recv that explicitly set credits though? I've spent
most of the weekend tearing my hair out on this, so I'd be grateful for
any ideas.
I was serious about the call graph documentation BTW, being pretty new
to the code base it was pretty daunting following the various levels of
indirection and I'm still nowhere near actually understanding how it
works, how do you guys find your way around when it comes to debugging -
is it just experience or are you using other debug tools?
Cheers,
Frase
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#define check(messenger) \
{ \
if(pn_messenger_errno(messenger)) \
{ \
die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
} \
} \
// FA Temporarily make these global
pn_message_t * message;
pn_messenger_t * messenger;
void die(const char *file, int line, const char *message)
{
fprintf(stderr, "%s:%i: %s\n", file, line, message);
exit(1);
}
void usage()
{
printf("Usage: recv [options] <addr>\n");
printf("-c \tPath to the certificate file.\n");
printf("-k \tPath to the private key file.\n");
printf("-p \tPassword for the private key.\n");
printf("<addr>\tAn address.\n");
exit(0);
}
void main_loop(void *arg) {
//pn_messenger_recv(messenger, 1024);
pn_messenger_recv(messenger, 0);
check(messenger);
while(pn_messenger_incoming(messenger))
{
printf("pn_messenger_incoming\n");
pn_messenger_get(messenger, message);
check(messenger);
char buffer[1024];
size_t buffsize = sizeof(buffer);
pn_data_t *body = pn_message_body(message);
pn_data_format(body, buffer, &buffsize);
printf("Address: %s\n", pn_message_get_address(message));
const char* subject = pn_message_get_subject(message);
printf("Subject: %s\n", subject ? subject : "(no subject)");
printf("Content: %s\n", buffer);
}
}
int main(int argc, char** argv)
{
char* certificate = NULL;
char* privatekey = NULL;
char* password = NULL;
char* address = (char *) "amqp://~0.0.0.0";
int c;
opterr = 0;
while((c = getopt(argc, argv, "hc:k:p:")) != -1)
{
switch(c)
{
case 'h':
usage();
break;
case 'c': certificate = optarg; break;
case 'k': privatekey = optarg; break;
case 'p': password = optarg; break;
case '?':
if(optopt == 'c' ||
optopt == 'k' ||
optopt == 'p')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc)
{
address = argv[optind];
}
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // FA Addition.
/* load the various command line options if they're set */
if(certificate)
{
pn_messenger_set_certificate(messenger, certificate);
}
if(privatekey)
{
pn_messenger_set_private_key(messenger, privatekey);
}
if(password)
{
pn_messenger_set_password(messenger, password);
}
pn_messenger_start(messenger);
check(messenger);
pn_messenger_subscribe(messenger, address);
check(messenger);
while (1) {
main_loop(NULL);
struct timeval timeout;
timeout.tv_sec = 0;
timeout.tv_usec = 16667;
select(0, NULL, NULL, NULL, &timeout);
}
return 0;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#define check(messenger) \
{ \
if(pn_messenger_errno(messenger)) \
{ \
die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
} \
} \
// FA Temporarily make global
pn_message_t * message;
pn_messenger_t * messenger;
int count = 0;
void die(const char *file, int line, const char *message)
{
fprintf(stderr, "%s:%i: %s\n", file, line, message);
exit(1);
}
void usage()
{
printf("Usage: send [-a addr] [message]\n");
printf("-a \tThe target address [amqp[s]://domain[/name]]\n");
printf("message\tA text string to send.\n");
exit(0);
}
void main_loop(void *arg) {
printf(" *** main_loop ***\n");
pn_messenger_work(messenger, 0); // FA Addition.
count++;
if (count % 10 == 0) {
exit(1);
}
}
int main(int argc, char** argv)
{
int c;
opterr = 0;
char * address = (char *) "amqp://0.0.0.0";
char * msgtext = (char *) "Hello World!";
while((c = getopt(argc, argv, "ha:b:c:")) != -1)
{
switch(c)
{
case 'a': address = optarg; break;
case 'h': usage(); break;
case '?':
if(optopt == 'a')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc) msgtext = argv[optind];
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // FA Addition.
pn_messenger_start(messenger);
pn_message_set_address(message, address);
pn_data_t *body = pn_message_body(message);
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
check(messenger);
//pn_messenger_send(messenger, -1);
pn_messenger_send(messenger, 0); // FA Addition.
check(messenger);
while (1) {
main_loop(NULL);
struct timeval timeout;
timeout.tv_sec = 1;
timeout.tv_usec = 0;
select(0, NULL, NULL, NULL, &timeout);
}
/*
pn_messenger_stop(messenger);
pn_messenger_free(messenger);
pn_message_free(message);
*/
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]