Hi Everyone,

I've been doing some work on a C reactor API for Proton that is intended to
fit both alongside and underneath what Gordon has been doing in pure
Python. I have several goals with this work.

 - Simplify/enable a reactive style of programming in C that is similar to
what Gordon has built out in pure Python.

 - Provide a C API that translates well into the python/ruby/etc bindings,
so that sophisticated handlers can be written once in C rather than being
duplicated in each scripting language.

 - Preserve the extensibility/flexibility that comes with being able to
define custom handlers in the bindings, so python/ruby/etc handlers can
intermix well with C handlers.

 - Provide a C API that translates well into javascript via emscripten. In
some ways this is similar to the above goals with the other language
bindings, however I mention it separately because there are additional
memory management constraints for javascript since it has no finalizers.

I believe I've made significant progress towards most of these goals,
although there is still plenty of work left to do. I'd like to share a few
examples both to illustrate where I am with this and to solicit feedback
and/or help.

Let me say up front that these examples aren't intended to be "hello world"
type examples. The focus of this work has really been on the
reactor/handler/event-dispatch infrastructure, and so the example I've
chosen is really intended to illustrate key aspects of how this works. To
do that I've built an example that sets up a recurring task, a server, and
a client, all within the same process and then sends a number of messages
to itself.

I've included the same example twice, once written in C and once written in
the python binding of the C API. Please have a look and let me know what
you think.

--Rafael
#include <proton/reactor.h>
#include <proton/handlers.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>

// periodic task handler

/**
 * Periodic task handler.
 *
 * On a PN_TIMER_TASK event, prints a greeting and schedules this same
 * handler on the event's reactor again with a delay of 1000 (presumably
 * milliseconds -- confirm against the reactor API). All other event
 * types are ignored.
 */
void task_dispatch(pn_handler_t *handler, pn_event_t *event) {
  pn_reactor_t *owner = pn_event_reactor(event);
  if (pn_event_type(event) != PN_TIMER_TASK) {
    return;
  }
  printf("Hey!\n");
  pn_reactor_schedule(owner, 1000, handler);
}

// server handler

/**
 * Server handler.
 *
 * PN_REACTOR_INIT: creates a task_dispatch handler, schedules it on the
 * reactor with a delay of 0, and drops the local reference to it.
 *
 * PN_DELIVERY: for a complete (non-partial) delivery on a receiving link,
 * reads the payload into a fixed-size buffer, decodes it as a message and
 * prints its inspected form. A payload that fails to decode is reported on
 * stderr instead of being printed. The delivery is settled either way.
 *
 * All other event types are ignored.
 */
void server_dispatch(pn_handler_t *handler, pn_event_t *event) {
  pn_reactor_t *reactor = pn_event_reactor(event);
  switch (pn_event_type(event)) {
  case PN_REACTOR_INIT:
    {
      // set up a recurring task
      pn_handler_t *task = pn_handler(task_dispatch);
      pn_reactor_schedule(reactor, 0, task);
      pn_decref(task);
    }
    break;
  case PN_DELIVERY:
    {
      pn_link_t *link = pn_event_link(event);
      pn_delivery_t *dlv = pn_event_delivery(event);
      if (pn_link_is_receiver(link) && !pn_delivery_partial(dlv)) {
        char buf[1024];
        ssize_t n = pn_link_recv(link, buf, sizeof(buf));
        if (n > 0) {
          pn_message_t *msg = pn_message();
          int err = pn_message_decode(msg, buf, (size_t) n);
          if (err) {
            // do not inspect/print a message that was never decoded
            fprintf(stderr, "Failed to decode message (error %d)\n", err);
          } else {
            pn_string_t *str = pn_string(NULL);
            pn_inspect(msg, str);
            printf("Got: %s\n", pn_string_get(str));
            pn_free(str);
          }
          pn_message_free(msg);
        }
        pn_delivery_settle(dlv);
      }
    }
    break;
  default:
    break;
  }
}

// fancy stateful client handler

// A client is an ordinary handler; its per-client state is stored in the
// memory returned by pn_handler_mem() for that handler.
typedef pn_handler_t client_t;

// State carried by each client handler.
typedef struct {
  // hostname set on the connection when PN_CONNECTION_INIT is handled
  const char *hostname;
  // number of messages still to be sent; decremented after each send
  int count;
  // points at the next message body to send; advanced after each send
  const char **messages;
  // message object reused (cleared and refilled) for every send
  pn_message_t *msg;
} client_state_t;

/**
 * Return the client's state: the memory obtained from pn_handler_mem()
 * for this handler, viewed as a client_state_t.
 */
client_state_t *client_state(client_t *client) {
  void *mem = pn_handler_mem(client);
  return (client_state_t *) mem;
}

/**
 * Release the message held in the client's state. This function is the
 * last argument passed to pn_handler_new() in client_handler().
 */
void client_cleanup(client_t *client) {
  pn_message_free(client_state(client)->msg);
}

/**
 * Client handler.
 *
 * PN_CONNECTION_INIT: sets the configured hostname on the connection,
 * then opens the connection, a session on it, and a sender link named
 * "sender".
 *
 * PN_LINK_FLOW: while the link has credit and messages remain, encodes
 * the next message body into a fixed-size buffer and sends it as a
 * settled delivery. A body that cannot be encoded (for example because
 * it does not fit the buffer) is reported on stderr and skipped, so the
 * check still works when asserts are compiled out. Once no messages
 * remain, the connection is closed.
 *
 * All other event types are ignored.
 */
void client_dispatch(client_t *client, pn_event_t *event) {
  client_state_t *state = client_state(client);
  pn_connection_t *conn = pn_event_connection(event);

  switch (pn_event_type(event)) {
  case PN_CONNECTION_INIT:
    {
      pn_connection_set_hostname(conn, state->hostname);
      pn_connection_open(conn);
      pn_session_t *ssn = pn_session(conn);
      pn_session_open(ssn);
      pn_link_t *snd = pn_sender(ssn, "sender");
      pn_link_open(snd);
    }
    break;
  case PN_LINK_FLOW:
    {
      pn_link_t *snd = pn_event_link(event);
      while (pn_link_credit(snd) > 0 && state->count > 0) {
        const char *body = state->messages[0];
        pn_message_t *msg = state->msg;
        pn_message_clear(msg);
        pn_data_t *data = pn_message_body(msg);
        pn_data_put_string(data, pn_bytes(strlen(body), body));

        char buf[1024];
        size_t size = sizeof(buf);
        int err = pn_message_encode(msg, buf, &size);

        // This body is consumed whether or not it could be encoded, so a
        // bad one cannot stall the loop.
        state->count--;
        state->messages++;

        if (err) {
          fprintf(stderr, "Failed to encode message (error %d), skipping\n", err);
          continue;
        }

        // Only allocate a delivery once there is something valid to send.
        pn_delivery_t *dlv = pn_delivery(snd, pn_dtag("", 0));
        pn_link_send(snd, buf, size);
        pn_delivery_settle(dlv);
      }
      if (state->count == 0) {
        pn_connection_close(conn);
      }
    }
    break;
  default:
    break;
  }
}

/**
 * The client handler wants to be stateful, so it uses a fancy form of
 * the handler constructor that lets it add its own state to the
 * generic handler. The state is accessed via pn_hander_mem(handler)
 * which gives us a void * of the size we ask for in the constructor.
 * For convenience and safety client_state is define above to cast
 * appropriately.
 */
client_t *client_handler(const char *hostname, int count, const char **messages) {
  client_t *client = pn_handler_new(client_dispatch, sizeof(client_state_t), client_cleanup);
  client_state_t *state = client_state(client);
  state->hostname = hostname;
  state->count = count;
  state->messages = messages;
  state->msg = pn_message();
  return client;
}

/**
 * Runs a server and a client inside one reactor: the reactor's default
 * handlers act as the server, an acceptor is set up on port 5672, and a
 * client connection is created whose messages are the command-line
 * arguments.
 */
int main(int argc, const char **argv) {
  pn_reactor_t *rx = pn_reactor();

  // default handlers for the reactor, added in this order
  pn_handler_t *defaults = pn_reactor_handler(rx);
  pn_handler_t *server = pn_handler(server_dispatch);
  pn_handler_add(defaults, server);
  pn_handler_t *flow = pn_flowcontroller(1024);
  pn_handler_add(defaults, flow);
  pn_handler_t *handshaker = pn_handshaker();
  pn_handler_add(defaults, handshaker);

  // acceptor with no handler of its own, so the defaults are used
  pn_reactor_acceptor(rx, "0.0.0.0", "5672", NULL);

  // a client that talks back to this same process; every argument after
  // the program name is one message body
  const int body_count = argc - 1;
  const char **bodies = argv + 1;
  client_t *self_client = client_handler("localhost:5672", body_count, bodies);
  pn_reactor_connection(rx, self_client);

  // run until there is nothing more left to do
  pn_reactor_run(rx);

  // cleanup before shutting down
  pn_reactor_free(rx);
  pn_handler_free(self_client);
  return 0;
}
#!/usr/bin/python
import traceback, time
from proton import Event, Message, Delivery, dispatch
from proton.reactors import Reactor
from proton.handlers import CFlowController, CHandshaker

class MessageDecoder:
    """Decodes complete incoming deliveries and forwards them to a delegate.

    The delegate's ``on_message`` is invoked through ``dispatch`` with the
    receiving link and the decoded message.
    """

    def __init__(self, delegate):
        self.__delegate = delegate
        # a single Message instance is reused for every decode
        self.__message = Message()

    def on_delivery(self, event):
        """Handle a delivery event.

        Only complete deliveries on receiver links are processed. The
        delivery is marked ACCEPTED when the delegate returns normally and
        REJECTED (with the traceback printed) when it raises an ordinary
        exception; in both cases it is then settled. KeyboardInterrupt and
        SystemExit are deliberately not caught so they can stop the program.
        """
        dlv = event.delivery
        if dlv.link.is_receiver and not dlv.partial:
            encoded = dlv.link.recv(dlv.pending)
            self.__message.decode(encoded)
            try:
                dispatch(self.__delegate, "on_message", dlv.link, self.__message)
                dlv.update(Delivery.ACCEPTED)
            except Exception:
                dlv.update(Delivery.REJECTED)
                traceback.print_exc()
            dlv.settle()

class Printer(object):

    def __init__(self):
        self.count = 0

    def on_message(self, rcv, msg):
        self.count += 1
        print "RCVD[%s]" % self.count, msg

class PeriodicThingy:
    def on_timer_task(self, event):
        print "Hey", time.ctime(time.time())
        event.reactor.schedule(1, self)

class Client:
    """Client handler that opens a sender link and sends a list of messages."""

    def __init__(self, hostname, messages):
        # hostname assigned to the connection in on_connection_init
        self.hostname = hostname
        # message bodies still to be sent; consumed from the front
        self.messages = messages

    def on_connection_init(self, event):
        """Set the hostname, create a session and a sender link named
        "sender", then open the connection, session and link."""
        conn = event.connection
        conn.hostname = self.hostname
        ssn = conn.session()
        snd = ssn.sender("sender")
        conn.open()
        ssn.open()
        snd.open()

    def on_link_flow(self, event):
        """Send queued messages while the link has credit.

        Messages are taken from the front of the list so they go out in the
        order they were supplied, matching the C client, which sends
        messages[0] and then advances. Each delivery is settled right after
        it is sent.
        """
        snd = event.link
        while snd.credit > 0 and self.messages:
            dlv = snd.delivery("")
            snd.send(Message(body=self.messages.pop(0)).encode())
            dlv.settle()

class Program:

    def __init__(self):
        # this uses mixed handlers, some defined in C and some defined in python
        self.handlers = [MessageDecoder(Printer()), CHandshaker(), CFlowController(1024)]

    def on_reactor_init(self, event):
        reactor = event.reactor
        print "scheduling", time.ctime(time.time())
        reactor.schedule(0, PeriodicThingy())

import sys

# Create the reactor with a Program instance as its handler.
reactor = Reactor(Program())
# Set up an acceptor on address 0.0.0.0, port 5672.
reactor.acceptor("0.0.0.0", "5672")
# Set up a client to talk to ourselves; every command-line argument after
# the script name is passed to it as one message.
reactor.connection(Client("localhost:5672", sys.argv[1:]))
# Run the reactor.
reactor.run()

Reply via email to