Forget it, here is the code.  So sorry for so many emails.

package org.apache.servicemix.components.tcp;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;

import javax.jbi.JBIException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.transform.TransformerException;

import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.springframework.beans.factory.InitializingBean;

public abstract class TcpComponentSupport extends ComponentSupport
implements
        MessageExchangeListener, InitializingBean {
    /**
     * Logger for this class
     */
    private static final Logger log = Logger
            .getLogger(TcpComponentSupport.class);
    
    private String host;
    private int port = -1;
    private boolean useNIO = false;
    private int retryInterval = 10000;
    protected SocketChannel socketChannel = null;
    private Socket socket = null;
    protected InputStream in = null;
    protected OutputStream out = null;
    protected boolean running = false;
    private TcpMarshaler marshaler = new TcpMarshaler();
    
    protected abstract void processConnection() throws JBIException,
IOException;
    protected abstract void processIncomingMessage(String tcpMessage);
    
    public void setHost(String host) {
        this.host = host;
    } // en dof setHost(String)(
    
    public void setPort(int port) {
        this.port = port;
    } // en dof setPort(int)
    
    public void setUseNIO(boolean useNIO) {
        this.useNIO = useNIO;
    } // end of setUserNIO(boolean)
    
    public void setRetryInterval(int retryInterval) {
        this.retryInterval = retryInterval;
    } // end of setRetryInterval(int)
    
    protected boolean getUseNIO() {
        return this.useNIO;
    } // end of getUseNIO()
    
    protected String getHost() {
        return this.host;
    } // en dof getHost()
    
    protected int getPort() {
        return this.port;
    } // en dof getPort()

    public void onMessageExchange(MessageExchange exchange)
            throws MessagingException {
        String tcpMessage = null;
        try {
            tcpMessage = this.marshaler.fromNMS(exchange.getMessage("in"));
            this.processIncomingMessage(tcpMessage);
            this.done(exchange);
        } catch (TransformerException e) {
            throw new MessagingException(e);
        } // end of try /catch
    } // end of onMessageExchange(MessageExchange)

    public void afterPropertiesSet() throws Exception {
        if (host == null) {
            throw new IllegalArgumentException("You must specify the host
property.");
        } // end of if
        
        if (this.port < 0) {
            throw new IllegalArgumentException("You must specify the port
property.");
        } // end of if
    } // end of afterPropertiesSet()
    
    public void start() throws JBIException {
        if (this.useNIO) {
            // configure the NIO SocketChannel
            this.startNIO();
        } else {
            // this is the default
            // we will simply open a basic socket and create the Input and
Ooutput Streams
            this.startNonNIO();
        } // end of if / else
    } // end of start()
    
    public void stop() throws JBIException {
        if (useNIO) {
            this.stopNIO();
        } else {
            this.stopNonNIO();
        } // ne do if / else
    } // end of stop()
    
    protected void sendToBus(String message) {
        log.debug("Inside sendToBus(), sending message: " + message);
        try {
            InOnly exchange =
this.getExchangeFactory().createInOnlyExchange();
            NormalizedMessage normalizedMessage = exchange.createMessage();
            exchange.setInMessage(normalizedMessage);
            this.marshaler.toNMS(normalizedMessage, message);
            this.getDeliveryChannel().sendSync(exchange);
            //done(exchange);
        } catch (MessagingException me) {
            throw new TcpConnectionException(me, message);
        } // end of try / catch
    } // end of sendToBus(String)
    
    /**
     * This is an empty implementation of the init().
     * This method exists so that the concrete class can override it
     * if there is something that needs to be done to initialize the
communication
     * with the server.
     * 
     * This method is called after the connection has been made and after
the 
     * Input and Output Streams have been assigned.
     * 
     * This method is called right before the processConnection().
     */
    protected void init() {
        // nothing is done here
        // if something needs to be done here in the concrete class
        // this method is called after the connection is made and the Input
and Output Streams
        // have been set but before the processConnection()
    } // end of init()
    
    /**
     * This is an empty implementation of the cleanup().
     * 
     * This method exists so the the concrete class can override it
     * if there is something that needs to be done to cleanup resources
before the 
     * connection is shutdown.
     * 
     * This method is called while the connection is still active and the
Input
     * and Output Streams are still available.
     */
    protected void cleanup() {
        // nothing is done here
        // if something needs to be done here in the concrete class
    } // end of init()
    
    private void startNIO() throws JBIException {
        if (log.isInfoEnabled()) {
            log.info("Using NIO to connect to " + this.host + ":" +
this.port);
        } // en dof if

        if (!this.running) {
            this.connectNIO();
            try {
                this.init();
                this.processConnection();
            } catch (SocketException e) {
                this.running = false;
                this.startNIO();
            } catch (IOException ioe) {
                throw new JBIException(ioe);
            } // end of try / catch
        } // end of if
    } // end of startNIO()
    
    private void connectNIO() throws JBIException {
        try {
            this.socketChannel = SocketChannel.open();
            this.socketChannel.configureBlocking(false);
            
            this.socketChannel.connect(new InetSocketAddress(this.host,
this.port));
            int count = 0;
            while (!this.socketChannel.finishConnect()) {
                log.debug("Finishing connection: " + (count++) + " times");
            } // end of while loop
            
            this.running = true;
        } catch (ConnectException ce) {
            log.warn(this.host + ":" + this.port + " unavailable, waiting "
+ this.retryInterval + " milliseconds before trying again.");
            this.running = false;
            try {
                Thread.sleep(this.retryInterval);
            } catch (InterruptedException e) {}
            this.connectNIO();
        } catch (Exception e) {
            log.error("Exception while trying to make a tcp connection to "
+ this.host + ":" + this.port);
            log.error(e);
            this.running = false;
            throw new JBIException("Failed to connect to " + this.host + ":"
+ this.port, e);
        } // end of try / catch
    } // end of connectNIO()
    
    private void startNonNIO() throws JBIException {
        if (log.isInfoEnabled()) {
            log.info("Using basic sockets to connect to " + this.host + ":"
+ this.port);
        } // en dof if
        
        if (!this.running) {
            this.connectNonNIO();
            try {
                this.init();
                this.processConnection();
            } catch (SocketException e) {
                this.running = false;
                this.startNonNIO();
            } catch (IOException ioe) {
                throw new JBIException(ioe);
            } // end of try / catch
        } // end of if
    } // end of startNonNIO()
    
    private void connectNonNIO() throws JBIException {
        try {
            this.socket = new Socket(this.host, this.port);
            
            this.in = this.socket.getInputStream();
            this.out = this.socket.getOutputStream();
            
            this.running = true;
        } catch (ConnectException ce) {
            log.warn(this.host + ":" + this.port + " unavailable, waiting "
+ this.retryInterval + " milliseconds before trying again.");
            this.running = false;
            try {
                Thread.sleep(this.retryInterval);
            } catch (InterruptedException e) {}
            this.connectNonNIO();
        } catch (Exception e) {
            log.error("Exception while trying to make a tcp connection to "
+ this.host + ":" + this.port);
            log.error(e);
            this.running = false;
            throw new JBIException("Failed to connect to " + this.host + ":"
+ this.port, e);
        } // end of try / catch
    } // end of connectNonNIO()
    
    private void stopNIO() throws JBIException {
        log.debug("Inside stopNIO()");
        if (this.socketChannel != null) {
            this.cleanup();
            try {
                this.socketChannel.close();
                this.socketChannel = null;
                this.running = false;
            } catch (IOException e) {
                throw new JBIException("Unable to close connection", e);
            } // end of try / catch
        } // end of if
    } // end of stopNIO()
    
    private void stopNonNIO() throws JBIException {
        log.debug("Inside stopNonNIO()");
        if (this.socket != null) {
            if (this.socket.isConnected()) {
                this.cleanup();
                try {
                    this.socket.close();
                    this.in = null;
                    this.out = null;
                    this.socket = null;
                    this.running = false;
                } catch (IOException e) {
                    throw new JBIException("Unable to close connection", e);
                } // end of try / catch
            } // end of if
        } // end of if
    } // end of stopNonNIO()
} // end of class TcpComponentSupport



On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:

> Sorry, let me try it again with just the .java file.
> 
> 
> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
> 
>> Hopefully this time the attachment will go through.  I am not exactly sure
>> why it didn't before.  It is a zip file.
>> 
>> I guess I was thinking that when the component is deployed into the
>> lwcontainer that it would be running in its own thread anyway.  Apparently I
>> was incorrect in my assumption.
>> 
>> I am still trying to get a grasp on the whole ServiceMix thing so please
>> excuse my ignorance.
>> 
>> Thanks,
>> Doug
>> 
>> 
>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>> 
>>> I think the attachment has been lost.
>>> 
>>> Btw, for a tcp consumer that creates a server socket, it has to wait
>>> for incoming connections, so how can you not start a thread ? The
>>> jabber api uses a listener, so it is really different.
>>> 
>>> Cheers,
>>> Guillaume Nodet
>>> 
>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>> I have attached my TcpComponentSupport class to this email.  If you could
>>>> be
>>>> so kind to take a look at it and let me know if it looks ok.  You mentioned
>>>> that the start method should start a Thread and then kill that Thread in
>>>> the
>>>> stop method.  I am not actually doing this right now however I of course
>>>> could implement it that way if necessary.  I attempted to use the
>>>> JabberComponentSupport class as a template.
>>>> 
>>>> Please let me know your thoughts.
>>>> 
>>>> Thanks,
>>>> Doug
>>>> 
>>>> 
>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>> 
>>>>> I am not sure I understand the problem.
>>>>> Usually a lightweight consumer BC will begin to receive messages when
>>>>> the component is started.  It means that all the necessary
>>>>> configuration parameters have been set.
>>>>> The tcp component needs to start a thread inside the start method (and
>>>>> kill it inside the stop method).
>>>>> 
>>>>> The second problem is related to components / service units lifecycle.
>>>>> When a lightweight component is deployed to the lwcontainer, it has a
>>>>> specific lifecycle: when the service unit is started, the component is
>>>>> started and when the service unit is stopped, the component is
>>>>> shutdown.
>>>>> 
>>>>> But the problem should only occur when someone sends something on the
>>>>> socket
>>>>> ...
>>>>> 
>>>>> Cheers,
>>>>> Guillaume Nodet
>>>>> 
>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>> Guillaume,
>>>>>> 
>>>>>> Yes you are right, I did ask before (somehow I got sidetracked and forgot
>>>>>> all
>>>>>> about it).  Anyway, thank you for the reply though.
>>>>>> 
>>>>>> I have been able to implement a TcpComponentSupport class, at least I
>>>>>> think
>>>>>> so anyway.  The problem that I am having now is that as soon as I deploy
>>>>>> the
>>>>>> component it starts collecting data and pushing it to the bus.  I have
>>>>>> run
>>>>>> into a couple of problems because of this.  The first problem is while
>>>>>> testing, I would like to extend from TestSupport and use the
>>>>>> assertMessagesReceived(), however when it loads the ApplicationContext,
>>>>>> the
>>>>>> concrete TcpComponentSupport class simply begins communicating over the
>>>>>> socket and the test method with the assertMessagesReceived() call is
>>>>>> never
>>>>>> executed.  The test will just run forever.
>>>>>> 
>>>>>> The second question that I have is along the same lines.  I would like to
>>>>>> create a service unit, with simply this component in it and deploy it to
>>>>>> ServiceMix and then use something like eip to route the messages where I
>>>>>> want them.  The same kind of problem occurs in this scenario, as soon as
>>>>>> the
>>>>>> service unit is deployed, it will start collecting data from the stream
>>>>>> and
>>>>>> trying to push it to the bus, the problem is that there is no destination
>>>>>> assigned to the component yet so it tries to send it to "service:
>>>>>> null
>>>>>> and interface null".
>>>>>> 
>>>>>> Do you have any ideas?
>>>>>> 
>>>>>> Thank you,
>>>>>> Doug
>>>>>> 
>>>>>> 
>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>> 
>>>>>>> It seems there's none -- if I recall, you have already asked this
>>>>>>> question
>>>>>>> ;)
>>>>>>> However, it should not be very difficult to implement.
>>>>>>> You have to inherit from
>>>>>>> org.apache.servicemix.components.util.ComponentSupport and implement
>>>>>>> MessageExchangeListener.  Then, just create your socket and read it :)
>>>>>>> You may need another lw component for sending over tcp, also.
>>>>>>> 
>>>>>>> Cheers,
>>>>>>> Guillaume Nodet
>>>>>>> 
>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>> Does anyone know if a basic TCP component is available?  What I would
>>>>>>>> like
>>>>>>>> to be able to do is simply extend from a TCP component, set the host
>>>>>>>> and
>>>>>>>> port in the servicemix.xml file, and have access to the input and
>>>>>>>> output
>>>>>>>> streams in my implementation class.  I would then want to be able to
>>>>>>>> read
>>>>>>>> from the input stream and create a NormalizedMessage to put on the bus
>>>>>>>> and
>>>>>>>> also be able to get a message from the bus and write it to the output
>>>>>>>> stream.
>>>>>>>> 
>>>>>>>> Is there anything like this out there?
>>>>>>>> 
>>>>>>>> Thanks,
>>>>>>>> Doug
>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>>> 
>>>>>> 
>>>>> 
>>>> 
>>>> 
>>>> 
>>> 
>> 
> 


Reply via email to