Hi Everyone,
I've been doing some work on a C reactor API for Proton that is intended to
fit both alongside and underneath what Gordon has been doing in pure
Python. I have several goals with this work.
- Simplify/enable a reactive style of programming in C that is similar to
what Gordon has built out in pure Python.
- Provide a C API that translates well into the python/ruby/etc bindings,
so that sophisticated handlers can be written once in C rather than being
duplicated in each scripting language.
- Preserve the extensibility/flexibility that comes with being able to
define custom handlers in the bindings, so python/ruby/etc handlers can
intermix well with C handlers.
- Provide a C API that translates well into javascript via emscripten. In
some ways this is similar to the above goals with the other language
bindings, however I mention it separately because there are additional
memory management constraints for javascript since it has no finalizers.
I believe I've made significant progress towards most of these goals,
although there is still plenty of work left to do. I'd like to share a few
examples both to illustrate where I am with this and to solicit feedback
and/or help.
Let me say up front that these examples aren't intended to be "hello world"
type examples. The focus of this work has really been on the
reactor/handler/event-dispatch infrastructure, and so the example I've
chosen is really intended to illustrate key aspects of how this works. To
do that I've built an example that sets up a recurring task, a server, and
a client, all within the same process and then sends a number of messages
to itself.
I've included the same example twice, once written in C and once written in
the python binding of the C API. Please have a look and let me know what
you think.
--Rafael
#include <proton/reactor.h>
#include <proton/handlers.h>
#include <proton/engine.h>
#include <proton/message.h>
#include <assert.h>
#include <stdio.h>
#include <string.h>
// Periodic task handler: on each timer event it prints a greeting and
// schedules itself again. Every other event type is ignored.
void task_dispatch(pn_handler_t *handler, pn_event_t *event) {
  pn_reactor_t *reactor = pn_event_reactor(event);
  if (pn_event_type(event) != PN_TIMER_TASK) {
    return;
  }
  printf("Hey!\n");
  // re-arm with a delay of 1000 (presumably milliseconds -- confirm
  // against the reactor API)
  pn_reactor_schedule(reactor, 1000, handler);
}
// Server handler.
//
// PN_REACTOR_INIT: creates the periodic task handler and schedules it.
// PN_DELIVERY:     for a complete delivery on a receiving link, reads the
//                  payload, decodes it as a message, prints it, and
//                  settles the delivery.
// All other events are ignored.
void server_dispatch(pn_handler_t *handler, pn_event_t *event) {
  (void) handler;  // this handler keeps no state of its own
  pn_reactor_t *reactor = pn_event_reactor(event);
  switch (pn_event_type(event)) {
  case PN_REACTOR_INIT:
    {
      // set up a recurring task; named "task" so that it does not shadow
      // the handler parameter
      pn_handler_t *task = pn_handler(task_dispatch);
      pn_reactor_schedule(reactor, 0, task);
      // release the reference this function obtained from pn_handler()
      pn_decref(task);
    }
    break;
  case PN_DELIVERY:
    {
      pn_link_t *link = pn_event_link(event);
      pn_delivery_t *dlv = pn_event_delivery(event);
      if (pn_link_is_receiver(link) && !pn_delivery_partial(dlv)) {
        char buf[1024];
        ssize_t n = pn_link_recv(link, buf, sizeof(buf));
        if (n > 0) {
          pn_message_t *msg = pn_message();
          int err = pn_message_decode(msg, buf, (size_t) n);
          if (err) {
            // do not print a message that failed to decode
            fprintf(stderr, "failed to decode message: error %d\n", err);
          } else {
            pn_string_t *str = pn_string(NULL);
            pn_inspect(msg, str);
            printf("Got: %s\n", pn_string_get(str));
            pn_free(str);
          }
          pn_message_free(msg);
        }
        // the delivery is settled whether or not the payload was usable
        pn_delivery_settle(dlv);
      }
    }
    break;
  default:
    break;
  }
}
// fancy stateful client handler
// A client is a plain pn_handler_t; the alias only names its role.
typedef pn_handler_t client_t;
// State carried by a client handler and read back via client_state().
typedef struct {
// value passed to pn_connection_set_hostname() when the connection initializes
const char *hostname;
// number of messages still to be sent; the connection is closed at zero
int count;
// remaining message bodies; messages[0] is the next one to send
const char **messages;
// message object that is cleared and reused for every send
pn_message_t *msg;
} client_state_t;
// Returns the client's handler memory viewed as a client_state_t.
client_state_t *client_state(client_t *client) {
  client_state_t *state = pn_handler_mem(client);
  return state;
}
// Cleanup hook passed to pn_handler_new() by client_handler(): frees the
// message stored in the client's state.
void client_cleanup(client_t *client) {
  pn_message_free(client_state(client)->msg);
}
/**
 * Event dispatch for the client handler.
 *
 * PN_CONNECTION_INIT: sets the connection hostname from the client state,
 * then opens the connection, a session, and a sender link named "sender".
 *
 * PN_LINK_FLOW: while the link has credit and messages remain, encodes the
 * next message body, sends it, and settles the delivery straight away.
 * When no messages remain the connection is closed.
 *
 * All other events are ignored.
 */
void client_dispatch(client_t *client, pn_event_t *event) {
  client_state_t *state = client_state(client);
  pn_connection_t *conn = pn_event_connection(event);
  switch (pn_event_type(event)) {
  case PN_CONNECTION_INIT:
    {
      // conn from above is used directly instead of being shadowed by a
      // second, identical lookup
      pn_connection_set_hostname(conn, state->hostname);
      pn_connection_open(conn);
      pn_session_t *ssn = pn_session(conn);
      pn_session_open(ssn);
      pn_link_t *snd = pn_sender(ssn, "sender");
      pn_link_open(snd);
    }
    break;
  case PN_LINK_FLOW:
    {
      pn_link_t *snd = pn_event_link(event);
      while (pn_link_credit(snd) > 0 && state->count > 0) {
        const char *body = state->messages[0];
        pn_message_t *msg = state->msg;
        pn_message_clear(msg);
        pn_data_t *data = pn_message_body(msg);
        pn_data_put_string(data, pn_bytes(strlen(body), body));

        // encode before creating a delivery, so that a message that cannot
        // be encoded never occupies a delivery on the link
        char buf[1024];
        size_t size = sizeof(buf);
        int err = pn_message_encode(msg, buf, &size);

        // the body is consumed whether or not it could be encoded
        state->count--;
        state->messages++;

        if (err) {
          // checked explicitly rather than with assert(), which compiles
          // out under NDEBUG and would let the send go ahead regardless
          fprintf(stderr, "failed to encode message: error %d\n", err);
          continue;
        }

        pn_delivery_t *dlv = pn_delivery(snd, pn_dtag("", 0));
        pn_link_send(snd, buf, size);
        pn_delivery_settle(dlv);
      }
      if (state->count == 0) {
        pn_connection_close(conn);
      }
    }
    break;
  default:
    break;
  }
}
/**
* The client handler wants to be stateful, so it uses a fancy form of
* the handler constructor that lets it add its own state to the
* generic handler. The state is accessed via pn_hander_mem(handler)
* which gives us a void * of the size we ask for in the constructor.
* For convenience and safety client_state is define above to cast
* appropriately.
*/
client_t *client_handler(const char *hostname, int count, const char **messages) {
client_t *client = pn_handler_new(client_dispatch, sizeof(client_state_t), client_cleanup);
client_state_t *state = client_state(client);
state->hostname = hostname;
state->count = count;
state->messages = messages;
state->msg = pn_message();
return client;
}
// Runs a server, a client, and a periodic task inside one reactor; every
// command-line argument is sent as the body of one message.
int main(int argc, const char **argv) {
  pn_reactor_t *reactor = pn_reactor();

  // set up default handlers for our reactor
  pn_handler_t *root = pn_reactor_handler(reactor);
  pn_handler_t *server = pn_handler(server_dispatch);
  pn_handler_add(root, server);
  pn_handler_t *flow = pn_flowcontroller(1024);
  pn_handler_add(root, flow);
  pn_handler_t *handshaker = pn_handshaker();
  pn_handler_add(root, handshaker);

  // set up an acceptor, just use the default handlers
  pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL);

  // set up a client to talk to ourselves; argv[0] is skipped
  int count = argc - 1;
  const char **bodies = argv + 1;
  client_t *client = client_handler("localhost:5672", count, bodies);
  pn_reactor_connection(reactor, client);

  // run until there is nothing more left to do
  pn_reactor_run(reactor);

  // cleanup before shutting down
  pn_reactor_free(reactor);
  pn_handler_free(client);
  return 0;
}
#!/usr/bin/python
import traceback, time
from proton import Event, Message, Delivery, dispatch
from proton.reactors import Reactor
from proton.handlers import CFlowController, CHandshaker
class MessageDecoder:
    """Decodes complete incoming deliveries and passes each message to a delegate.

    The delegate's "on_message" handler is invoked through dispatch() with
    the receiving link and the decoded message.
    """

    def __init__(self, delegate):
        self.__delegate = delegate
        # one Message instance is reused for every delivery
        self.__message = Message()

    def on_delivery(self, event):
        """Decode and dispatch a delivery once it is complete on a receiver link.

        The delivery is updated to ACCEPTED when the delegate returns
        normally and to REJECTED (with the traceback printed) when it
        raises; in both cases it is then settled.
        """
        dlv = event.delivery
        if not dlv.link.is_receiver or dlv.partial:
            return
        encoded = dlv.link.recv(dlv.pending)
        self.__message.decode(encoded)
        try:
            dispatch(self.__delegate, "on_message", dlv.link, self.__message)
            dlv.update(Delivery.ACCEPTED)
        except Exception:
            # Exception rather than a bare except, so KeyboardInterrupt and
            # SystemExit propagate instead of being reported as a rejection
            dlv.update(Delivery.REJECTED)
            traceback.print_exc()
        dlv.settle()
class Printer(object):
    """Prints each received message prefixed with a running count."""

    def __init__(self):
        self.count = 0

    def on_message(self, rcv, msg):
        """Increment the count and print it together with msg; rcv is unused."""
        self.count += 1
        # a single pre-formatted argument prints the same text under the
        # Python 2 print statement and the Python 3 print function
        print("RCVD[%s] %s" % (self.count, msg))
class PeriodicThingy:
    """Timer handler that prints the current time and schedules itself again."""

    def on_timer_task(self, event):
        # a single pre-formatted argument prints the same text under the
        # Python 2 print statement and the Python 3 print function
        print("Hey %s" % time.ctime(time.time()))
        # re-arm with a delay of 1 (presumably seconds -- confirm against
        # the Reactor API)
        event.reactor.schedule(1, self)
class Client:
    """Connection handler that sends a list of message bodies in order."""

    def __init__(self, hostname, messages):
        self.hostname = hostname
        # private copy: sending consumes this list, so the caller's
        # sequence is left untouched
        self.messages = list(messages)

    def on_connection_init(self, event):
        """Set the hostname, create a session and a sender link, and open all three."""
        conn = event.connection
        conn.hostname = self.hostname
        ssn = conn.session()
        snd = ssn.sender("sender")
        conn.open()
        ssn.open()
        snd.open()

    def on_link_flow(self, event):
        """Send queued bodies while the link has credit, settling each delivery at once."""
        snd = event.link
        while snd.credit > 0 and self.messages:
            dlv = snd.delivery("")
            # take from the front so bodies go out in the order they were
            # given, matching the C client
            snd.send(Message(body=self.messages.pop(0)).encode())
            dlv.settle()
class Program:
    """Top-level handler: carries the handler list and starts the periodic task."""

    def __init__(self):
        # this uses mixed handlers, some defined in C and some defined in python
        self.handlers = [MessageDecoder(Printer()), CHandshaker(), CFlowController(1024)]

    def on_reactor_init(self, event):
        """Announce and schedule the first run of the periodic task with delay 0."""
        reactor = event.reactor
        # a single pre-formatted argument prints the same text under the
        # Python 2 print statement and the Python 3 print function
        print("scheduling %s" % time.ctime(time.time()))
        reactor.schedule(0, PeriodicThingy())
import sys

# Build a reactor around the Program handler.
reactor = Reactor(Program())

# Accept connections on 0.0.0.0:5672.
reactor.acceptor("0.0.0.0", "5672")

# Connect a client back to ourselves; each command-line argument is
# passed along as one message body.
bodies = sys.argv[1:]
client = Client("localhost:5672", bodies)
reactor.connection(client)

reactor.run()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]