Hey folks,
I've noticed that PROTON-525/531/534 cover work to expose some bits of
messenger that were previously internal and allow messenger to be driven
from an external poll/select/epoll.
I'm quite interested in this from the perspective of the JavaScript
bindings that I'm working on, but to be honest I'm currently left
scratching my head trying to figure out how the new APIs are intended to
work.
I don't suppose that there are any examples available?
I currently have a recv-async.c and a send-async.c (attached). They are
still a bit hacky, as they are a work in progress while I push the
necessary features into emscripten (the C->JavaScript compiler I'm
using), but they both work in either native C or JavaScript (the
emscripten_set_network_callback gets triggered by WebSocket activity and
allows fully async behaviour, so I don't need any nasty polling).
I've just merged the latest proton-c stuff to the branch I'm working on
for the JavaScript bindings and everything is still working nicely with
the current approach, but I'm guessing that the new capabilities might
be able to make things more efficient?
I'm currently working on the actual binding code, so that I can call
messenger directly from native JavaScript as opposed to compiling C/C++
into JavaScript. So far it has a lot of parallels with the python
bindings - though clearly only async stuff makes any sense for JavaScript.
I'd really appreciate tips and code samples from the folks who have been
working on this API.
Cheers,
Frase
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/driver.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#if EMSCRIPTEN
#include <emscripten.h>
void emscripten_set_network_callback(void (*func)());
#endif
/*
 * check(messenger): if the messenger has a recorded error
 * (pn_messenger_errno() is non-zero), report its error text together with
 * the current file and line via die(), which does not return.
 *
 * The body is wrapped in do { } while (0) so that `check(m);` behaves as a
 * single statement, including inside an unbraced if/else. The final line
 * deliberately carries no line-continuation backslash, so the source line
 * that follows the macro is never spliced into its definition.
 */
#define check(messenger)                                                     \
  do {                                                                       \
    if (pn_messenger_errno(messenger)) {                                     \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    }                                                                        \
  } while (0)
// FA Temporarily make global
// These are file-scope so that process() and work(), which take no
// arguments, can reach the state that main() sets up.
pn_message_t * message;     // Message to send; created in main(), freed in process().
pn_messenger_t * messenger; // Messenger instance; created in main(), freed in process().
pn_tracker_t tracker;       // Tracker of the message put in main(); its status is polled by process().
int tracked = 1;            // Only referenced from commented-out code in the visible source.
int running = 1;            // Cleared by process() once it has called pn_messenger_stop().
/*
 * Report a fatal error and terminate the program.
 *
 * Writes "<file>:<line>: <message>" followed by a newline to stderr and
 * then exits with status 1. This function does not return.
 */
void die(const char *file, int line, const char *message)
{
  FILE *const sink = stderr;

  fprintf(sink, "%s:%i: %s\n", file, line, message);
  exit(1);
}
/*
 * Print the command-line help for the sender to stdout and exit with
 * status 0. This function does not return.
 */
void usage(void)
{
  /* None of the help lines contain conversion specifiers, so they can be
   * emitted verbatim as one concatenated string. */
  fputs("Usage: send [-a addr] [message]\n"
        "-a \tThe target address [amqp[s]://domain[/name]]\n"
        "message\tA text string to send.\n",
        stdout);
  exit(0);
}
/*
 * Inspect the outgoing delivery and drive shutdown of the sender.
 *
 * Reads the status of the global tracker. Once it is anything other than
 * PN_STATUS_PENDING the status is printed and, the first time only (guarded
 * by `running`), the messenger is asked to stop. When the messenger reports
 * that it has stopped, the message and messenger are freed and the program
 * exits with status 0.
 */
void process(void)
{
  const pn_status_t outcome = pn_messenger_status(messenger, tracker);
  const int resolved = (outcome != PN_STATUS_PENDING);

  if (resolved) {
    printf("status = %d\n", outcome);
  }
  if (resolved && running) {
    /* Request the stop exactly once; later calls fall through to the
     * stopped check below. */
    printf("stopping\n");
    pn_messenger_stop(messenger);
    running = 0;
  }

  if (!pn_messenger_stopped(messenger)) {
    return;
  }

  printf("exiting\n");
  pn_message_free(message);
  pn_messenger_free(messenger);
  exit(0);
}
/*
 * Callback used by emscripten to ensure pn_messenger_work gets called.
 *
 * Performs two consecutive passes. Each pass calls pn_messenger_work() with
 * a timeout of 0, prints the result, and runs process() when that result is
 * not negative.
 * NOTE(review): the reason for doing two passes per callback is not stated
 * in the source - confirm with the author before changing it.
 */
void work(void)
{
  for (int pass = 0; pass < 2; ++pass) {
    const int rc = pn_messenger_work(messenger, 0);
    printf("err = %d\n", rc);
    if (rc < 0) {
      continue;
    }
    process();
  }
}
int main(int argc, char** argv)
{
int c;
opterr = 0;
char * address = (char *) "amqp://0.0.0.0";
char * msgtext = (char *) "Hello World!";
while((c = getopt(argc, argv, "ha:b:c:")) != -1)
{
switch(c)
{
case 'a': address = optarg; break;
case 'h': usage(); break;
case '?':
if(optopt == 'a')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc) msgtext = argv[optind];
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // Put messenger into non-blocking mode.
pn_messenger_set_outgoing_window(messenger, 1024); // FA Addition.
pn_messenger_start(messenger);
pn_message_set_address(message, address);
pn_data_t *body = pn_message_body(message);
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
check(messenger);
tracker = pn_messenger_outgoing_tracker(messenger);
//printf("tracker = %lld\n", (long long int)tracker);
#if EMSCRIPTEN
//emscripten_set_main_loop(work, 0, 0);
emscripten_set_network_callback(work);
#else
while (1) {
pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
process();
}
#endif
return 0;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#if EMSCRIPTEN
#include <emscripten.h>
void emscripten_set_network_callback(void (*func)());
#endif
/*
 * check(messenger): if the messenger has a recorded error
 * (pn_messenger_errno() is non-zero), report its error text together with
 * the current file and line via die(), which does not return.
 *
 * The body is wrapped in do { } while (0) so that `check(m);` behaves as a
 * single statement, including inside an unbraced if/else. The final line
 * deliberately carries no line-continuation backslash, so the source line
 * that follows the macro is never spliced into its definition.
 */
#define check(messenger)                                                     \
  do {                                                                       \
    if (pn_messenger_errno(messenger)) {                                     \
      die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
    }                                                                        \
  } while (0)
// FA Temporarily make these global
// They are file-scope so that process() and work(), which take no
// arguments, can reach the objects that main() creates.
pn_message_t * message;     // Reused by process() as the destination of every pn_messenger_get().
pn_messenger_t * messenger; // Messenger instance; created and subscribed in main().
/*
 * Fatal-error helper: prints "<file>:<line>: <message>" plus a newline to
 * stderr, then exits the process with status 1. Never returns.
 */
void die(const char *file, int line, const char *message)
{
  FILE *const errout = stderr;

  fprintf(errout, "%s:%i: %s\n", file, line, message);
  exit(1);
}
/*
 * Print the command-line help for the receiver to stdout and exit with
 * status 0. This function does not return.
 */
void usage(void)
{
  /* The help text has no conversion specifiers, so it is written verbatim
   * as one concatenated string. */
  fputs("Usage: recv [options] <addr>\n"
        "-c \tPath to the certificate file.\n"
        "-k \tPath to the private key file.\n"
        "-p \tPassword for the private key.\n"
        "<addr>\tAn address.\n",
        stdout);
  exit(0);
}
/*
 * Drain the messenger's incoming queue.
 *
 * For every queued message: fetch it into the global `message`, print its
 * address, subject and formatted body, then accept it through its tracker
 * and print the result of the accept call. A messenger error after
 * pn_messenger_get() terminates the program through check().
 */
void process(void) {
  // Process incoming messages
  while(pn_messenger_incoming(messenger))
  {
    printf("in while loop\n");
    pn_messenger_get(messenger, message);
    check(messenger);
    pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);

    char buffer[1024];
    size_t buffsize = sizeof(buffer);
    pn_data_t *body = pn_message_body(message);
    // Formatting can fail, e.g. when the body does not fit into buffer. In
    // that case buffer must not be printed, as it may not hold a string.
    int format_err = pn_data_format(body, buffer, &buffsize);

    // Guard the address the same way as the subject: passing NULL to %s is
    // undefined behaviour.
    const char* msg_address = pn_message_get_address(message);
    printf("Address: %s\n", msg_address ? msg_address : "(no address)");
    const char* subject = pn_message_get_subject(message);
    printf("Subject: %s\n", subject ? subject : "(no subject)");
    if (format_err)
    {
      printf("Content: (unable to format body, err = %d)\n", format_err);
    }
    else
    {
      printf("Content: %s\n", buffer);
    }

    int err = pn_messenger_accept(messenger, tracker, 0);
    printf("err = %d\n", err);
  }
}
/*
 * Callback used by emscripten to ensure pn_messenger_work gets called.
 *
 * Runs two back-to-back rounds. Each round calls pn_messenger_work() with a
 * zero timeout, prints the returned value, and hands over to process()
 * unless that value is negative.
 * NOTE(review): the source does not say why two rounds are performed per
 * callback - confirm with the author before changing it.
 */
void work(void)
{
  int rounds_left = 2;

  while (rounds_left-- > 0) {
    const int result = pn_messenger_work(messenger, 0);
    printf("err = %d\n", result);
    if (result >= 0) {
      process();
    }
  }
}
int main(int argc, char** argv)
{
char* certificate = NULL;
char* privatekey = NULL;
char* password = NULL;
char* address = (char *) "amqp://~0.0.0.0";
int c;
opterr = 0;
while((c = getopt(argc, argv, "hc:k:p:")) != -1)
{
switch(c)
{
case 'h':
usage();
break;
case 'c': certificate = optarg; break;
case 'k': privatekey = optarg; break;
case 'p': password = optarg; break;
case '?':
if(optopt == 'c' ||
optopt == 'k' ||
optopt == 'p')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc)
{
address = argv[optind];
}
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // FA Addition.
//pn_messenger_set_incoming_window(messenger, 1024); // FA Addition.
/* load the various command line options if they're set */
if(certificate)
{
pn_messenger_set_certificate(messenger, certificate);
}
if(privatekey)
{
pn_messenger_set_private_key(messenger, privatekey);
}
if(password)
{
pn_messenger_set_password(messenger, password);
}
pn_messenger_start(messenger);
check(messenger);
pn_messenger_subscribe(messenger, address);
check(messenger);
pn_messenger_recv(messenger, -1); // Receive as many messages as messenger can buffer
#if EMSCRIPTEN
//emscripten_set_main_loop(work, 0, 0);
emscripten_set_network_callback(work);
#else
while (1) {
pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
process();
}
#endif
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]