Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java?view=diff&rev=523854&r1=523853&r2=523854 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java (original) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/TransportPhase.java Thu Mar 29 15:24:20 2007 @@ -108,7 +108,6 @@ public void messageSent(Object frame) throws AMQPException { - _ioSession.write(frame); } @@ -120,7 +119,7 @@ public void sessionIdle(IoSession session, IdleStatus status) throws Exception { - _logger.debug("Protocol Session [" + this + ":" + session + "] idle: " + _logger.debug("Protocol Session for [ " + this + " : " + session + "] idle: " + status); if (IdleStatus.WRITER_IDLE.equals(status)) { @@ -148,9 +147,8 @@ _logger.debug("Received heartbeat"); } else { - messageReceived(bodyFrame); - } - // _connection.bytesReceived(_protocolSession.getIoSession().getReadBytes()); + messageReceived(frame); + } } public void messageSent(IoSession session, Object message) throws Exception @@ -162,18 +160,19 @@ throws Exception { // Need to handle failover - sessionClosed(session); + _logger.info("Exception caught for [ " + this + " : Session " + System.identityHashCode(session) + "]",cause); + //sessionClosed(session); } public void sessionClosed(IoSession session) throws Exception { // Need to handle failover - _logger.info("Protocol Session [" + this + "] closed"); + _logger.info("Protocol Session for [ " + this + " : " + System.identityHashCode(session) + "] closed"); } public void sessionCreated(IoSession session) throws Exception { - _logger.debug("Protocol session created for session " + _logger.info("Protocol session created for " + this + " session : " + System.identityHashCode(session)); final ProtocolCodecFilter pcf = new ProtocolCodecFilter( @@ -184,7 +183,8 @@ { session.getFilterChain().addBefore("AsynchronousWriteFilter", "protocolFilter", pcf); - } else + } + else { session.getFilterChain().addLast("protocolFilter", pcf); } @@ -213,12 +213,13 @@ e.printStackTrace(); } + _ioSession = session; doAMQPConnectionNegotiation(); } public void sessionOpened(IoSession session) throws Exception { - _logger.debug("Protocol session opened for session " + _logger.info("Protocol session opened for " + this + " : session " + System.identityHashCode(session)); } @@ -230,6 +231,7 @@ private void doAMQPConnectionNegotiation() { int i = pv.length - 1; + _logger.debug("Engaging in connection negotiation"); writeFrame(new ProtocolInitiation(pv[i][PROTOCOL_MAJOR], pv[i][PROTOCOL_MINOR])); } @@ -257,7 +259,8 @@ } /** - * ----------------------------------------------------------- Failover - * section ----------------------------------------------------------- + * ----------------------------------------------------------- + * Failover section + * ----------------------------------------------------------- */ }
Modified: incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java?view=diff&rev=523854&r1=523853&r2=523854 ============================================================================== --- incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java (original) +++ incubator/qpid/branches/qpid.0-9/java/newclient/src/main/java/org/apache/qpid/nclient/transport/VMConnection.java Thu Mar 29 15:24:20 2007 @@ -26,10 +26,13 @@ private BrokerDetails _brokerDetails; private IoConnector _ioConnector; private Phase _phase; + private PhaseContext _ctx; - protected VMConnection(ConnectionURL url) + protected VMConnection(ConnectionURL url,PhaseContext ctx) { _brokerDetails = url.getBrokerDetails(0); + _ctx = ctx; + _ioConnector = new VmPipeConnector(); final IoServiceConfig cfg = _ioConnector.getDefaultConfig(); ReferenceCountingExecutorService executorService = ReferenceCountingExecutorService.getInstance(); @@ -45,11 +48,10 @@ { createVMBroker(); - PhaseContext ctx = new DefaultPhaseContext(); - ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); - ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); + _ctx.setProperty(QpidConstants.AMQP_BROKER_DETAILS,_brokerDetails); + _ctx.setProperty(QpidConstants.MINA_IO_CONNECTOR,_ioConnector); - _phase = PhaseFactory.createPhasePipe(ctx); + _phase = PhaseFactory.createPhasePipe(_ctx); _phase.start(); return _phase;
