pier 01/07/19 17:00:54 Modified: catalina/src/share/org/apache/catalina/connector/warp WarpConnection.java Log: New WARP implementation from Jakarta-Tomcat-Connectors Revision Changes Path 1.8 +137 -260 jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/connector/warp/WarpConnection.java Index: WarpConnection.java =================================================================== RCS file: /home/cvs/jakarta-tomcat-4.0/catalina/src/share/org/apache/catalina/connector/warp/WarpConnection.java,v retrieving revision 1.7 retrieving revision 1.8 diff -u -r1.7 -r1.8 --- WarpConnection.java 2001/01/24 23:10:42 1.7 +++ WarpConnection.java 2001/07/20 00:00:53 1.8 @@ -2,7 +2,7 @@ * * * The Apache Software License, Version 1.1 * * * - * Copyright (c) 1999, 2000 The Apache Software Foundation. * + * Copyright (c) 1999-2001 The Apache Software Foundation. * * All rights reserved. * * * * ========================================================================= * @@ -56,326 +56,203 @@ * ========================================================================= */ package org.apache.catalina.connector.warp; -import java.io.*; -import java.net.*; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; + import org.apache.catalina.Lifecycle; import org.apache.catalina.LifecycleEvent; -import org.apache.catalina.LifecycleException; import org.apache.catalina.LifecycleListener; -import org.apache.catalina.util.LifecycleSupport; + +public class WarpConnection implements LifecycleListener, Runnable { -/** - * - * @author <a href="mailto:[EMAIL PROTECTED]">Pier Fumagalli</a> - * @author Copyright © 1999, 2000 <a href="http://www.apache.org">The - * Apache Software Foundation. - * @version CVS $Id: WarpConnection.java,v 1.7 2001/01/24 23:10:42 pier Exp $ - */ -public class WarpConnection implements Lifecycle, Runnable { - - // -------------------------------------------------------------- CONSTANTS - - /** Our debug flag status (Used to compile out debugging information). */ - private static final boolean DEBUG=WarpDebug.DEBUG; - - // -------------------------------------------------------- LOCAL VARIABLES - - /** The lifecycle event support for this component. */ - private LifecycleSupport lifecycle=null; - /** The WarpHandlerTable contains the list of all current handlers. */ - private WarpHandlerTable table=null; - /** The name of this connection. */ - private String name=null; - /** Wether we started or not. */ + /* ==================================================================== */ + /* Instance variables */ + /* ==================================================================== */ + + /* -------------------------------------------------------------------- */ + /* Local variables */ + + /** Our socket input stream. */ + private InputStream input=null; + /** Our socket output stream. */ + private OutputStream output=null; + /** The started flag. */ private boolean started=false; - /** The number of active connections. */ - private static int num=0; + /** The local thread. */ + private Thread thread=null; + /** Our logger. */ + private WarpLogger logger=null; - // -------------------------------------------------------- BEAN PROPERTIES + /* -------------------------------------------------------------------- */ + /* Bean variables */ - /** The socket used in this connection. */ + /** The socket this connection is working on. */ private Socket socket=null; - /** The connector wich created this connection. */ + /** The connector instance. */ private WarpConnector connector=null; - // ------------------------------------------------------------ CONSTRUCTOR + /* ==================================================================== */ + /* Constructor */ + /* ==================================================================== */ /** - * Create a new WarpConnection instance. + * Construct a new instance of a <code>WarpConnection</code>. */ public WarpConnection() { super(); - this.lifecycle=new LifecycleSupport(this); - this.table=new WarpHandlerTable(); - if (DEBUG) this.debug("New instance created"); + this.logger=new WarpLogger(this); } - // --------------------------------------------------------- PUBLIC METHODS + /* ==================================================================== */ + /* Bean methods */ + /* ==================================================================== */ /** - * Run the thread waiting on the socket, reading packets from the client - * and dispatching them to the appropriate handler. + * Set the socket this connection is working on. */ - public void run() { - WarpHandler han=null; - InputStream in=null; - int rid=0; - int typ=0; - int len=0; - int ret=0; - int b1=0; - int b2=0; - byte buf[]=null; - - // Log the connection opening - num++; - if (DEBUG) this.debug("Connection started (num="+num+") "+this.name); - - try { - // Open the socket InputStream - in=this.socket.getInputStream(); - - // Read packets - while(this.started) { - // RID number - b1=in.read(); - b2=in.read(); - if ((b1 | b2)==-1) { - this.log("Premature RID end"); - break; - } - rid=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff)); - // Packet type - b1=in.read(); - b2=in.read(); - if ((b1 | b2)==-1) { - this.log("Premature TYPE end"); - break; - } - typ=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff)); - // Packet payload length - b1=in.read(); - b2=in.read(); - if ((b1 | b2)==-1) { - this.log("Premature LEN end"); - break; - } - len=(((b1 & 0x0ff) << 8) | (b2 & 0x0ff)); - // Packet payload - buf=new byte[len]; - if ((ret=in.read(buf,0,len))!=len) { - this.log("Premature packet end"+" ("+ret+" of "+len+")"); - break; - } - - if (DEBUG) this.debug("Received packet RID="+rid+" TYP="+typ); - - // Check if we have the special RID 0x0ffff (disconnect) - if (rid==0x0ffff) { - this.log("Connection closing ("+new String(buf)+")"); - break; - } - - // Dispatch packet - synchronized (this) { han=this.table.get(rid); } - if (han==null) { - this.log("Handler for RID "+rid+" not found"); - break; - } - han.processData(typ,buf); - } - } catch (IOException e) { - if (this.started) e.printStackTrace(System.err); - } - - // Close this connection before terminating the thread - try { - this.stop(); - } catch (LifecycleException e) { - this.log(e); - } - num--; - if (DEBUG) this.debug("Connection ended (num="+num+") "+this.name); - } - - /** - * Initialize this connection. - * - * @param sock The socket used by this connection to transfer data. - */ - public void start() - throws LifecycleException { - // Paranoia checks. - if (this.socket==null) - throw new LifecycleException("Null socket"); - if (this.connector==null) - throw new LifecycleException("Null connector"); - - // Register the WarpConnectionHandler for RID=0 (connection) - this.started=true; - WarpHandler h=new WarpConnectionHandler(); - h.setConnection(this); - h.setRequestID(0); - h.start(); - // Paranoia check - if(this.registerHandler(h,0)!=true) { - this.stop(); - throw new LifecycleException("Cannot register connection handler"); - } - // Set the thread and connection name and start the thread - this.name=this.socket.getInetAddress().getHostAddress(); - this.name=this.name+":"+this.socket.getPort(); - new Thread(this,name).start(); + public void setSocket(Socket socket) { + this.socket=socket; } /** - * Send a WARP packet. + * Return the socket this connection is working on. */ - public void send(int rid, int type, byte buffer[], int offset, int len) - throws IOException { - if (this.socket==null) throw new IOException("Connection closed "+type); - OutputStream out=this.socket.getOutputStream(); - byte hdr[]=new byte[6]; - // Set the RID number - hdr[0]=(byte)((rid>>8)&0x0ff); - hdr[1]=(byte)(rid&0x0ff); - // Set the TYPE - hdr[2]=(byte)((type>>8)&0x0ff); - hdr[3]=(byte)(type&0x0ff); - // Set the payload length - hdr[4]=(byte)((len>>8)&0x0ff); - hdr[5]=(byte)(len&0x0ff); - // Send the header and payload - synchronized(this) { - out.write(hdr,0,6); - out.write(buffer,offset,len); - out.flush(); - } - if (DEBUG) this.debug("Sending packet RID="+rid+" TYP="+type); + public Socket getSocket() { + return(this.socket); } /** - * Close this connection. + * Set the <code>WarpConnector</code> associated with this connection. */ - public void stop() - throws LifecycleException { - this.started=false; - // Stop all handlers - WarpHandler handlers[]=this.table.handlers(); - for (int x=0; x<handlers.length; x++) handlers[x].stop(); - // Close the socket (this will make the thread exit) - if (this.socket!=null) try { - this.socket.close(); - } catch (IOException e) { - this.log(e); - throw new LifecycleException("Closing connection "+this.name,e); - } - - this.socket=null; - // Log this step - this.log("Connection closed"); - } - - /** - * Add a WarpHandler to this connection. - * - * @param han The WarpHandler add to this connection. - * @param rid The RID number associated with the WarpHandler. - * @return If another WarpHandler is associated with this RID return - * false, otherwise return true. - */ - protected synchronized boolean registerHandler(WarpHandler han, int rid) { - if (DEBUG) this.debug("Registering handler for RID "+rid); - return(this.table.add(han, rid)); + public void setConnector(WarpConnector connector) { + this.connector=connector; + this.logger.setContainer(connector.getContainer()); } /** - * Remove a WarpHandler from this connection. - * - * @param rid The RID number associated with the WarpHandler to remove. - * @return The old WarpHandler associated with the specified RID or null. + * Return the <code>WarpConnector</code> associated with this connection. */ - protected synchronized WarpHandler removeHandler(int rid) { - return(this.table.remove(rid)); + public WarpConnector getConnector() { + return(this.connector); } - // ----------------------------------------------------------- BEAN METHODS + /* ==================================================================== */ + /* Lifecycle methods */ + /* ==================================================================== */ /** - * Return the socket associated with this connection. + * Get notified of events in the connector. */ - protected WarpConnector getConnector() { - return(this.connector); + public void lifecycleEvent(LifecycleEvent event) { + if (Lifecycle.STOP_EVENT.equals(event.getType())) this.stop(); } /** - * Set the socket used by this connection. + * Start working on this connection. */ - protected void setConnector(WarpConnector connector) { - if (DEBUG) this.debug("Setting connector"); - this.connector=connector; + public void start() { + synchronized(this) { + this.started=true; + this.thread=new Thread(this); + this.thread.start(); + } } /** - * Return the socket associated with this connection. + * Stop all we're doing on the connection. */ - protected Socket getSocket() { - return(this.socket); + public void stop() { + synchronized(this) { + try { + this.started=false; + this.socket.close(); + this.getConnector().removeLifecycleListener(this); + } catch (IOException e) { + logger.log("Cannot close socket",e); + } + } } /** - * Set the socket used by this connection. + * Process data from the socket. */ - protected void setSocket(Socket socket) { - if (DEBUG) this.debug("Setting socket"); - this.socket=socket; - } + public void run() { + WarpPacket packet=new WarpPacket(); - // ------------------------------------------------------ LIFECYCLE METHODS + if (Constants.DEBUG) logger.debug("Connection starting"); - /** - * Add a lifecycle event listener to this component. - */ - public void addLifecycleListener(LifecycleListener listener) { - this.lifecycle.addLifecycleListener(listener); - } + try { + this.input=this.socket.getInputStream(); + this.output=this.socket.getOutputStream(); + if (!new WarpConfigurationHandler().handle(this,packet)) { + logger.log("Configuration handler returned false"); + this.stop(); + } + WarpRequestHandler requestHandler=new WarpRequestHandler(); + while (requestHandler.handle(this,packet)); + this.stop(); + } catch (IOException e) { + logger.log("Exception on socket",e); + } - /** - * Remove a lifecycle event listener from this component. - */ - public void removeLifecycleListener(LifecycleListener listener) { - lifecycle.removeLifecycleListener(listener); + if (Constants.DEBUG) logger.debug("Connection terminated"); } - - // ------------------------------------------ LOGGING AND DEBUGGING METHODS - /** - * Dump a log message. - */ - public void log(String msg) { - if (this.connector!=null) this.connector.log(msg); - else WarpDebug.debug(this,msg); - } + /* ==================================================================== */ + /* Public methods */ + /* ==================================================================== */ /** - * Dump information for an Exception. + * Send a WARP packet over this connection. */ - public void log(Exception exc) { - if (this.connector!=null) this.connector.log(exc); - else WarpDebug.debug(this,exc); - } + public void send(WarpPacket packet) + throws IOException { + if (Constants.DEBUG) { + logger.debug(">> TYPE="+packet.getType()+" LENGTH="+packet.size); + logger.debug(">> "+packet.dump()); + } - /** - * Dump a debug message. - */ - private void debug(String msg) { - if (DEBUG) WarpDebug.debug(this,msg); + this.output.write(packet.getType()&0x0ff); + this.output.write((packet.size>>8)&0x0ff); + this.output.write((packet.size>>0)&0x0ff); + this.output.write(packet.buffer,0,packet.size); + this.output.flush(); + packet.reset(); } /** - * Dump information for an Exception. + * Receive a WARP packet over this connection. */ - private void debug(Exception exc) { - if (DEBUG) WarpDebug.debug(this,exc); + public void recv(WarpPacket packet) + throws IOException { + int t=this.input.read(); + int l1=this.input.read(); + int l2=this.input.read(); + + if ((t|l1|l2)==-1) + throw new IOException("Premature packet header end"); + + packet.reset(); + packet.setType(t&0x0ff); + packet.size=(( l1 & 0x0ff ) << 8) | ( l2 & 0x0ff ); + + if (packet.size>0) { + int off=0; + int ret=0; + while (true) { + ret=this.input.read(packet.buffer,off,packet.size-off); + if (ret==-1) + throw new IOException("Premature packet payload end"); + off+=ret; + if(off==packet.size) break; + } + } + + if (Constants.DEBUG) { + logger.debug("<< TYPE="+packet.getType()+" LENGTH="+packet.size); + logger.debug("<< "+packet.dump()); + } } }