Here are some stubs I've been working on that describe most of the
communication layer. For those who like dealing directly with code,
please dig in. I will be following up with some UML and a higher level
description tomorrow.
--Rafael
Arnaud Simon wrote:
Hi,
I have attached a document describing my view on the new 0-10
implementation. I would suggest that we first implement a 0.10 client
that we will test against the 0.10 C++ broker. We will then have a
chance to discuss all together the Java broker design during our Java
face to face (Rob should organize it in Glasgow later this year).
Basically we have identified three main components:
- the communication layer, which is common to broker and client
- the Qpid API, which is client specific and plugged on top of the
communication layer
- the JMS API, which sits on top of the Qpid API
The plan is to provide support for 0.8 and 0.10 by first distinguishing
the name spaces. Once the 0.10 client is stable we will then be able to
provide a 0.8 implementation of the Qpid API (based on the existing code
obviously). This will have the advantage that we only need to support a
single JMS implementation.
In another thread I will send the Qpid API as Rajith and I see it right
now. Rafael should send more info about the communication layer.
Regards
Arnaud
------------------------------------------------------------------------
import java.nio.ByteBuffer;
import java.util.*;
import java.lang.annotation.*;
/**
 * Driver for the stubs: pushes a fixed, hand-built sequence of frames through
 * a single shared Connection so the dispatch chain can be watched on stdout.
 * Every frame is sent on channel 0 and is marked as both the first and the
 * last segment.
 */
public class Stub {

    private static Connection conn = new Connection();

    /**
     * Sends a frame whose two-byte payload is zero.
     */
    private static void frame(short track, short type, boolean first, boolean last) {
        final short zero = 0;
        frame(track, type, first, last, zero);
    }

    /**
     * Builds a frame and hands it to the shared connection.
     *
     * @param track      the track the frame travels on (Frame.L1 .. Frame.L4)
     * @param type       the frame type (Frame.METHOD, Frame.HEADER or Frame.BODY)
     * @param first      whether this is the first frame of its segment
     * @param last       whether this is the last frame of its segment
     * @param class_meth the short written as the frame's entire payload; for
     *                   method segments MethodDispatcher reads the leading
     *                   short as the struct type code
     */
    private static void frame(short track, short type, boolean first, boolean last,
                              short class_meth) {
        // Ten bytes are allocated but only the two written bytes remain
        // visible after the flip.
        ByteBuffer body = ByteBuffer.allocate(10);
        body.putShort(class_meth);
        body.flip();

        Frame f = new Frame();
        f.channel = 0;
        f.track = track;
        f.type = type;
        f.firstFrame = first;
        f.lastFrame = last;
        f.firstSegment = true;
        f.lastSegment = true;
        f.payload = body;

        conn.handle(f);
    }

    public static final void main(String[] args) {
        // A complete single-frame method on L2.
        frame(Frame.L2, Frame.METHOD, true, true, (short) 2);

        // A three-frame method on L4, interleaved with a complete method on L3.
        frame(Frame.L4, Frame.METHOD, true, false, (short) 0);
        frame(Frame.L4, Frame.METHOD, false, false);
        frame(Frame.L3, Frame.METHOD, true, true, (short) 1);
        frame(Frame.L4, Frame.METHOD, false, true);

        // A three-frame header on L4.
        frame(Frame.L4, Frame.HEADER, true, false);
        frame(Frame.L4, Frame.HEADER, false, false);
        frame(Frame.L4, Frame.HEADER, false, true);

        // Body frames on L4, interleaved with a complete method on L1.
        frame(Frame.L4, Frame.BODY, true, false);
        frame(Frame.L4, Frame.BODY, false, false);
        frame(Frame.L4, Frame.BODY, false, false);
        frame(Frame.L1, Frame.METHOD, true, true, (short) 1);
        frame(Frame.L4, Frame.BODY, false, false);
        frame(Frame.L4, Frame.BODY, false, true);
    }
}
//====: Handlers and Events :=================================================//
/**
* The Handler interface is used extensively throughout this code. A
* handler consumes one event of type E at a time; what "handling" means
* is left entirely to the implementation.
*
* @param <E> the type of event this handler accepts
*/
interface Handler<E> {
/**
* Processes the given event.
*
* @param event the event to process
*/
void handle(E event);
}
/**
 * An Event pairs the thing being handled (the target) with the context it is
 * being handled in. Keeping the two apart lets one target instance be
 * delivered under several different contexts.
 *
 * @param <C> the context type
 * @param <T> the target type
 */
class Event<C, T> {

    /** The context the target is handled in. */
    C context;

    /** The object being handled. */
    T target;

    /**
     * Creates an event from a context and a target; neither is copied or
     * validated.
     */
    public Event(C context, T target) {
        this.target = target;
        this.context = context;
    }
}
/**
 * The Switch class implements generic event dispatch: each incoming event is
 * resolved to a key, and the handler registered under that key receives the
 * event. It's possible that less generic switch statements may be faster than
 * this approach, however the generic switch is useful for keeping these stubs
 * concise.
 *
 * @param <K> the key type used to select a handler
 * @param <E> the event type being dispatched
 */
abstract class Switch<K,E> implements Handler<E> {

    private Map<K,Handler<E>> handlers = new HashMap<K,Handler<E>>();

    /**
     * Registers the handler for a key, replacing any handler previously
     * registered under the same key.
     */
    public void map(K key, Handler<E> handler) {
        handlers.put(key, handler);
    }

    /**
     * Dispatches the event to the handler registered for its resolved key.
     *
     * @throws IllegalArgumentException if no handler is registered for the
     *         event's key
     */
    public void handle(E event) {
        K key = resolve(event);
        Handler<E> handler = handlers.get(key);
        if (handler == null) {
            // Fail with the offending key instead of a bare NullPointerException.
            throw new IllegalArgumentException("no handler mapped for key: " + key);
        }
        handler.handle(event);
    }

    /**
     * Computes the dispatch key for an event.
     */
    abstract K resolve(E event);
}
//====: Connection, Channel, and Session :====================================//
/**
 * Entry point for incoming frames. Frames are routed to a Channel chosen by
 * the frame's channel number; a Channel is created the first time a given
 * number is seen.
 */
class Connection implements Handler<Frame> {

    /** Channels created so far, keyed by channel number. */
    Map<Short,Channel> channels = new HashMap<Short,Channel>();

    /**
     * Forwards the frame to its channel, creating the channel on demand.
     */
    public void handle(Frame frame) {
        Short id = Short.valueOf(frame.channel);
        Channel target = channels.get(id);
        if (target == null) {
            target = new Channel(this);
            channels.put(id, target);
        }
        target.handle(frame);
    }
}
/**
 * A Channel receives the frames of one channel number and dispatches them by
 * track. Tracks L1 and L2 are handled with the channel itself as context;
 * tracks L3 and L4 are re-targeted at the channel's current session.
 */
class Channel extends AbstractInvoker implements Handler<Frame>, DelegateContext<Channel> {

    Connection connection;
    TrackSwitch<Channel> tracks = new TrackSwitch<Channel>();
    // session may be null
    Session session;
    StructFactory factory = new StructFactory_v0_10();

    // ChannelDelegate holds no state, so one instance is shared by every
    // dispatch, in the same way Session shares its delegate.
    private Delegate<Channel> delegate = new ChannelDelegate();

    /**
     * Creates a channel owned by the given connection and wires up the
     * per-track handlers.
     */
    public Channel(Connection connection) {
        this.connection = connection;
        tracks.map(Frame.L1, new MethodHandler<Channel>());
        tracks.map(Frame.L2, new MethodHandler<Channel>());
        tracks.map(Frame.L3, new SessionResolver<Frame>(new MethodHandler<Session>()));
        tracks.map(Frame.L4, new SessionResolver<Frame>(new ContentHandler<Session>()));
    }

    /**
     * Wraps the frame in an event whose context is this channel and
     * dispatches it by track.
     */
    public void handle(Frame frame) {
        tracks.handle(new Event<Channel,Frame>(this, frame));
    }

    /**
     * Stub for the outgoing path: prints the writable instead of encoding it.
     */
    public void write(Writable writable) {
        System.out.println("writing: " + writable);
    }

    protected StructFactory getFactory() {
        return factory;
    }

    /**
     * Returns the delegate that receives structs dispatched with this channel
     * as context; the struct argument is not consulted.
     */
    public Delegate<Channel> getDelegate(Struct struct) {
        return delegate;
    }

    /**
     * Sends the method by writing it to this channel.
     */
    protected void invoke(Method m) {
        write(m);
    }

    /**
     * Not supported at channel level.
     *
     * @throws UnsupportedOperationException always
     */
    protected void invoke(Method m, Handler<Struct> handler) {
        throw new UnsupportedOperationException();
    }
}
/**
 * A Session sends methods through the channel it is attached to, numbering
 * each one, and remembers the handlers registered for individual command ids.
 */
class Session extends AbstractInvoker implements DelegateContext<Session> {

    // channel may be null
    Channel channel;

    /** Id of the most recently invoked command; incremented before each send. */
    int command_id = 0;

    /** Handlers registered through invoke(Method, Handler), keyed by command id. */
    Map<Integer,Handler<Struct>> handlers = new HashMap<Integer,Handler<Struct>>();

    private StructFactory factory = new StructFactory_v0_10();
    private Delegate<Session> delegate = new SessionDelegate();

    /**
     * Assigns the next command id to the method and writes it to the channel.
     *
     * @throws IllegalStateException if the session has no channel
     */
    protected void invoke(Method m) {
        // Check before touching command_id so that a failed send does not
        // consume an id.
        if (channel == null) {
            throw new IllegalStateException("session is not attached to a channel");
        }
        command_id++;
        channel.write(m);
    }

    /**
     * Sends the method and registers the handler under the command id that
     * the send was given.
     */
    protected void invoke(Method m, Handler<Struct> handler) {
        invoke(m);
        handlers.put(command_id, handler);
    }

    protected StructFactory getFactory() {
        return factory;
    }

    /**
     * Returns the delegate that receives structs dispatched with this session
     * as context; the struct argument is not consulted.
     */
    public Delegate<Session> getDelegate(Struct struct) {
        return delegate;
    }
}
//====: Frame and Segment :===================================================//
/**
 * A single frame: routing information (channel, track, type), the
 * first/last markers for frame and segment, and the payload bytes.
 */
class Frame {

    // Track identifiers.
    public static final short L1 = 0;
    public static final short L2 = 1;
    public static final short L3 = 2;
    public static final short L4 = 3;

    // Frame type identifiers.
    public static final short METHOD = 1;
    public static final short HEADER = 2;
    public static final short BODY = 3;

    short track;
    short type;
    short channel;
    boolean firstSegment;
    boolean lastSegment;
    boolean firstFrame;
    boolean lastFrame;
    ByteBuffer payload;
    int sequence;

    /**
     * Returns an independent view of the payload's readable bytes. Reading
     * from the view does not move the position of the frame's own buffer.
     */
    public ByteBuffer getPayload () {
        return payload.slice();
    }

    /**
     * Returns the number of readable payload bytes, i.e. exactly the number
     * of bytes that {@link #getPayload()} exposes. This uses remaining()
     * rather than limit() so the two stay in agreement even when the
     * buffer's position is not zero.
     */
    public int getSize() {
        return payload.remaining();
    }

    public String toString() {
        return "Frame: track=" + track + ", type=" + type + ", firstFrame=" +
            firstFrame + ", lastFrame=" + lastFrame;
    }
}
/**
 * A Segment is an ordered collection of frames whose payloads, taken
 * together, form one logical unit.
 */
class Segment {

    Collection<Frame> frames = new ArrayList<Frame>();

    /**
     * Appends a frame to this segment.
     */
    public void add(Frame frame) {
        frames.add(frame);
    }

    /**
     * Returns the payloads of all frames, in insertion order, copied into one
     * contiguous buffer that is ready for reading.
     */
    public ByteBuffer getPayload() {
        // we should probably use our own decoder interface here so
        // that we can directly read from the incoming frame objects
        // and automatically skip frame boundaries without copying
        // everything in order to get a contiguous byte buffer
        int capacity = 0;
        for (Frame frame : frames) {
            capacity += frame.getSize();
        }
        ByteBuffer buf = ByteBuffer.allocate(capacity);
        for (Frame frame : frames) {
            buf.put(frame.getPayload());
        }
        buf.flip();
        return buf;
    }

    /**
     * Returns the frames' string forms joined by ",\n ". An empty segment
     * yields the empty string.
     */
    public String toString() {
        StringBuilder buf = new StringBuilder();
        // Emit the separator before every frame except the first. The earlier
        // approach of trimming a trailing separator threw on an empty segment.
        String sep = "";
        for (Frame f : frames) {
            buf.append(sep);
            buf.append(f.toString());
            sep = ",\n ";
        }
        return buf.toString();
    }
}
//====: Dispatch and Transform of Incoming Events :===========================//
/**
 * Routes each incoming frame event to the handler registered for the
 * frame's track.
 */
class TrackSwitch<C> extends Switch<Short,Event<C,Frame>> {

    /** Uses the frame's track as the dispatch key. */
    public Short resolve(Event<C,Frame> event) {
        Frame frame = event.target;
        return Short.valueOf(frame.track);
    }
}
/**
 * Routes each incoming frame event to the handler registered for the
 * frame's type.
 */
class TypeSwitch<C> extends Switch<Short,Event<C,Frame>> {

    /** Uses the frame's type as the dispatch key. */
    public Short resolve(Event<C,Frame> event) {
        Frame frame = event.target;
        return Short.valueOf(frame.type);
    }
}
/**
 * SegmentAssembler is a stateful handler that aggregates Frame events
 * into Segment events. This should only be used where it is necessary
 * to assemble a Segment before processing, e.g. for Method and Header
 * segments.
 *
 * A frame flagged as first starts a new segment, discarding any segment
 * that was still being assembled. A frame flagged as last completes the
 * segment and forwards it downstream.
 */
class SegmentAssembler<C> implements Handler<Event<C,Frame>> {

    /** The segment being assembled, or null when none is in progress. */
    private Segment segment;

    private Handler<Event<C,Segment>> handler;

    /**
     * @param handler receives each completed segment, together with the
     *                context of the frame that completed it
     */
    public SegmentAssembler(Handler<Event<C,Segment>> handler) {
        this.handler = handler;
    }

    /**
     * Adds the frame to the current segment and forwards the segment once its
     * last frame has arrived.
     *
     * @throws IllegalStateException if the frame continues a segment that was
     *         never started
     */
    public void handle(Event<C, Frame> event) {
        Frame frame = event.target;
        if (frame.firstFrame) {
            segment = new Segment();
        } else if (segment == null) {
            throw new IllegalStateException(
                "continuation frame without a first frame: " + frame);
        }
        segment.add(frame);
        if (frame.lastFrame) {
            // Release the reference before dispatching so that a stray
            // continuation frame can never be appended to a segment that has
            // already been handed downstream.
            Segment complete = segment;
            segment = null;
            handler.handle(new Event<C, Segment>(event.context, complete));
        }
    }
}
/**
 * SessionResolver is a stateless handler that takes an event whose context
 * is a Channel and passes on an equivalent event whose context is that
 * channel's current Session. The session is forwarded as-is, even if the
 * channel has none.
 */
class SessionResolver<T> implements Handler<Event<Channel,T>> {

    private Handler<Event<Session,T>> handler;

    /**
     * @param handler receives the re-contextualised events
     */
    public SessionResolver(Handler<Event<Session,T>> handler) {
        this.handler = handler;
    }

    /**
     * Swaps the channel context for the channel's session and forwards the
     * event, keeping the same target.
     */
    public void handle(Event<Channel,T> event) {
        Channel channel = event.context;
        Event<Session,T> resolved = new Event<Session,T>(channel.session, event.target);
        handler.handle(resolved);
    }
}
/**
 * MethodHandler is a stateful handler that assembles method frames into
 * method segments and dispatches each completed method. Frame.METHOD is the
 * only frame type it registers a handler for.
 */
class MethodHandler<C extends DelegateContext<C>> extends TypeSwitch<C> {

    public MethodHandler() {
        Handler<Event<C,Segment>> dispatcher = new MethodDispatcher<C>();
        Handler<Event<C,Frame>> assembler = new SegmentAssembler<C>(dispatcher);
        map(Frame.METHOD, assembler);
    }
}
/**
 * ContentHandler is a stateful handler for tracks that carry content: method
 * and header frames are assembled into segments before being handled, while
 * body frames are passed straight through to a BodyHandler one at a time.
 */
class ContentHandler<C extends DelegateContext<C>> extends TypeSwitch<C> {

    public ContentHandler() {
        Handler<Event<C,Segment>> methods = new MethodDispatcher<C>();
        Handler<Event<C,Frame>> methodAssembler = new SegmentAssembler<C>(methods);
        map(Frame.METHOD, methodAssembler);

        Handler<Event<C,Segment>> headers = new HeaderHandler<C>();
        Handler<Event<C,Frame>> headerAssembler = new SegmentAssembler<C>(headers);
        map(Frame.HEADER, headerAssembler);

        Handler<Event<C,Frame>> bodies = new BodyHandler<C>();
        map(Frame.BODY, bodies);
    }
}
/**
 * MethodDispatcher parses and dispatches a method segment: it reads the
 * leading short of the segment payload as the struct type, has the factory
 * create the matching struct, lets the struct read the rest of the payload,
 * and finally hands the struct to the delegate supplied by the event's
 * context.
 */
class MethodDispatcher<C extends DelegateContext<C>> implements Handler<Event<C,Segment>> {

    private StructFactory factory;

    /**
     * Creates a dispatcher backed by a StructFactory_v0_10.
     */
    public MethodDispatcher() {
        this(new StructFactory_v0_10());
    }

    /**
     * Creates a dispatcher that builds structs with the given factory.
     *
     * @param factory the factory used to create a struct for each type code
     * @throws IllegalArgumentException if factory is null
     */
    public MethodDispatcher(StructFactory factory) {
        if (factory == null) {
            throw new IllegalArgumentException("factory must not be null");
        }
        this.factory = factory;
    }

    /**
     * Decodes the method carried by the segment and delivers it to the
     * context's delegate.
     */
    public void handle(Event<C,Segment> event) {
        System.out.println("got method segment:\n " + event.target);
        ByteBuffer bb = event.target.getPayload();
        // The first two bytes select the struct type; the struct itself reads
        // whatever follows.
        short type = bb.getShort();
        Struct struct = factory.create(type);
        struct.read(bb);
        Delegate<C> delegate = event.context.getDelegate(struct);
        struct.delegate(event.context, delegate);
    }
}
/**
 * Stub handler for assembled header segments: it only prints them.
 */
class HeaderHandler<C> implements Handler<Event<C,Segment>> {

    public void handle(Event<C,Segment> event) {
        Segment header = event.target;
        System.out.println("got header segment:\n " + header);
    }
}
/**
 * Stub handler for body frames: it only prints each frame as it arrives.
 */
class BodyHandler<C> implements Handler<Event<C,Frame>> {

    public void handle(Event<C,Frame> event) {
        Frame body = event.target;
        System.out.println("got body frame: " + body);
    }
}
//====: Channel and Session Delegates :=======================================//
/**
 * Receives structs dispatched with a Channel as context. Only session_open
 * is overridden; every other callback keeps the inherited empty behavior.
 */
class ChannelDelegate extends Delegate<Channel> {

    /**
     * Creates a new session and links it and the channel to each other.
     */
    @Override
    public void session_open(Channel channel, SessionOpen open) {
        Session opened = new Session();
        opened.channel = channel;
        channel.session = opened;
        System.out.println("Session Open");
    }
}
/**
 * Receives structs dispatched with a Session as context.
 */
class SessionDelegate extends Delegate<Session> {

    @Override
    public void queue_declare(Session session, QueueDeclare qd) {
        System.out.println("got a queue declare");
    }

    /**
     * Prints a message and, as a demonstration of invoking from inside a
     * delegate, sends a queue declare back through the session.
     */
    @Override
    public void exchange_declare(Session session, ExchangeDeclare ed) {
        System.out.println("got an exchange declare");
        session.queue_declare("asdf", false);
    }

    /**
     * Delivers the result data to the handler registered for the result's
     * command id, if there is one. The handler is removed as it is looked up,
     * so each registered handler fires at most once and the session's handler
     * map does not grow without bound.
     */
    @Override
    public void execution_result(Session session, ExecutionResult result) {
        Integer id = Integer.valueOf(result.getCommandId());
        Handler<Struct> handler = session.handlers.remove(id);
        if (handler != null) {
            handler.handle(result.getData());
        }
    }
}
//====: Interface to Generated Code :=========================================//
/**
* Implemented by objects that can serve as the context of a dispatched
* struct. MethodDispatcher asks the context for a delegate and then passes
* both the context and that delegate to the struct.
*
* @param <C> the concrete context type
*/
interface DelegateContext<C> {
/**
* Returns the delegate that should receive the given struct.
*/
Delegate<C> getDelegate(Struct struct);
}
/**
* Implemented by objects that know which Delegate callback they correspond
* to. The visible implementations each call their own callback on the
* delegate, passing the context and themselves.
*/
interface Delegator {
/**
* Hands this object to the matching callback of the delegate.
*
* @param context the context the callback is invoked with
* @param delegate the delegate whose callback is invoked
*/
<C> void delegate(C context, Delegate<C> delegate);
}
/**
* Implemented by objects that can take their state from a byte buffer.
*/
interface Readable {
/**
* Intended to read this object's field values from the buffer; the
* implementations visible in this file are still placeholders.
*/
void read(ByteBuffer in);
}
/**
* Implemented by objects that can put their state into a byte buffer.
*/
interface Writable {
/**
* Intended to write this object's field values to the buffer; the
* implementations visible in this file are still placeholders.
*/
void write(ByteBuffer out);
}
/** Anything the generated code can read, write and dispatch to a Delegate. */
interface Struct extends Delegator, Readable, Writable {}
/** Marker for structs that are methods. */
interface Method extends Struct {}
/** Marker for structs that are headers; no implementation is visible in this file. */
interface Header extends Struct {}
/** Common base class of the method implementations; it currently adds no behavior. */
abstract class AbstractMethod implements Method {}
//====: Generated Code :======================================================//
/**
 * Creates struct instances for one protocol version, either from the numeric
 * type code found on the wire or from the version-independent interface
 * class.
 */
interface StructFactory {

    /**
     * Creates the struct registered for the given type code.
     *
     * @throws IllegalArgumentException if the type code is unknown
     */
    Struct create(short type);

    /**
     * Creates an implementation of the given struct interface.
     *
     * This is a single generic method rather than one overload per struct
     * type: overloads that differ only in the type argument of Class share
     * the erasure create(Class) and are rejected as a name clash by Java 7
     * and later compilers. Call sites such as create(QueueDeclare.class)
     * compile unchanged.
     *
     * @param klass the struct interface to instantiate
     * @return a new instance implementing klass
     * @throws IllegalArgumentException if the factory does not know klass
     */
    <T extends Struct> T create(Class<T> klass);
}

/**
 * StructFactory that produces the _v0_10 implementations.
 */
class StructFactory_v0_10 implements StructFactory {

    public Struct create(short type) {
        switch (type) {
        case 0:
            return new QueueDeclare_v0_10();
        case 1:
            return new ExchangeDeclare_v0_10();
        case 2:
            return new SessionOpen_v0_10();
        default:
            throw new IllegalArgumentException("unknown type: " + type);
        }
    }

    public <T extends Struct> T create(Class<T> klass) {
        Struct struct;
        if (QueueDeclare.class.equals(klass)) {
            struct = new QueueDeclare_v0_10();
        } else if (ExchangeDeclare.class.equals(klass)) {
            struct = new ExchangeDeclare_v0_10();
        } else if (SessionOpen.class.equals(klass)) {
            struct = new SessionOpen_v0_10();
        } else {
            throw new IllegalArgumentException("unknown class: " + klass);
        }
        // Checked cast: each branch above builds an implementation of the
        // interface it matched on.
        return klass.cast(struct);
    }
}
/**
* Version-independent view of a queue declare method.
*/
interface QueueDeclare extends Method {
/** Sets the queue name and the durable flag. */
void initialize(String name, boolean durable);
/** Returns the queue name. */
String getName();
/** Returns the durable flag. */
boolean isDurable();
//...
}
/**
 * The 0-10 queue declare method. Encoding and decoding are not implemented
 * yet.
 */
class QueueDeclare_v0_10 extends AbstractMethod implements QueueDeclare {

    private String name;
    private boolean durable;

    /** Stores the queue name and the durable flag. */
    public void initialize(String name, boolean durable) {
        this.durable = durable;
        this.name = name;
    }

    public String getName() {
        return name;
    }

    public boolean isDurable() {
        return durable;
    }

    /** Passes this method to the delegate's queue_declare callback. */
    public <C> void delegate(C context, Delegate<C> delegate) {
        delegate.queue_declare(context, this);
    }

    public void read(ByteBuffer in) {
        // read in the field values
    }

    public void write(ByteBuffer out) {
        // write out the field values
    }
}
/**
* Version-independent view of an exchange declare method.
*/
interface ExchangeDeclare extends Method {
/** Returns the exchange name. */
String getName();
/** Returns the exchange type. */
String getType();
/** Returns the durable flag. */
boolean isDurable();
//...
}
/**
 * The 0-10 exchange declare method. Encoding and decoding are not
 * implemented yet.
 */
class ExchangeDeclare_v0_10 extends AbstractMethod implements ExchangeDeclare {

    private String name;
    private String type;
    private boolean durable;

    /**
     * Stores the exchange name, type and durable flag. Note that this method
     * is not declared on the ExchangeDeclare interface.
     */
    public void initialize(String name, String type, boolean durable) {
        this.durable = durable;
        this.type = type;
        this.name = name;
    }

    public String getName() { return name; }

    public String getType() { return type; }

    public boolean isDurable() { return durable; }

    /** Passes this method to the delegate's exchange_declare callback. */
    public <C> void delegate(C context, Delegate<C> delegate) {
        delegate.exchange_declare(context, this);
    }

    public void read(ByteBuffer in) {
        // read in the field values
    }

    public void write(ByteBuffer out) {
        // write out the field values
    }
}
/**
* Version-independent view of an execution result method. SessionDelegate
* uses the command id to look up a registered handler and passes it the
* data.
*/
interface ExecutionResult extends Method {
/** Returns the id of the command this result belongs to. */
int getCommandId();
/** Returns the struct carried by this result. */
Struct getData();
}
/**
 * The 0-10 execution result method. Encoding and decoding are not
 * implemented yet.
 */
class ExecutionResult_v0_10 extends AbstractMethod implements ExecutionResult {

    private int command_id;
    private Struct data;

    /**
     * Stores the command id and the result data. Note that this method is
     * not declared on the ExecutionResult interface.
     */
    public void initialize(int command_id, Struct data) {
        this.data = data;
        this.command_id = command_id;
    }

    public int getCommandId() { return command_id; }

    public Struct getData() { return data; }

    /** Passes this method to the delegate's execution_result callback. */
    public <C> void delegate(C context, Delegate<C> delegate) {
        delegate.execution_result(context, this);
    }

    public void read(ByteBuffer in) {
        // read in the field values
    }

    public void write(ByteBuffer out) {
        // write out the field values
    }
}
/**
* Version-independent view of a session open method; it declares no members
* of its own.
*/
interface SessionOpen extends Method {}
/**
 * The 0-10 session open method. It carries no fields, and encoding and
 * decoding are not implemented yet.
 */
class SessionOpen_v0_10 extends AbstractMethod implements SessionOpen {

    /** Nothing to initialize. */
    public void initialize() {
    }

    /** Passes this method to the delegate's session_open callback. */
    public <C> void delegate(C context, Delegate<C> delegate) {
        delegate.session_open(context, this);
    }

    public void read(ByteBuffer in) {
        // read in the field values
    }

    public void write(ByteBuffer out) {
        // write out the field values
    }
}
/**
* Base class for receivers of dispatched structs. There is one callback per
* struct type, and every callback has an empty body, so a subclass overrides
* only the ones it is interested in.
*
* @param <C> the type of context the callbacks are invoked with
*/
abstract class Delegate<C> {
/** Called for a dispatched QueueDeclare; does nothing by default. */
public void queue_declare(C context, QueueDeclare qd) {}
/** Called for a dispatched ExchangeDeclare; does nothing by default. */
public void exchange_declare(C context, ExchangeDeclare ed) {}
/** Called for a dispatched ExecutionResult; does nothing by default. */
public void execution_result(C context, ExecutionResult result) {}
/** Called for a dispatched SessionOpen; does nothing by default. */
public void session_open(C context, SessionOpen open) {}
}
/**
* The outgoing counterpart of Delegate: one method per struct type that can
* be sent, taking the struct's field values as arguments.
*/
interface Invoker {
/** Sends a queue declare with the given name and durable flag. */
void queue_declare(String name, boolean durable);
/**
* Sends a queue declare with the given name and durable flag, and passes
* the handler along with the invocation.
*/
void queue_declare(Handler<Struct> handler, String name, boolean durable);
}
/**
 * Implements the Invoker methods by building the corresponding method object
 * with the subclass's factory and passing it to one of the subclass's invoke
 * methods.
 */
abstract class AbstractInvoker implements Invoker {

    /** Sends the method. */
    protected abstract void invoke(Method method);

    /** Sends the method and associates the handler with the invocation. */
    protected abstract void invoke(Method method, Handler<Struct> handler);

    /** Supplies the factory used to create method objects. */
    protected abstract StructFactory getFactory();

    public void queue_declare(String name, boolean durable) {
        invoke(newQueueDeclare(name, durable));
    }

    public void queue_declare(Handler<Struct> handler, String name, boolean durable) {
        invoke(newQueueDeclare(name, durable), handler);
    }

    /** Creates a QueueDeclare from the factory and fills in its fields. */
    private QueueDeclare newQueueDeclare(String name, boolean durable) {
        QueueDeclare declare = getFactory().create(QueueDeclare.class);
        declare.initialize(name, durable);
        return declare;
    }
    // ...
}