Hi again Rafael,
really sorry to keep on at this one :-(
I seem to be having trouble with the tracker/status stuff now......
I got your python examples working, so I figured I'd try and go along
similar lines (using pn_messenger_work as the loop blocker for now
rather than my pn_messenger_wait just to rule out that as a potential
issue) so I've got:
while (1) {
//pn_messenger_wait(messenger, -1); // Block indefinitely until
there has been socket activity.
pn_messenger_work(messenger, -1); // Block indefinitely until there
has been socket activity.
process(NULL);
}
I've tried doing the explicit accept using a tracker but my recv-async.c
doesn't *seem* to be sending it??
I've tried my ./recv-async using your python send_async.py and I don't
end up in the ACCEPTED state :-(
similarly when I try ./recv_async.py and use my ./send-async my stuff
keeps reporting status = 0 which is PN_STATUS_UNKNOWN
<http://qpid.apache.org/releases/qpid-proton-0.5/protocol-engine/c/api/messenger_8h.html#a242e4ee54b9c0a416443c7da5f6e045ba0b46b1041679460baaba2ddcdb2173f2>
This is really starting to drive me nuts now. I must be doing something
stupid, but I can't see what.
The code I've attached is rather hacky as I've been trying to mess
around, but I'd be really grateful for any suggestions as to what I'm
doing wrong.
This is feeling a bit like voodoo at the moment :-D
Cheers.
Frase
On 13/12/13 18:40, Rafael Schloming wrote:
On Fri, Dec 13, 2013 at 12:54 PM, Fraser Adams <
[email protected]> wrote:
Hey Rafael,
many thanks again for your relies, I'll take a look at the python code.
For info in the branch that I'm doing my JavaScript stuff in I "pimped"
messenger.h and messenger.c slightly adding
PN_EXTERN int pn_messenger_wait(pn_messenger_t *messenger, int timeout);
to messenger.h and
int pn_messenger_wait(pn_messenger_t *messenger, int timeout)
{
return pn_driver_wait(messenger->driver, timeout);
}
to messenger.c
so my notifier now looks like:
while (1) {
pn_messenger_wait(messenger, -1); // Block indefinitely until there
has been socket activity.
main_loop(NULL);
}
And that works perfectly - yay :-)
Would you have any issues with that going forward as an interim step until
you're able to move forward with the fully decoupled driver?
Not at all. Please feel free.
--Rafael
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <ctype.h>
#if EMSCRIPTEN
#include <emscripten.h>
#endif
#define check(messenger) \
{ \
if(pn_messenger_errno(messenger)) \
{ \
die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
} \
} \
// FA Temporarily make these global
pn_message_t * message;
pn_messenger_t * messenger;
void die(const char *file, int line, const char *message)
{
fprintf(stderr, "%s:%i: %s\n", file, line, message);
exit(1);
}
void usage()
{
printf("Usage: recv [options] <addr>\n");
printf("-c \tPath to the certificate file.\n");
printf("-k \tPath to the private key file.\n");
printf("-p \tPassword for the private key.\n");
printf("<addr>\tAn address.\n");
exit(0);
}
void process(void *arg) {
//printf(" *** process ***\n");
//int err = pn_messenger_work(messenger, 0); // Sends any outstanding messages queued for messenger.
//printf("err = %d\n", err);
// Process incoming messages
while(pn_messenger_incoming(messenger))
{
printf("in while loop\n");
pn_messenger_get(messenger, message);
check(messenger);
char buffer[1024];
size_t buffsize = sizeof(buffer);
pn_data_t *body = pn_message_body(message);
pn_data_format(body, buffer, &buffsize);
printf("Address: %s\n", pn_message_get_address(message));
const char* subject = pn_message_get_subject(message);
printf("Subject: %s\n", subject ? subject : "(no subject)");
printf("Content: %s\n", buffer);
pn_tracker_t tracker = pn_messenger_incoming_tracker(messenger);
int err = pn_messenger_accept(messenger, tracker, PN_CUMULATIVE);
printf("err = %d\n", err);
}
}
int main(int argc, char** argv)
{
char* certificate = NULL;
char* privatekey = NULL;
char* password = NULL;
char* address = (char *) "amqp://~0.0.0.0";
int c;
opterr = 0;
while((c = getopt(argc, argv, "hc:k:p:")) != -1)
{
switch(c)
{
case 'h':
usage();
break;
case 'c': certificate = optarg; break;
case 'k': privatekey = optarg; break;
case 'p': password = optarg; break;
case '?':
if(optopt == 'c' ||
optopt == 'k' ||
optopt == 'p')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc)
{
address = argv[optind];
}
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // FA Addition.
/* load the various command line options if they're set */
if(certificate)
{
pn_messenger_set_certificate(messenger, certificate);
}
if(privatekey)
{
pn_messenger_set_private_key(messenger, privatekey);
}
if(password)
{
pn_messenger_set_password(messenger, password);
}
pn_messenger_start(messenger);
check(messenger);
pn_messenger_subscribe(messenger, address);
check(messenger);
pn_messenger_recv(messenger, -1); // Receive as many messages as messenger can buffer
#if EMSCRIPTEN
emscripten_set_main_loop(process, 0, 0);
#else
while (1) {
//pn_messenger_wait(messenger, -1); // Block indefinitely until there has been socket activity.
pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
process(NULL);
}
#endif
return 0;
}
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
#include "proton/message.h"
#include "proton/messenger.h"
#include "proton/driver.h"
#include "pncompat/misc_funcs.inc"
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <ctype.h>
#if EMSCRIPTEN
#include <emscripten.h>
#endif
#define check(messenger) \
{ \
if(pn_messenger_errno(messenger)) \
{ \
die(__FILE__, __LINE__, pn_error_text(pn_messenger_error(messenger))); \
} \
} \
// FA Temporarily make global
pn_message_t * message;
pn_messenger_t * messenger;
pn_tracker_t tracker;
int tracked = 1;
void die(const char *file, int line, const char *message)
{
fprintf(stderr, "%s:%i: %s\n", file, line, message);
exit(1);
}
void usage()
{
printf("Usage: send [-a addr] [message]\n");
printf("-a \tThe target address [amqp[s]://domain[/name]]\n");
printf("message\tA text string to send.\n");
exit(0);
}
/*
void process(void *arg) {
printf(" *** process ***\n");
//int err = pn_messenger_work(messenger, 0); // Sends any outstanding messages queued for messenger.
int pending = pn_messenger_outgoing(messenger); // Get the number of pending messages in the outgoing message queue.
//printf("err = %d\n", err);
printf("pending = %d\n", pending);
if (state == SENT_MESSAGE && !pending) {
printf("calling stop\n");
pn_message_free(message); // Release message.
pn_messenger_stop(messenger);
state = STOPPING;
} else if (state == STOPPING && !err) {
printf("exiting\n");
pn_messenger_free(messenger);
exit(0);
}
}
*/
void process(void *arg) {
printf(" *** process ***\n");
// Process outgoing messages
pn_status_t status = pn_messenger_status(messenger, tracker);
printf("status = %d\n", status);
/*
if (status != PN_STATUS_PENDING) {
printf("status = %d\n", status);
pn_messenger_settle(messenger, tracker, 0);
tracked--;
}
*/
}
int main(int argc, char** argv)
{
int c;
opterr = 0;
char * address = (char *) "amqp://0.0.0.0";
char * msgtext = (char *) "Hello World!";
while((c = getopt(argc, argv, "ha:b:c:")) != -1)
{
switch(c)
{
case 'a': address = optarg; break;
case 'h': usage(); break;
case '?':
if(optopt == 'a')
{
fprintf(stderr, "Option -%c requires an argument.\n", optopt);
}
else if(isprint(optopt))
{
fprintf(stderr, "Unknown option `-%c'.\n", optopt);
}
else
{
fprintf(stderr, "Unknown option character `\\x%x'.\n", optopt);
}
return 1;
default:
abort();
}
}
if (optind < argc) msgtext = argv[optind];
// pn_message_t * message;
// pn_messenger_t * messenger;
message = pn_message();
messenger = pn_messenger(NULL);
pn_messenger_set_blocking(messenger, false); // Put messenger into non-blocking mode.
pn_messenger_start(messenger);
pn_message_set_address(message, address);
pn_data_t *body = pn_message_body(message);
pn_data_put_string(body, pn_bytes(strlen(msgtext), msgtext));
pn_messenger_put(messenger, message);
check(messenger);
tracker = pn_messenger_outgoing_tracker(messenger);
//printf("tracker = %lld\n", (long long int)tracker);
#if EMSCRIPTEN
emscripten_set_main_loop(process, 0, 0);
#else
while (1) {
//pn_messenger_wait(messenger, -1); // Block indefinitely until there has been socket activity.
pn_messenger_work(messenger, -1); // Block indefinitely until there has been socket activity.
process(NULL);
}
#endif
return 0;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]