Hey folks,
I've noticed that PROTON-525/531/534 cover work to expose some bits of messenger that were previously internal and allow messenger to be driven from an external poll/select/epoll.

I'm quite interested in this from the perspective of the JavaScript bindings that I'm working on, but to be honest I'm currently left scratching my head trying to figure out how the new APIs are intended to work.

I don't suppose that there are any examples available?


I currently have a recv-async.c and a send-async.c (attached). They are still a bit hacky, as they are a work in progress while I push the necessary features into emscripten (the C->JavaScript compiler I'm using), but they both work in either native C or JavaScript. (The emscripten_set_network_callback gets triggered by WebSocket activity and allows fully async behaviour, so I don't need any nasty polling.)

I've just merged the latest proton-c stuff to the branch I'm working on for the JavaScript bindings and everything is still working nicely with the current approach, but I'm guessing that the new capabilities might be able to make things more efficient?

I'm currently working on the actual binding code, so that I can call messenger directly from native JavaScript as opposed to compiling C/C++ into JavaScript. So far it has a lot of parallels with the Python bindings - though clearly only the async stuff makes any sense for JavaScript.


I'd really appreciate tips and code samples from the folks who have been working on this API.

Cheers,
Frase


/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/driver.h"

#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>

#if EMSCRIPTEN
#include <emscripten.h>
void emscripten_set_network_callback(void (*func)());
#endif

/*
 * Terminate via die() with the messenger's error text when the messenger
 * reports a non-zero error number; otherwise do nothing.
 *
 * The body is wrapped in do { } while(0) so that `check(m);` expands to
 * exactly one statement and remains correct inside an unbraced if/else.
 */
#define check(messenger)                                                       \
  do                                                                           \
  {                                                                            \
    if(pn_messenger_errno(messenger))                                          \
    {                                                                          \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger)));   \
    }                                                                          \
  } while(0)

/*
 * FA: temporarily file-scope. process() and work() take no arguments, so the
 * state they operate on lives here.
 */
pn_message_t *message;       /* the single message being sent */
pn_messenger_t *messenger;   /* messenger that the message is put on */

pn_tracker_t tracker;        /* tracker assigned in main() after the put */
int tracked = 1;             /* only referenced by commented-out settle code */

int running = 1;             /* cleared once process() has requested a stop */


/*
 * Report a fatal error and terminate the process.
 *
 * Writes "<file>:<line>: <message>" followed by a newline to stderr, then
 * calls exit(1). This function does not return.
 */
void die(const char *file, int line, const char *message)
{
  fprintf(stderr,
          "%s:%i: %s\n",
          file, line, message);

  exit(1);
}

/*
 * Print the sender's command-line help to stdout and terminate the process
 * with exit status 0. This function does not return.
 */
void usage(void)
{
  /* One write of the concatenated help text; the bytes emitted are unchanged. */
  fputs("Usage: send [-a addr] [message]\n"
        "-a     \tThe target address [amqp[s]://domain[/name]]\n"
        "message\tA text string to send.\n",
        stdout);

  exit(0);
}

/*
 * Examine the state of the outgoing message and drive shutdown.
 *
 * Once the tracked delivery is no longer PN_STATUS_PENDING its status is
 * printed and, the first time this happens, the messenger is asked to stop.
 * When the messenger reports that it has stopped, the message and messenger
 * are freed and the process exits with status 0. The tracker itself is not
 * settled here.
 */
void process(void) {
    pn_status_t delivery = pn_messenger_status(messenger, tracker);
    int resolved = (delivery != PN_STATUS_PENDING);

    if (resolved) {
        printf("status = %d\n", delivery);
    }

    /* Request the stop only once; `running` remembers that it was issued. */
    if (resolved && running) {
        printf("stopping\n");
        pn_messenger_stop(messenger);
        running = 0;
    }

    if (!pn_messenger_stopped(messenger)) {
        return;
    }

    printf("exiting\n");
    pn_message_free(message);
    pn_messenger_free(messenger);
    exit(0);
}



// Callback used by emscripten to ensure pn_messenger_work gets called.
/*
 * Callback registered with emscripten so that pn_messenger_work() gets
 * called.
 *
 * Performs two identical passes: a zero-timeout pn_messenger_work() call,
 * a printout of its result, and a call to process() when the result is not
 * negative.
 */
void work(void) {
    for (int pass = 0; pass < 2; ++pass) {
        int err = pn_messenger_work(messenger, 0);
        printf("err = %d\n", err);

        if (err >= 0) {
            process();
        }
    }
}

/*
 * Entry point of the asynchronous sender.
 *
 * Parses `-a addr` and `-h`, takes an optional positional message text,
 * puts one message on a non-blocking messenger, and then drives the
 * messenger: via the emscripten network callback when built with EMSCRIPTEN,
 * otherwise with a loop around pn_messenger_work(). In the native build the
 * process terminates from inside process() or die().
 *
 * Returns 1 on an option-parsing error, 0 otherwise.
 */
int main(int argc, char** argv)
{
  int c;
  opterr = 0;
  char * address = (char *) "amqp://0.0.0.0";
  char * msgtext = (char *) "Hello World!";

  // Only -h and -a are implemented, so only those are offered to getopt.
  // Advertising unhandled options would route them to the abort() below.
  while((c = getopt(argc, argv, "ha:")) != -1)
  {
    switch(c)
    {
    case 'a': address = optarg; break;
    case 'h': usage(); break;

    case '?':
      if(optopt == 'a')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }

  if (optind < argc) msgtext = argv[optind];

  message = pn_message();
  messenger = pn_messenger(NULL);
  if (!message || !messenger)
  {
    die(__FILE__, __LINE__, "could not allocate the message or the messenger");
  }

  pn_messenger_set_blocking(messenger, false); // Put messenger into non-blocking mode.
  pn_messenger_set_outgoing_window(messenger, 1024); // FA Addition.

  pn_messenger_start(messenger);
  check(messenger);

  if (pn_message_set_address(message, address))
  {
    die(__FILE__, __LINE__, "could not set the message address");
  }

  pn_data_t *body = pn_message_body(message);
  if (pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext)))
  {
    die(__FILE__, __LINE__, "could not encode the message body");
  }

  pn_messenger_put(messenger, message);
  check(messenger);

  // process() polls this tracker to learn when the delivery has left the
  // pending state.
  tracker = pn_messenger_outgoing_tracker(messenger);

#if EMSCRIPTEN
  //emscripten_set_main_loop(work, 0, 0);

  emscripten_set_network_callback(work);
#else
  while (1) {
    // Block indefinitely until there has been socket activity.
    int err = pn_messenger_work(messenger, -1);
    if (err < 0) {
      // Prefer the messenger's own error text when it recorded one.
      check(messenger);
      die(__FILE__, __LINE__, "pn_messenger_work failed");
    }
    process();
  }
#endif

  return 0;
}

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 *
 */

#include "proton/message.h"
#include "proton/messenger.h"

#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>

#if EMSCRIPTEN
#include <emscripten.h>
void emscripten_set_network_callback(void (*func)());
#endif

/*
 * Terminate via die() with the messenger's error text when the messenger
 * reports a non-zero error number; otherwise do nothing.
 *
 * The body is wrapped in do { } while(0) so that `check(m);` expands to
 * exactly one statement and remains correct inside an unbraced if/else.
 */
#define check(messenger)                                                       \
  do                                                                           \
  {                                                                            \
    if(pn_messenger_errno(messenger))                                          \
    {                                                                          \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger)));   \
    }                                                                          \
  } while(0)

/*
 * FA: temporarily file-scope. process() and work() take no arguments, so the
 * state they operate on lives here.
 */
pn_message_t *message;       /* reused for every message that is received */
pn_messenger_t *messenger;   /* messenger the subscription is made on */

/*
 * Report a fatal error and terminate the process.
 *
 * Writes "<file>:<line>: <message>" followed by a newline to stderr, then
 * calls exit(1). This function does not return.
 */
void die(const char *file, int line, const char *message)
{
  fprintf(stderr,
          "%s:%i: %s\n",
          file, line, message);

  exit(1);
}

/*
 * Print the receiver's command-line help to stdout and terminate the process
 * with exit status 0. This function does not return.
 */
void usage(void)
{
  /* One write of the concatenated help text; the bytes emitted are unchanged. */
  fputs("Usage: recv [options] <addr>\n"
        "-c    \tPath to the certificate file.\n"
        "-k    \tPath to the private key file.\n"
        "-p    \tPassword for the private key.\n"
        "<addr>\tAn address.\n",
        stdout);

  exit(0);
}

/*
 * Drain and print every message currently queued on the messenger.
 *
 * For each incoming message: fetch it into the shared `message`, print its
 * address, subject and formatted body, then accept it through its tracker
 * and print the result of the accept call.
 */
void process(void) {
    // Process incoming messages
    while (pn_messenger_incoming(messenger)) {
        printf("in while loop\n");

        pn_messenger_get(messenger, message);
        check(messenger);
        pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);

        // Zero-filled so the buffer is a valid (empty) string even if
        // formatting writes nothing.
        char buffer[1024] = {0};
        size_t buffsize = sizeof(buffer);
        pn_data_t *body = pn_message_body(message);
        int format_err = pn_data_format(body, buffer, &buffsize);

        // The getters may return NULL; passing NULL to %s is undefined.
        const char *address = pn_message_get_address(message);
        printf("Address: %s\n", address ? address : "(no address)");
        const char *subject = pn_message_get_subject(message);
        printf("Subject: %s\n", subject ? subject : "(no subject)");

        if (format_err) {
            // Do not print a buffer whose contents are not known to be valid.
            printf("Content: (could not format body, error %d)\n", format_err);
        } else {
            printf("Content: %s\n", buffer);
        }

        int err = pn_messenger_accept(messenger, tracker, 0);
        printf("err = %d\n", err);
    }
}

// Callback used by emscripten to ensure pn_messenger_work gets called.
/*
 * Callback registered with emscripten so that pn_messenger_work() gets
 * called.
 *
 * Performs two identical passes: a zero-timeout pn_messenger_work() call,
 * a printout of its result, and a call to process() when the result is not
 * negative.
 */
void work(void) {
    for (int pass = 0; pass < 2; ++pass) {
        int err = pn_messenger_work(messenger, 0);
        printf("err = %d\n", err);

        if (err >= 0) {
            process();
        }
    }
}

/*
 * Entry point of the asynchronous receiver.
 *
 * Parses `-c`, `-k`, `-p` and `-h`, takes an optional positional address,
 * subscribes a non-blocking messenger to it, and then drives the messenger:
 * via the emscripten network callback when built with EMSCRIPTEN, otherwise
 * with a loop around pn_messenger_work() that only ends through die().
 *
 * Returns 1 on an option-parsing error, 0 otherwise.
 */
int main(int argc, char** argv)
{
  char* certificate = NULL;
  char* privatekey = NULL;
  char* password = NULL;
  char* address = (char *) "amqp://~0.0.0.0";
  int c;
  opterr = 0;

  while((c = getopt(argc, argv, "hc:k:p:")) != -1)
  {
    switch(c)
    {
    case 'h':
      usage();
      break;

    case 'c': certificate = optarg; break;
    case 'k': privatekey = optarg; break;
    case 'p': password = optarg; break;

    case '?':
      if(optopt == 'c' ||
         optopt == 'k' ||
         optopt == 'p')
      {
        fprintf(stderr, "Option -%c requires an argument.\n", optopt);
      }
      else if(isprint(optopt))
      {
        fprintf(stderr, "Unknown option `-%c'.\n", optopt);
      }
      else
      {
        fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
      }
      return 1;
    default:
      abort();
    }
  }

  if (optind < argc)
  {
    address = argv[optind];
  }

  message = pn_message();
  messenger = pn_messenger(NULL);
  if (!message || !messenger)
  {
    die(__FILE__, __LINE__, "could not allocate the message or the messenger");
  }

  pn_messenger_set_blocking(messenger, false); // FA Addition.

  /* load the various command line options if they're set */
  if(certificate && pn_messenger_set_certificate(messenger, certificate))
  {
    die(__FILE__, __LINE__, "could not set the certificate");
  }

  if(privatekey && pn_messenger_set_private_key(messenger, privatekey))
  {
    die(__FILE__, __LINE__, "could not set the private key");
  }

  if(password && pn_messenger_set_password(messenger, password))
  {
    die(__FILE__, __LINE__, "could not set the private key password");
  }

  pn_messenger_start(messenger);
  check(messenger);

  pn_messenger_subscribe(messenger, address);
  check(messenger);

  // Receive as many messages as messenger can buffer. The return value is
  // deliberately not treated as fatal (NOTE(review): a non-blocking messenger
  // presumably reports "in progress" here - confirm); only an error recorded
  // on the messenger aborts.
  pn_messenger_recv(messenger, -1);
  check(messenger);

#if EMSCRIPTEN
  //emscripten_set_main_loop(work, 0, 0);

  emscripten_set_network_callback(work);
#else
  while (1) {
    // Block indefinitely until there has been socket activity.
    int err = pn_messenger_work(messenger, -1);
    if (err < 0) {
      // Prefer the messenger's own error text when it recorded one.
      check(messenger);
      die(__FILE__, __LINE__, "pn_messenger_work failed");
    }
    process();
  }
#endif

  return 0;
}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to