Hey all,
I've been messing around trying to get messenger to do something in asynchronous mode. The comments here:
http://qpid.apache.org/components/messenger/book/sending-and-receiving.html

suggest that it's possible, though the doc there probably needs an edit because pn_messenger_send(messenger) isn't legal: it needs a second parameter. The actual prototype is:
/*
 * @param[in] messenger the messager
 * @param[in] n the number of messages to send
 *
 * @return an error code or zero on success
 * @see error.h
 */
PN_EXTERN int pn_messenger_send(pn_messenger_t *messenger, int n);

But that's by the by:


When I started playing with this I eventually figured out that, after creating the messenger, I needed to call:
pn_messenger_set_blocking(messenger, false);

as well as:
pn_messenger_send(messenger, 0);


But it all seems a lot more cryptic than that. Under the hood there's a call to pn_messenger_tsync, which in blocking mode eventually ends up blocking on a poll and has a while loop internally. That tsync call seems to do a whole bunch of things that actually lead to sends and receives on the sockets, so it seems that just calling pn_messenger_send in non-blocking mode isn't enough and I need to call pn_messenger_work(messenger, 0); too.

I've attached some *really hacky* code. The code is pretty rubbishy: in the case of send it calls pn_messenger_send and then runs a main loop that calls pn_messenger_work(messenger, 0); periodically. The select to sleep every second is a massive hack, and I've tried different sleep values; ultimately I'd want it to be a proper asynchronous application.
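
For what it's worth, the shape of the loop I'm aiming for is roughly the sketch below. It is not Proton code: work_fn, sleep_usec, poll_until_done and stub_work are all made-up names, and the callback only stands in for a call like pn_messenger_work(messenger, 0). The select() call is used purely as a timer, same as in the attached code.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>

/* Hypothetical callback standing in for pn_messenger_work(messenger, 0).
 * It returns non-zero once there is nothing left to do. */
typedef int (*work_fn)(void *ctx);

/* Sleep for the given number of microseconds, using select() as a timer. */
static void sleep_usec(long usec)
{
  struct timeval timeout;
  timeout.tv_sec = usec / 1000000L;
  timeout.tv_usec = usec % 1000000L;
  select(0, NULL, NULL, NULL, &timeout);
}

/* Call work() up to max_polls times, sleeping between calls.
 * Returns the number of calls made when work() reported completion,
 * or -1 if it never did. */
static int poll_until_done(work_fn work, void *ctx, int max_polls, long usec)
{
  assert(work != NULL);
  for (int i = 1; i <= max_polls; i++) {
    if (work(ctx)) return i;
    sleep_usec(usec);
  }
  return -1;
}

/* Stub worker: pretends the work completes on the third call. */
static int stub_work(void *ctx)
{
  int *calls = (int *) ctx;
  return ++(*calls) >= 3;
}
```

Bounding the number of polls at least stops the loop spinning forever while I'm debugging; a real asynchronous application would presumably wait on the sockets rather than on a fixed timer.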


As far as I can see, using the send and recv I've attached, things get as far as completing the AMQP negotiation, but the actual message isn't being sent and it *looks like* it's something to do with credit.

I've hacked around in proton "instrumenting" it - OK I've added a bunch of printfs :-)

Hands up who knows that code best? Fancy volunteering to document the call graph? Trying to work my way around it made my *head explode* ;-D I'm sure it's easy when you know, but trying to figure out the path of the actual message...


I *think* that there's some sort of "store" where pn_messenger_put actually puts the message and at some point the message should be retrieved from there by the transport.

Looking through transport.c I've added printfs to "pn_output_write_amqp_header" and "pn_output_write_amqp" and can see those getting called and pn_process(transport) seems to be getting called too.

Looking at where I'm at, the message is being put into the store:

pn_messenger_put
calling pn_buffer_append 93
0 83 112 -48 0 0 0 11 0 0 0 5 66 80 4 64 66 82 0 0 83 115 -48 0 0 0 49 0 0 0 13 64 64 -95 14 97 109 113 112 58 47 47 48 46 48 46 48 46 48 64 64 64 64 64 -125 0 0 0 0 0 0 0 0 -125 0 0 0 0 0 0 0 0 64 82 0 64 0 83 119 -95 12 72 101 108 108 111 32 87 111 114 108 100 33
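
As a sanity check on that dump, here is a throwaway sketch that pulls the body string back out of the 93 bytes. It assumes the body is an amqp-value section (descriptor bytes 0x00 0x53 0x77) holding a str8-utf8 (0xA1 followed by a one-byte length), which is how I read the tail of the buffer. find_body_string is a made-up helper, not a Proton call.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* The 93 bytes printed after pn_buffer_append, as signed values. */
static const signed char encoded[] = {
  0, 83, 112, -48, 0, 0, 0, 11, 0, 0, 0, 5, 66, 80, 4, 64, 66, 82, 0,
  0, 83, 115, -48, 0, 0, 0, 49, 0, 0, 0, 13,
  64, 64, -95, 14, 97, 109, 113, 112, 58, 47, 47, 48, 46, 48, 46, 48, 46, 48,
  64, 64, 64, 64, 64,
  -125, 0, 0, 0, 0, 0, 0, 0, 0, -125, 0, 0, 0, 0, 0, 0, 0, 0,
  64, 82, 0, 64,
  0, 83, 119, -95, 12,
  72, 101, 108, 108, 111, 32, 87, 111, 114, 108, 100, 33
};

/* Hypothetical helper: scan for the assumed amqp-value descriptor
 * (0x00 0x53 0x77) followed by a str8-utf8 marker (0xA1) and a one-byte
 * length, then copy the string into out and NUL-terminate it.
 * Returns the string length, or -1 if nothing matches or out is too small. */
static int find_body_string(const signed char *buf, size_t len,
                            char *out, size_t outsize)
{
  assert(buf != NULL && out != NULL);
  for (size_t i = 0; i + 5 <= len; i++) {
    const unsigned char *p = (const unsigned char *) buf + i;
    if (p[0] == 0x00 && p[1] == 0x53 && p[2] == 0x77 && p[3] == 0xA1) {
      size_t n = p[4];
      if (i + 5 + n > len || n + 1 > outsize) return -1;
      memcpy(out, p + 5, n);
      out[n] = '\0';
      return (int) n;
    }
  }
  return -1;
}
```

Calling find_body_string(encoded, sizeof(encoded), out, sizeof(out)) with a 32-byte out buffer should give back the 12-character message text.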

So that's the "Hello World!" at the end of the buffer :-) but when I print out the values needed for the test in pn_process_tpwork_sender I keep getting:

pn_process_tpwork_sender state->sent = 0 delivery->done = 1 93
ssn_state->remote_incoming_window = 0
link_state->link_credit = 0


This *seems* to be saying that the message is still in the store but remote_incoming_window and link_credit are zero. That basically prints out every time I call pn_messenger_work(messenger, 0);


There's clearly something I'm missing? There wasn't anything in the synchronous send/recv that explicitly set credits though? I've spent most of the weekend tearing my hair out on this, so I'd be grateful for any ideas.


I was serious about the call graph documentation BTW. Being pretty new to the code base, I found it pretty daunting following the various levels of indirection, and I'm still nowhere near actually understanding how it works. How do you guys find your way around when it comes to debugging? Is it just experience or are you using other debug tools?

Cheers,
Frase









/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "proton/message.h"
#include "proton/messenger.h"

#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>


#define check(messenger)                                                     \
  {                                                                          \
    if(pn_messenger_errno(messenger))                                        \
    {                                                                        \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    }                                                                        \
  }                                                                          \

// FA Temporarily make these global
pn_message_t * message;
pn_messenger_t * messenger;

void die(const char *file, int line, const char *message)
{
  fprintf(stderr, "%s:%i: %s\n", file, line, message);
  exit(1);
}

void usage()
{
  printf("Usage: recv [options] <addr>\n");
  printf("-c    \tPath to the certificate file.\n");
  printf("-k    \tPath to the private key file.\n");
  printf("-p    \tPassword for the private key.\n");
  printf("<addr>\tAn address.\n");
  exit(0);
}

void main_loop(void *arg)
{
  //pn_messenger_recv(messenger, 1024);
  pn_messenger_recv(messenger, 0);
  check(messenger);

  while(pn_messenger_incoming(messenger))
  {
    printf("pn_messenger_incoming\n");
    pn_messenger_get(messenger, message);
    check(messenger);

    char buffer[1024];
    size_t buffsize = sizeof(buffer);
    pn_data_t *body = pn_message_body(message);
    pn_data_format(body, buffer, &buffsize);

    printf("Address: %s\n", pn_message_get_address(message));
    const char* subject = pn_message_get_subject(message);
    printf("Subject: %s\n", subject ? subject : "(no subject)");
    printf("Content: %s\n", buffer);
  }
}

int main(int argc, char** argv)
{
  char* certificate = NULL;
  char* privatekey = NULL;
  char* password = NULL;
  char* address = (char *) "amqp://~0.0.0.0";
  int c;
  opterr = 0;

  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
  {
    switch(c)
    {
    case 'h':
      usage();
      break;

    case 'c': certificate = optarg; break;
    case 'k': privatekey = optarg; break;
    case 'p': password = optarg; break;

    case '?':
      if(optopt == 'c' ||
         optopt == 'k' ||
         optopt == 'p')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }

  if (optind < argc)
  {
    address = argv[optind];
  }

//  pn_message_t * message;
//  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger(NULL);
  pn_messenger_set_blocking(messenger, false); // FA Addition.

  /* load the various command line options if they're set */
  if(certificate)
  {
    pn_messenger_set_certificate(messenger, certificate);
  }

  if(privatekey)
  {
    pn_messenger_set_private_key(messenger, privatekey);
  }

  if(password)
  {
    pn_messenger_set_password(messenger, password);
  }

  pn_messenger_start(messenger);
  check(messenger);

  pn_messenger_subscribe(messenger, address);
  check(messenger);


  while (1) {
    main_loop(NULL);

    struct timeval timeout;
    timeout.tv_sec = 0;
    timeout.tv_usec = 16667;
    select(0, NULL, NULL, NULL, &timeout);
  }

  return 0;
}
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "proton/message.h"
#include "proton/messenger.h"

#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>

#define check(messenger)                                                     \
  {                                                                          \
    if(pn_messenger_errno(messenger))                                        \
    {                                                                        \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    }                                                                        \
  }                                                                          \

// FA Temporarily make global
pn_message_t * message;
pn_messenger_t * messenger;
int count = 0;

void die(const char *file, int line, const char *message)
{
  fprintf(stderr, "%s:%i: %s\n", file, line, message);
  exit(1);
}

void usage()
{
  printf("Usage: send [-a addr] [message]\n");
  printf("-a     \tThe target address [amqp[s]://domain[/name]]\n");
  printf("message\tA text string to send.\n");
  exit(0);
}


void main_loop(void *arg)
{
  printf("                          *** main_loop ***\n");

  pn_messenger_work(messenger, 0); // FA Addition.

  // Exit after ten iterations of the loop.
  count++;
  if (count % 10 == 0) {
    exit(1);
  }
}


int main(int argc, char** argv)
{
  int c;
  opterr = 0;
  char * address = (char *) "amqp://0.0.0.0";
  char * msgtext = (char *) "Hello World!";

  while((c = getopt(argc, argv, "ha:b:c:")) != -1)
  {
    switch(c)
    {
    case 'a': address = optarg; break;
    case 'h': usage(); break;

    case '?':
      if(optopt == 'a')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }

  if (optind < argc) msgtext = argv[optind];

//  pn_message_t * message;
//  pn_messenger_t * messenger;

  message = pn_message();
  messenger = pn_messenger(NULL);
  pn_messenger_set_blocking(messenger, false); // FA Addition.

  pn_messenger_start(messenger);

  pn_message_set_address(message, address);
  pn_data_t *body = pn_message_body(message);
  pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));

  pn_messenger_put(messenger, message);
  check(messenger);

  //pn_messenger_send(messenger, -1);
  pn_messenger_send(messenger, 0); // FA Addition.
  check(messenger);


  while (1) {
    main_loop(NULL);

    struct timeval timeout;
    timeout.tv_sec = 1;
    timeout.tv_usec = 0;
    select(0, NULL, NULL, NULL, &timeout);
  }


/*
  pn_messenger_stop(messenger);
  pn_messenger_free(messenger);
  pn_message_free(message);
*/

  return 0;
}
