Author: gsim Date: Fri Oct 10 12:52:17 2014 New Revision: 1630822 URL: http://svn.apache.org/r1630822 Log: Added events: PN_CONNECTION_BOUND/UNBOUND, PN_TRANSPORT_HEAD_CLOSED, PN_TRANSPORT_TAIL_CLOSED, PN_TRANSPORT_CLOSED, and PN_TRANSPORT_ERROR. This should address PROTON-656
Added: qpid/proton/branches/examples/tests/python/proton_tests/scratch.py Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py qpid/proton/branches/examples/proton-c/include/proton/event.h qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h qpid/proton/branches/examples/proton-c/src/events/event.c qpid/proton/branches/examples/proton-c/src/messenger/messenger.c qpid/proton/branches/examples/proton-c/src/ssl/openssl.c qpid/proton/branches/examples/proton-c/src/transport/transport.c qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py qpid/proton/branches/examples/tests/python/proton_tests/engine.py Modified: qpid/proton/branches/examples/proton-c/bindings/python/proton.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/bindings/python/proton.py?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/bindings/python/proton.py (original) +++ qpid/proton/branches/examples/proton-c/bindings/python/proton.py Fri Oct 10 12:52:17 2014 @@ -3363,6 +3363,8 @@ class Collector: class Event: CONNECTION_INIT = PN_CONNECTION_INIT + CONNECTION_BOUND = PN_CONNECTION_BOUND + CONNECTION_UNBOUND = PN_CONNECTION_UNBOUND CONNECTION_OPEN = PN_CONNECTION_OPEN CONNECTION_CLOSE = PN_CONNECTION_CLOSE CONNECTION_REMOTE_OPEN = PN_CONNECTION_REMOTE_OPEN @@ -3385,7 +3387,12 @@ class Event: LINK_FINAL = PN_LINK_FINAL DELIVERY = PN_DELIVERY + TRANSPORT = PN_TRANSPORT + TRANSPORT_ERROR = PN_TRANSPORT_ERROR + TRANSPORT_HEAD_CLOSED = PN_TRANSPORT_HEAD_CLOSED + TRANSPORT_TAIL_CLOSED = PN_TRANSPORT_TAIL_CLOSED + TRANSPORT_CLOSED = PN_TRANSPORT_CLOSED def __init__(self, clazz, context, type): self.clazz = clazz Modified: qpid/proton/branches/examples/proton-c/include/proton/event.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/include/proton/event.h?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/include/proton/event.h (original) +++ qpid/proton/branches/examples/proton-c/include/proton/event.h Fri Oct 10 12:52:17 2014 @@ -91,130 +91,148 @@ typedef enum { * will ever be issued for a connection. Events of this type point * to the relevant connection. */ - PN_CONNECTION_INIT = 1, + PN_CONNECTION_INIT, + + /** + * The connection has been bound to a transport. + */ + PN_CONNECTION_BOUND, + + /** + * The connection has been unbound from its transport. + */ + PN_CONNECTION_UNBOUND, /** * The local connection endpoint has been closed. Events of this * type point to the relevant connection. */ - PN_CONNECTION_OPEN = 2, + PN_CONNECTION_OPEN, /** * The remote endpoint has opened the connection. Events of this * type point to the relevant connection. */ - PN_CONNECTION_REMOTE_OPEN = 3, + PN_CONNECTION_REMOTE_OPEN, /** * The local connection endpoint has been closed. Events of this * type point to the relevant connection. */ - PN_CONNECTION_CLOSE = 4, + PN_CONNECTION_CLOSE, /** * The remote endpoint has closed the connection. Events of this * type point to the relevant connection. */ - PN_CONNECTION_REMOTE_CLOSE = 5, + PN_CONNECTION_REMOTE_CLOSE, /** * The connection has been freed and any outstanding processing has * been completed. This is the final event that will ever be issued * for a connection. */ - PN_CONNECTION_FINAL = 6, + PN_CONNECTION_FINAL, /** * The session has been created. This is the first event that will * ever be issued for a session. */ - PN_SESSION_INIT = 11, + PN_SESSION_INIT, /** * The local session endpoint has been opened. Events of this type * point ot the relevant session. */ - PN_SESSION_OPEN = 12, + PN_SESSION_OPEN, /** * The remote endpoint has opened the session. Events of this type * point to the relevant session. */ - PN_SESSION_REMOTE_OPEN = 13, + PN_SESSION_REMOTE_OPEN, /** * The local session endpoint has been closed. Events of this type * point ot the relevant session. */ - PN_SESSION_CLOSE = 14, + PN_SESSION_CLOSE, /** * The remote endpoint has closed the session. Events of this type * point to the relevant session. */ - PN_SESSION_REMOTE_CLOSE = 15, + PN_SESSION_REMOTE_CLOSE, /** * The session has been freed and any outstanding processing has * been completed. This is the final event that will ever be issued * for a session. */ - PN_SESSION_FINAL = 16, + PN_SESSION_FINAL, /** * The link has been created. This is the first event that will ever * be issued for a link. */ - PN_LINK_INIT = 21, + PN_LINK_INIT, /** * The local link endpoint has been opened. Events of this type * point ot the relevant link. */ - PN_LINK_OPEN = 22, + PN_LINK_OPEN, /** * The remote endpoint has opened the link. Events of this type * point to the relevant link. */ - PN_LINK_REMOTE_OPEN = 23, + PN_LINK_REMOTE_OPEN, /** * The local link endpoint has been closed. Events of this type * point ot the relevant link. */ - PN_LINK_CLOSE = 24, + PN_LINK_CLOSE, /** * The remote endpoint has closed the link. Events of this type * point to the relevant link. */ - PN_LINK_REMOTE_CLOSE = 25, + PN_LINK_REMOTE_CLOSE, /** * The flow control state for a link has changed. Events of this * type point to the relevant link. */ - PN_LINK_FLOW = 26, + PN_LINK_FLOW, /** * The link has been freed and any outstanding processing has been * completed. This is the final event that will ever be issued for a * link. Events of this type point to the relevant link. */ - PN_LINK_FINAL = 27, + PN_LINK_FINAL, /** * A delivery has been created or updated. Events of this type point * to the relevant delivery. */ - PN_DELIVERY = 31, + PN_DELIVERY, /** * The transport has new data to read and/or write. Events of this * type point to the relevant transport. */ - PN_TRANSPORT = 41 + PN_TRANSPORT, + + PN_TRANSPORT_ERROR, + + PN_TRANSPORT_HEAD_CLOSED, + + PN_TRANSPORT_TAIL_CLOSED, + + PN_TRANSPORT_CLOSED } pn_event_type_t; Modified: qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h (original) +++ qpid/proton/branches/examples/proton-c/src/engine/engine-internal.h Fri Oct 10 12:52:17 2014 @@ -174,6 +174,8 @@ struct pn_transport_t { bool tail_closed; // input stream closed by driver bool head_closed; bool done_processing; // if true, don't call pn_process again + bool posted_head_closed; + bool posted_tail_closed; }; struct pn_connection_t { @@ -314,4 +316,7 @@ int pn_do_error(pn_transport_t *transpor void pn_session_unbound(pn_session_t* ssn); void pn_link_unbound(pn_link_t* link); +void pni_close_tail(pn_transport_t *transport); + + #endif /* engine-internal.h */ Modified: qpid/proton/branches/examples/proton-c/src/events/event.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/events/event.c?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/events/event.c (original) +++ qpid/proton/branches/examples/proton-c/src/events/event.c Fri Oct 10 12:52:17 2014 @@ -236,6 +236,10 @@ const char *pn_event_type_name(pn_event_ return "PN_EVENT_NONE"; case PN_CONNECTION_INIT: return "PN_CONNECTION_INIT"; + case PN_CONNECTION_BOUND: + return "PN_CONNECTION_BOUND"; + case PN_CONNECTION_UNBOUND: + return "PN_CONNECTION_UNBOUND"; case PN_CONNECTION_REMOTE_OPEN: return "PN_CONNECTION_REMOTE_OPEN"; case PN_CONNECTION_OPEN: @@ -276,6 +280,14 @@ const char *pn_event_type_name(pn_event_ return "PN_DELIVERY"; case PN_TRANSPORT: return "PN_TRANSPORT"; + case PN_TRANSPORT_ERROR: + return "PN_TRANSPORT_ERROR"; + case PN_TRANSPORT_HEAD_CLOSED: + return "PN_TRANSPORT_HEAD_CLOSED"; + case PN_TRANSPORT_TAIL_CLOSED: + return "PN_TRANSPORT_TAIL_CLOSED"; + case PN_TRANSPORT_CLOSED: + return "PN_TRANSPORT_CLOSED"; } return "<unrecognized>"; Modified: qpid/proton/branches/examples/proton-c/src/messenger/messenger.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/messenger/messenger.c?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/messenger/messenger.c (original) +++ qpid/proton/branches/examples/proton-c/src/messenger/messenger.c Fri Oct 10 12:52:17 2014 @@ -1259,10 +1259,18 @@ int pn_messenger_process_events(pn_messe pn_messenger_process_delivery(messenger, event); break; case PN_TRANSPORT: + case PN_TRANSPORT_ERROR: + case PN_TRANSPORT_HEAD_CLOSED: + case PN_TRANSPORT_TAIL_CLOSED: + case PN_TRANSPORT_CLOSED: pn_messenger_process_transport(messenger, event); break; case PN_EVENT_NONE: break; + case PN_CONNECTION_BOUND: + break; + case PN_CONNECTION_UNBOUND: + break; case PN_CONNECTION_FINAL: break; case PN_SESSION_FINAL: Modified: qpid/proton/branches/examples/proton-c/src/ssl/openssl.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/ssl/openssl.c?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/ssl/openssl.c (original) +++ qpid/proton/branches/examples/proton-c/src/ssl/openssl.c Fri Oct 10 12:52:17 2014 @@ -197,7 +197,7 @@ static int ssl_failed(pn_ssl_t *ssl) ERR_error_string_n( ssl_err, buf, sizeof(buf) ); } _log_ssl_error(NULL); // spit out any remaining errors to the log file - ssl->transport->tail_closed = true; + pni_close_tail(ssl->transport); pn_do_error(ssl->transport, "amqp:connection:framing-error", "SSL Failure: %s", buf); return PN_EOS; } Modified: qpid/proton/branches/examples/proton-c/src/transport/transport.c URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-c/src/transport/transport.c?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-c/src/transport/transport.c (original) +++ qpid/proton/branches/examples/proton-c/src/transport/transport.c Fri Oct 10 12:52:17 2014 @@ -174,6 +174,9 @@ static void pn_transport_initialize(void transport->output_pending = 0; transport->done_processing = false; + + transport->posted_head_closed = false; + transport->posted_tail_closed = false; } pn_session_t *pn_channel_state(pn_transport_t *transport, uint16_t channel) @@ -262,11 +265,17 @@ static void pn_transport_finalize(void * int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) { - if (!transport) return PN_ARG_ERR; + assert(transport); + assert(connection); + if (transport->connection) return PN_STATE_ERR; if (connection->transport) return PN_STATE_ERR; + transport->connection = connection; connection->transport = transport; + + pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); + pn_incref(connection); if (transport->open_rcvd) { PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); @@ -274,6 +283,7 @@ int pn_transport_bind(pn_transport_t *tr transport->disp->halt = false; transport_consume(transport); // blech - testBindAfterOpen } + return 0; } @@ -304,9 +314,12 @@ int pn_transport_unbind(pn_transport_t * assert(transport); if (!transport->connection) return 0; + pn_connection_t *conn = transport->connection; transport->connection = NULL; + pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND); + // XXX: what happens if the endpoints are freed before we get here? pn_session_t *ssn = pn_session_head(conn, 0); while (ssn) { @@ -415,6 +428,15 @@ int pn_post_close(pn_transport_t *transp (bool) condition, ERROR, condition, description, info); } +static pn_collector_t *pni_transport_collector(pn_transport_t *transport) +{ + if (transport->connection && transport->connection->collector) { + return transport->connection->collector; + } else { + return NULL; + } +} + int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...) { va_list ap; @@ -433,6 +455,8 @@ int pn_do_error(pn_transport_t *transpor } transport->disp->halt = true; pn_transport_logf(transport, "ERROR %s %s", condition, buf); + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR); return PN_ERR; } @@ -999,6 +1023,14 @@ ssize_t pn_transport_input(pn_transport_ return original - available; } +static void pni_maybe_post_closed(pn_transport_t *transport) +{ + pn_collector_t *collector = pni_transport_collector(transport); + if (transport->posted_head_closed && transport->posted_tail_closed) { + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED); + } +} + // process pending input until none remaining or EOS static ssize_t transport_consume(pn_transport_t *transport) { @@ -1020,6 +1052,12 @@ static ssize_t transport_consume(pn_tran if (transport->disp->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) pn_transport_log(transport, " <- EOS"); transport->input_pending = 0; // XXX ??? + if (!transport->posted_tail_closed) { + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED); + transport->posted_tail_closed = true; + pni_maybe_post_closed(transport); + } return n; } } @@ -2041,6 +2079,13 @@ ssize_t pn_transport_push(pn_transport_t } } +void pni_close_tail(pn_transport_t *transport) +{ + if (!transport->tail_closed) { + transport->tail_closed = true; + } +} + int pn_transport_process(pn_transport_t *transport, size_t size) { assert(transport); @@ -2050,7 +2095,7 @@ int pn_transport_process(pn_transport_t ssize_t n = transport_consume( transport ); if (n == PN_EOS) { - transport->tail_closed = true; + pni_close_tail(transport); } if (n < 0 && n != PN_EOS) return n; @@ -2060,7 +2105,7 @@ int pn_transport_process(pn_transport_t // input stream has closed int pn_transport_close_tail(pn_transport_t *transport) { - transport->tail_closed = true; + pni_close_tail(transport); transport_consume( transport ); return 0; // XXX: what if not all input processed at this point? do we care??? @@ -2112,6 +2157,14 @@ void pn_transport_pop(pn_transport_t *tr memmove( transport->output_buf, &transport->output_buf[size], transport->output_pending ); } + + if (!transport->output_pending && pn_transport_pending(transport) < 0 && + !transport->posted_head_closed) { + pn_collector_t *collector = pni_transport_collector(transport); + pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_HEAD_CLOSED); + transport->posted_head_closed = true; + pni_maybe_post_closed(transport); + } } } Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java Fri Oct 10 12:52:17 2014 @@ -28,56 +28,40 @@ package org.apache.qpid.proton.engine; public interface Event { - public enum Category { - CONNECTION, - SESSION, - LINK, - DELIVERY, - TRANSPORT; - } public enum Type { - CONNECTION_INIT(Category.CONNECTION, 1), - CONNECTION_OPEN(Category.CONNECTION, 2), - CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3), - CONNECTION_CLOSE(Category.CONNECTION, 4), - CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5), - CONNECTION_FINAL(Category.CONNECTION, 6), - - SESSION_INIT(Category.SESSION, 1), - SESSION_OPEN(Category.SESSION, 2), - SESSION_REMOTE_OPEN(Category.SESSION, 3), - SESSION_CLOSE(Category.SESSION, 4), - SESSION_REMOTE_CLOSE(Category.SESSION, 5), - SESSION_FINAL(Category.SESSION, 6), - - LINK_INIT(Category.LINK, 1), - LINK_OPEN(Category.LINK, 2), - LINK_REMOTE_OPEN(Category.LINK, 3), - LINK_CLOSE(Category.LINK, 4), - LINK_REMOTE_CLOSE(Category.LINK, 5), - LINK_FLOW(Category.LINK, 6), - LINK_FINAL(Category.LINK, 7), - - DELIVERY(Category.DELIVERY, 1), - TRANSPORT(Category.TRANSPORT, 1); - - private int _opcode; - private Category _category; - - private Type(Category c, int o) - { - this._category = c; - this._opcode = o; - } - - public Category getCategory() - { - return this._category; - } - } + CONNECTION_INIT, + CONNECTION_BOUND, + CONNECTION_UNBOUND, + CONNECTION_OPEN, + CONNECTION_REMOTE_OPEN, + CONNECTION_CLOSE, + CONNECTION_REMOTE_CLOSE, + CONNECTION_FINAL, + + SESSION_INIT, + SESSION_OPEN, + SESSION_REMOTE_OPEN, + SESSION_CLOSE, + SESSION_REMOTE_CLOSE, + SESSION_FINAL, + + LINK_INIT, + LINK_OPEN, + LINK_REMOTE_OPEN, + LINK_CLOSE, + LINK_REMOTE_CLOSE, + LINK_FLOW, + LINK_FINAL, - Category getCategory(); + DELIVERY, + + TRANSPORT, + TRANSPORT_ERROR, + TRANSPORT_HEAD_CLOSED, + TRANSPORT_TAIL_CLOSED, + TRANSPORT_CLOSED + } Type getType(); Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java Fri Oct 10 12:52:17 2014 @@ -56,11 +56,6 @@ class EventImpl implements Event context = null; } - public Category getCategory() - { - return type.getCategory(); - } - public Type getType() { return type; @@ -73,16 +68,15 @@ class EventImpl implements Event public Connection getConnection() { - switch (type.getCategory()) { - case CONNECTION: + if (context instanceof Connection) { return (Connection) context; - case TRANSPORT: + } else if (context instanceof Transport) { Transport transport = getTransport(); if (transport == null) { return null; } return ((TransportImpl) transport).getConnectionImpl(); - default: + } else { Session ssn = getSession(); if (ssn == null) { return null; @@ -93,10 +87,9 @@ class EventImpl implements Event public Session getSession() { - switch (type.getCategory()) { - case SESSION: + if (context instanceof Session) { return (Session) context; - default: + } else { Link link = getLink(); if (link == null) { return null; @@ -107,10 +100,9 @@ class EventImpl implements Event public Link getLink() { - switch (type.getCategory()) { - case LINK: + if (context instanceof Link) { return (Link) context; - default: + } else { Delivery dlv = getDelivery(); if (dlv == null) { return null; @@ -121,20 +113,18 @@ class EventImpl implements Event public Delivery getDelivery() { - switch (type.getCategory()) { - case DELIVERY: + if (context instanceof Delivery) { return (Delivery) context; - default: + } else { return null; } } public Transport getTransport() { - switch (type.getCategory()) { - case TRANSPORT: + if (context instanceof Transport) { return (Transport) context; - default: + } else { return null; } } @@ -150,4 +140,5 @@ class EventImpl implements Event { return "EventImpl{" + "type=" + type + ", context=" + context + '}'; } + } Modified: qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java (original) +++ qpid/proton/branches/examples/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java Fri Oct 10 12:52:17 2014 @@ -123,6 +123,9 @@ public class TransportImpl extends Endpo private boolean _head_closed = false; private TransportException _tail_error = null; + private boolean postedHeadClosed = false; + private boolean postedTailClosed = false; + /** * @deprecated This constructor's visibility will be reduced to the default scope in a future release. * Client code outside this module should use a {@link EngineFactory} instead @@ -210,8 +213,10 @@ public class TransportImpl extends Endpo @Override public void bind(Connection conn) { - _connectionEndpoint = (ConnectionImpl) conn; // TODO - check if already bound + + _connectionEndpoint = (ConnectionImpl) conn; + put(Event.Type.CONNECTION_BOUND, conn); _connectionEndpoint.setTransport(this); _connectionEndpoint.incref(); @@ -230,6 +235,7 @@ public class TransportImpl extends Endpo @Override public void unbind() { + put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint); _connectionEndpoint.modifyEndpoints(); _connectionEndpoint.setTransport(null); @@ -1236,6 +1242,19 @@ public class TransportImpl extends Endpo return _closeReceived; } + void put(Event.Type type, Object context) { + if (_connectionEndpoint != null) { + _connectionEndpoint.put(type, context); + } + } + + private void maybePostClosed() + { + if (postedHeadClosed && postedTailClosed) { + put(Event.Type.TRANSPORT_CLOSED, this); + } + } + @Override public void closed(TransportException error) { @@ -1247,6 +1266,14 @@ public class TransportImpl extends Endpo } _head_closed = true; } + if (_tail_error != null) { + put(Event.Type.TRANSPORT_ERROR, this); + } + if (!postedTailClosed) { + put(Event.Type.TRANSPORT_TAIL_CLOSED, this); + postedTailClosed = true; + maybePostClosed(); + } } @Override @@ -1351,6 +1378,13 @@ public class TransportImpl extends Endpo { init(); _outputProcessor.pop(bytes); + + int p = pending(); + if (p < 0 && !postedHeadClosed) { + put(Event.Type.TRANSPORT_HEAD_CLOSED, this); + postedHeadClosed = true; + maybePostClosed(); + } } @Override Modified: qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py (original) +++ qpid/proton/branches/examples/proton-j/src/main/resources/cengine.py Fri Oct 10 12:52:17 2014 @@ -942,13 +942,9 @@ def pn_transport_closed(trans): from org.apache.qpid.proton.engine import Event -PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION -PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION -PN_EVENT_CATEGORY_LINK = Event.Category.LINK -PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY -PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT - PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT +PN_CONNECTION_BOUND = Event.Type.CONNECTION_BOUND +PN_CONNECTION_UNBOUND = Event.Type.CONNECTION_UNBOUND PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE @@ -969,6 +965,10 @@ PN_LINK_FLOW = Event.Type.LINK_FLOW PN_LINK_FINAL = Event.Type.LINK_FINAL PN_DELIVERY = Event.Type.DELIVERY PN_TRANSPORT = Event.Type.TRANSPORT +PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR +PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED +PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED +PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED def pn_collector(): return Proton.collector() Modified: qpid/proton/branches/examples/tests/python/proton_tests/engine.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/engine.py?rev=1630822&r1=1630821&r2=1630822&view=diff ============================================================================== --- qpid/proton/branches/examples/tests/python/proton_tests/engine.py (original) +++ qpid/proton/branches/examples/tests/python/proton_tests/engine.py Fri Oct 10 12:52:17 2014 @@ -2150,8 +2150,8 @@ class EventTest(CollectorTest): self.pump() c1.free() c1._transport.unbind() - self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL, - Event.CONNECTION_FINAL) + self.expect(Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL, + Event.SESSION_FINAL, Event.CONNECTION_FINAL) def testConnectionINIT_FINAL(self): c = Connection() @@ -2215,8 +2215,8 @@ class EventTest(CollectorTest): self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY) rcv.session.connection._transport.unbind() rcv.session.connection.free() - self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL, - Event.CONNECTION_FINAL) + self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL, + Event.SESSION_FINAL, Event.CONNECTION_FINAL) def testDeliveryEventsDisp(self): snd, rcv = self.testFlowEvents() @@ -2235,6 +2235,60 @@ class EventTest(CollectorTest): event = self.expect(Event.DELIVERY) assert event.context == dlv + def testConnectionBOUND_UNBOUND(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + self.expect(Event.CONNECTION_BOUND) + t.unbind() + self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT) + + def testTransportERROR_CLOSE(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + self.expect(Event.CONNECTION_BOUND) + t.push("asdf") + self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED) + p = t.pending() + assert p > 0 + # XXX: can't include this because java behaviour is different + #assert "AMQP header mismatch" in t.peek(p), repr(t.peek(p)) + t.pop(p) + self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) + + def testTransportCLOSED(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + c.open() + + self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_OPEN, Event.TRANSPORT) + + c2 = Connection() + t2 = Transport() + t2.bind(c2) + c2.open() + c2.close() + + pump(t, t2) + + self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE, + Event.TRANSPORT_TAIL_CLOSED) + + c.close() + + pump(t, t2) + + self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT, + Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) + class PeerTest(CollectorTest): def setup(self): @@ -2255,7 +2309,8 @@ class TeardownLeakTest(PeerTest): def doLeak(self, local, remote): self.connection.open() - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT) + self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, + Event.CONNECTION_OPEN, Event.TRANSPORT) ssn = self.connection.session() ssn.open() @@ -2294,19 +2349,23 @@ class TeardownLeakTest(PeerTest): self.pump() if remote: - self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE, - Event.CONNECTION_REMOTE_CLOSE), - (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL, - Event.SESSION_REMOTE_CLOSE, - Event.CONNECTION_REMOTE_CLOSE)) + self.expect_oneof((Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE, + Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE, + Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED), + (Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE, + Event.LINK_FINAL, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED, + Event.TRANSPORT_CLOSED)) else: - self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE) + self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED, + Event.TRANSPORT_CLOSED) self.connection.free() self.transport.unbind() - self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL), - (Event.SESSION_FINAL, Event.CONNECTION_FINAL)) + self.expect_oneof((Event.LINK_FINAL, Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.CONNECTION_FINAL), + (Event.CONNECTION_UNBOUND, Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL)) def testLocalRemoteLeak(self): self.doLeak(True, True) Added: qpid/proton/branches/examples/tests/python/proton_tests/scratch.py URL: http://svn.apache.org/viewvc/qpid/proton/branches/examples/tests/python/proton_tests/scratch.py?rev=1630822&view=auto ============================================================================== --- qpid/proton/branches/examples/tests/python/proton_tests/scratch.py (added) +++ qpid/proton/branches/examples/tests/python/proton_tests/scratch.py Fri Oct 10 12:52:17 2014 @@ -0,0 +1,44 @@ + def xxx_test_reopen_on_same_session(self): + ssn1 = self.snd.session + ssn2 = self.rcv.session + + self.snd.open() + self.rcv.open() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + + self.snd.close() + self.rcv.close() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED + assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED + + print self.snd._link + self.snd = ssn1.sender("test-link") + print self.snd._link + self.rcv = ssn2.receiver("test-link") + self.snd.open() + self.rcv.open() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + +class SessionPipelineTest(PeerTest): + + def xxx_test(self): + self.connection.open() + self.peer.open() + self.pump() + ssn = self.connection.session() + ssn.open() + self.pump() + peer_ssn = self.peer.session_head(0) + ssn.close() + self.pump() + peer_ssn.close() + self.peer.close() + self.pump() --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org