Hmmm...  So maybe it makes more sense to reuse this in ServiceMix.  It would
be nice to have something bundled with ServiceMix that supports TCP, UDP,
and similar protocols.


Andrea Zoppello-2 wrote:
> 
> In Spagic we have a component that handles the low-level TCP protocol:
> 
> Take a look at it: http://forge.objectweb.org/projects/spagic
> 
> Andrea Zoppello
> 
> jpuro wrote:
>> Has a standard JBI BC component been created for this yet?  I think not,
>> but was curious.
>>
>> -jeff
>>
>>
>> dfischer wrote:
>>   
>>> Forget it, here is the code.  So sorry for so many emails.
>>>
>>>
>>> package org.apache.servicemix.components.tcp;
>>>
>>> import org.apache.log4j.Logger;
>>>
>>> import java.io.IOException;
>>> import java.io.InputStream;
>>> import java.io.OutputStream;
>>> import java.net.ConnectException;
>>> import java.net.InetSocketAddress;
>>> import java.net.Socket;
>>> import java.net.SocketException;
>>> import java.nio.channels.SocketChannel;
>>>
>>> import javax.jbi.JBIException;
>>> import javax.jbi.messaging.InOnly;
>>> import javax.jbi.messaging.MessageExchange;
>>> import javax.jbi.messaging.MessagingException;
>>> import javax.jbi.messaging.NormalizedMessage;
>>> import javax.xml.transform.TransformerException;
>>>
>>> import org.apache.servicemix.MessageExchangeListener;
>>> import org.apache.servicemix.components.util.ComponentSupport;
>>> import org.springframework.beans.factory.InitializingBean;
>>>
>>> public abstract class TcpComponentSupport extends ComponentSupport
>>>         implements MessageExchangeListener, InitializingBean {
>>>     /**
>>>      * Logger for this class
>>>      */
>>>     private static final Logger log = Logger
>>>             .getLogger(TcpComponentSupport.class);
>>>     
>>>     private String host;
>>>     private int port = -1;
>>>     private boolean useNIO = false;
>>>     private int retryInterval = 10000;
>>>     protected SocketChannel socketChannel = null;
>>>     private Socket socket = null;
>>>     protected InputStream in = null;
>>>     protected OutputStream out = null;
>>>     protected boolean running = false;
>>>     private TcpMarshaler marshaler = new TcpMarshaler();
>>>     
>>>     protected abstract void processConnection()
>>>             throws JBIException, IOException;
>>>     protected abstract void processIncomingMessage(String tcpMessage);
>>>     
>>>     public void setHost(String host) {
>>>         this.host = host;
>>>     } // end of setHost(String)
>>>     
>>>     public void setPort(int port) {
>>>         this.port = port;
>>>     } // end of setPort(int)
>>>     
>>>     public void setUseNIO(boolean useNIO) {
>>>         this.useNIO = useNIO;
>>>     } // end of setUseNIO(boolean)
>>>     
>>>     public void setRetryInterval(int retryInterval) {
>>>         this.retryInterval = retryInterval;
>>>     } // end of setRetryInterval(int)
>>>     
>>>     protected boolean getUseNIO() {
>>>         return this.useNIO;
>>>     } // end of getUseNIO()
>>>     
>>>     protected String getHost() {
>>>         return this.host;
>>>     } // end of getHost()
>>>     
>>>     protected int getPort() {
>>>         return this.port;
>>>     } // end of getPort()
>>>
>>>     public void onMessageExchange(MessageExchange exchange)
>>>             throws MessagingException {
>>>         String tcpMessage = null;
>>>         try {
>>>             tcpMessage =
>>>                     this.marshaler.fromNMS(exchange.getMessage("in"));
>>>             this.processIncomingMessage(tcpMessage);
>>>             this.done(exchange);
>>>         } catch (TransformerException e) {
>>>             throw new MessagingException(e);
>>>         } // end of try /catch
>>>     } // end of onMessageExchange(MessageExchange)
>>>
>>>     public void afterPropertiesSet() throws Exception {
>>>         if (host == null) {
>>>             throw new IllegalArgumentException(
>>>                     "You must specify the host property.");
>>>         } // end of if
>>>         
>>>         if (this.port < 0) {
>>>             throw new IllegalArgumentException(
>>>                     "You must specify the port property.");
>>>         } // end of if
>>>     } // end of afterPropertiesSet()
>>>     
>>>     public void start() throws JBIException {
>>>         if (this.useNIO) {
>>>             // configure the NIO SocketChannel
>>>             this.startNIO();
>>>         } else {
>>>             // this is the default
>>>             // we will simply open a basic socket and create the
>>>             // Input and Output Streams
>>>             this.startNonNIO();
>>>         } // end of if / else
>>>     } // end of start()
>>>     
>>>     public void stop() throws JBIException {
>>>         if (useNIO) {
>>>             this.stopNIO();
>>>         } else {
>>>             this.stopNonNIO();
>>>         } // end of if / else
>>>     } // end of stop()
>>>     
>>>     protected void sendToBus(String message) {
>>>         log.debug("Inside sendToBus(), sending message: " + message);
>>>         try {
>>>             InOnly exchange =
>>>                     this.getExchangeFactory().createInOnlyExchange();
>>>             NormalizedMessage normalizedMessage = exchange.createMessage();
>>>             exchange.setInMessage(normalizedMessage);
>>>             this.marshaler.toNMS(normalizedMessage, message);
>>>             this.getDeliveryChannel().sendSync(exchange);
>>>             //done(exchange);
>>>         } catch (MessagingException me) {
>>>             throw new TcpConnectionException(me, message);
>>>         } // end of try / catch
>>>     } // end of sendToBus(String)
>>>     
>>>     /**
>>>      * This is an empty implementation of the init().
>>>      * This method exists so that the concrete class can override it
>>>      * if there is something that needs to be done to initialize the
>>>      * communication with the server.
>>>      *
>>>      * This method is called after the connection has been made and
>>>      * after the Input and Output Streams have been assigned.
>>>      * 
>>>      * This method is called right before the processConnection().
>>>      */
>>>     protected void init() {
>>>         // nothing is done here
>>>         // if something needs to be done here in the concrete class
>>>         // this method is called after the connection is made and the
>>>         // Input and Output Streams have been set but before the
>>>         // processConnection()
>>>     } // end of init()
>>>     
>>>     /**
>>>      * This is an empty implementation of the cleanup().
>>>      * 
>>>      * This method exists so that the concrete class can override it
>>>      * if there is something that needs to be done to clean up resources
>>>      * before the connection is shut down.
>>>      *
>>>      * This method is called while the connection is still active and
>>>      * the Input and Output Streams are still available.
>>>      */
>>>     protected void cleanup() {
>>>         // nothing is done here
>>>         // if something needs to be done here in the concrete class
>>>     } // end of cleanup()
>>>     
>>>     private void startNIO() throws JBIException {
>>>         if (log.isInfoEnabled()) {
>>>             log.info("Using NIO to connect to " + this.host + ":"
>>>                     + this.port);
>>>         } // end of if
>>>
>>>         if (!this.running) {
>>>             this.connectNIO();
>>>             try {
>>>                 this.init();
>>>                 this.processConnection();
>>>             } catch (SocketException e) {
>>>                 this.running = false;
>>>                 this.startNIO();
>>>             } catch (IOException ioe) {
>>>                 throw new JBIException(ioe);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of startNIO()
>>>     
>>>     private void connectNIO() throws JBIException {
>>>         try {
>>>             this.socketChannel = SocketChannel.open();
>>>             this.socketChannel.configureBlocking(false);
>>>             
>>>             this.socketChannel.connect(
>>>                     new InetSocketAddress(this.host, this.port));
>>>             int count = 0;
>>>             while (!this.socketChannel.finishConnect()) {
>>>                 log.debug("Finishing connection: " + (count++)
>>>                         + " times");
>>>             } // end of while loop
>>>             
>>>             this.running = true;
>>>         } catch (ConnectException ce) {
>>>             log.warn(this.host + ":" + this.port
>>>                     + " unavailable, waiting " + this.retryInterval
>>>                     + " milliseconds before trying again.");
>>>             this.running = false;
>>>             try {
>>>                 Thread.sleep(this.retryInterval);
>>>             } catch (InterruptedException e) {}
>>>             this.connectNIO();
>>>         } catch (Exception e) {
>>>             log.error("Exception while trying to make a tcp connection to "
>>>                     + this.host + ":" + this.port);
>>>             log.error(e);
>>>             this.running = false;
>>>             throw new JBIException("Failed to connect to " + this.host
>>>                     + ":" + this.port, e);
>>>         } // end of try / catch
>>>     } // end of connectNIO()
>>>     
>>>     private void startNonNIO() throws JBIException {
>>>         if (log.isInfoEnabled()) {
>>>             log.info("Using basic sockets to connect to " + this.host
>>>                     + ":" + this.port);
>>>         } // end of if
>>>         
>>>         if (!this.running) {
>>>             this.connectNonNIO();
>>>             try {
>>>                 this.init();
>>>                 this.processConnection();
>>>             } catch (SocketException e) {
>>>                 this.running = false;
>>>                 this.startNonNIO();
>>>             } catch (IOException ioe) {
>>>                 throw new JBIException(ioe);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of startNonNIO()
>>>     
>>>     private void connectNonNIO() throws JBIException {
>>>         try {
>>>             this.socket = new Socket(this.host, this.port);
>>>             
>>>             this.in = this.socket.getInputStream();
>>>             this.out = this.socket.getOutputStream();
>>>             
>>>             this.running = true;
>>>         } catch (ConnectException ce) {
>>>             log.warn(this.host + ":" + this.port
>>>                     + " unavailable, waiting " + this.retryInterval
>>>                     + " milliseconds before trying again.");
>>>             this.running = false;
>>>             try {
>>>                 Thread.sleep(this.retryInterval);
>>>             } catch (InterruptedException e) {}
>>>             this.connectNonNIO();
>>>         } catch (Exception e) {
>>>             log.error("Exception while trying to make a tcp connection to "
>>>                     + this.host + ":" + this.port);
>>>             log.error(e);
>>>             this.running = false;
>>>             throw new JBIException("Failed to connect to " + this.host
>>>                     + ":" + this.port, e);
>>>         } // end of try / catch
>>>     } // end of connectNonNIO()
>>>     
>>>     private void stopNIO() throws JBIException {
>>>         log.debug("Inside stopNIO()");
>>>         if (this.socketChannel != null) {
>>>             this.cleanup();
>>>             try {
>>>                 this.socketChannel.close();
>>>                 this.socketChannel = null;
>>>                 this.running = false;
>>>             } catch (IOException e) {
>>>                 throw new JBIException("Unable to close connection", e);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of stopNIO()
>>>     
>>>     private void stopNonNIO() throws JBIException {
>>>         log.debug("Inside stopNonNIO()");
>>>         if (this.socket != null) {
>>>             if (this.socket.isConnected()) {
>>>                 this.cleanup();
>>>                 try {
>>>                     this.socket.close();
>>>                     this.in = null;
>>>                     this.out = null;
>>>                     this.socket = null;
>>>                     this.running = false;
>>>                 } catch (IOException e) {
>>>                     throw new JBIException("Unable to close connection",
>>>                             e);
>>>                 } // end of try / catch
>>>             } // end of if
>>>         } // end of if
>>>     } // end of stopNonNIO()
>>> } // end of class TcpComponentSupport
>>>
>>>
>>>
>>> On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>>>
>>>     
>>>> Sorry, let me try it again with just the .java file.
>>>>
>>>>
>>>> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>>>>
>>>>       
>>>>> Hopefully this time the attachment will go through.  I am not exactly
>>>>> sure why it didn't before.  It is a zip file.
>>>>>
>>>>> I guess I was thinking that when the component is deployed into the
>>>>> lwcontainer it would be running in its own thread anyway.
>>>>> Apparently I was incorrect in my assumption.
>>>>>
>>>>> I am still trying to get a grasp on the whole ServiceMix thing so
>>>>> please excuse my ignorance.
>>>>>
>>>>> Thanks,
>>>>> Doug
>>>>>
>>>>>
>>>>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>
>>>>>         
>>>>>> I think the attachment has been lost.
>>>>>>
>>>>>> Btw, for a tcp consumer that creates a server socket, it has to wait
>>>>>> for incoming connections, so how can you not start a thread ? The
>>>>>> jabber api uses a listener, so it is really different.
>>>>>>
>>>>>> Cheers,
>>>>>> Guillaume Nodet
>>>>>>
>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>           
>>>>>>> I have attached my TcpComponentSupport class to this email.  Would
>>>>>>> you be so kind as to take a look at it and let me know if it looks
>>>>>>> OK?  You mentioned that the start method should start a Thread and
>>>>>>> then kill that Thread in the stop method.  I am not actually doing
>>>>>>> this right now; however, I could of course implement it that way if
>>>>>>> necessary.  I attempted to use the JabberComponentSupport class as a
>>>>>>> template.
>>>>>>>
>>>>>>> Please let me know your thoughts.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Doug
>>>>>>>
>>>>>>>
>>>>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>>
>>>>>>>             
>>>>>>>> I am not sure I understand the problem.
>>>>>>>> Usually a lightweight consumer BC will begin to receive messages
>>>>>>>> when the component is started.  It means that all the necessary
>>>>>>>> configuration parameters have been set.
>>>>>>>> The tcp component needs to start a thread inside the start method
>>>>>>>> (and kill it inside the stop method).
>>>>>>>>
>>>>>>>> The second problem is related to the component / service unit
>>>>>>>> lifecycle.
>>>>>>>> When a lightweight component is deployed to the lwcontainer, it has
>>>>>>>> a specific lifecycle: when the service unit is started, the
>>>>>>>> component is started, and when the service unit is stopped, the
>>>>>>>> component is shut down.
>>>>>>>>
>>>>>>>> But the problem should only occur when someone sends something on
>>>>>>>> the socket ...
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Guillaume Nodet
>>>>>>>>
>>>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>               
>>>>>>>>> Guillaume,
>>>>>>>>>
>>>>>>>>> Yes you are right, I did ask before (somehow I got sidetracked and
>>>>>>>>> forgot all about it).  Anyway, thank you for the reply.
>>>>>>>>>
>>>>>>>>> I have been able to implement a TcpComponentSupport class, at
>>>>>>>>> least I think so anyway.  The problem that I am having now is that
>>>>>>>>> as soon as I deploy the component it starts collecting data and
>>>>>>>>> pushing it to the bus.  I have run into a couple of problems
>>>>>>>>> because of this.  The first problem is while testing: I would like
>>>>>>>>> to extend from TestSupport and use the assertMessagesReceived(),
>>>>>>>>> however when it loads the ApplicationContext, the concrete
>>>>>>>>> TcpComponentSupport class simply begins communicating over the
>>>>>>>>> socket and the test method with the assertMessagesReceived() call
>>>>>>>>> is never executed.  The test will just run forever.
>>>>>>>>>
>>>>>>>>> The second question that I have is along the same lines.  I would
>>>>>>>>> like to create a service unit with just this component in it,
>>>>>>>>> deploy it to ServiceMix, and then use something like eip to route
>>>>>>>>> the messages where I want them.  The same kind of problem occurs
>>>>>>>>> in this scenario: as soon as the service unit is deployed, it will
>>>>>>>>> start collecting data from the stream and trying to push it to the
>>>>>>>>> bus.  The problem is that there is no destination assigned to the
>>>>>>>>> component yet, so it tries to send it to "service: null and
>>>>>>>>> interface null".
>>>>>>>>>
>>>>>>>>> Do you have any ideas?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Doug
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>>>>
>>>>>>>>>                 
>>>>>>>>>> It seems there's none -- if I recall, you have already asked this
>>>>>>>>>> question ;)
>>>>>>>>>> However, it should not be very difficult to implement.
>>>>>>>>>> You have to extend
>>>>>>>>>> org.apache.servicemix.components.util.ComponentSupport and
>>>>>>>>>> implement MessageExchangeListener.  Then, just create your socket
>>>>>>>>>> and read it :)
>>>>>>>>>> You may also need another lw component for sending over tcp.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Guillaume Nodet
>>>>>>>>>>
>>>>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>>>                   
>>>>>>>>>>> Does anyone know if a basic TCP component is available?  What I
>>>>>>>>>>> would like to be able to do is simply extend from a TCP
>>>>>>>>>>> component, set the host and port in the servicemix.xml file, and
>>>>>>>>>>> have access to the input and output streams in my implementation
>>>>>>>>>>> class.  I would then want to be able to read from the input
>>>>>>>>>>> stream and create a NormalizedMessage to put on the bus, and
>>>>>>>>>>> also be able to get a message from the bus and write it to the
>>>>>>>>>>> output stream.
>>>>>>>>>>>
>>>>>>>>>>> Is there anything like this out there?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Doug
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>                     
>>>>>>>>>
>>>>>>>>>                 
>>>>>>>
>>>>>>>             
>>>
>>>
>>>     
>>
>>   
> 
> 
> 
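For reference, the threading advice quoted above (start a thread inside
start(), stop it inside stop()) might look roughly like the sketch below.
It is plain Java with no ServiceMix dependency.  ThreadedTcpReader and its
Consumer<String> callback are hypothetical names that stand in for a
concrete TcpComponentSupport subclass and its sendToBus(String) call; they
are not part of ServiceMix, and the sketch leaves out the retry logic.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical helper: reads lines from a TCP socket on its own thread. */
class ThreadedTcpReader {
    private final String host;
    private final int port;
    private final Consumer<String> handler; // stands in for sendToBus(String)
    private volatile boolean running;
    private volatile Socket socket;
    private Thread worker;

    ThreadedTcpReader(String host, int port, Consumer<String> handler) {
        this.host = host;
        this.port = port;
        this.handler = handler;
    }

    /** Returns immediately; the blocking read loop runs on the worker thread. */
    synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        worker = new Thread(this::readLoop, "tcp-reader-" + host + ":" + port);
        worker.setDaemon(true);
        worker.start();
    }

    /** Closes the socket to unblock the reader, then waits for the thread. */
    synchronized void stop() throws InterruptedException {
        running = false;
        Socket s = socket;
        if (s != null) {
            try {
                s.close();
            } catch (IOException ignored) {
                // the socket is being discarded anyway
            }
        }
        if (worker != null) {
            worker.join(5000);
            worker = null;
        }
    }

    private void readLoop() {
        try (Socket s = new Socket(host, port);
             BufferedReader reader = new BufferedReader(
                     new InputStreamReader(s.getInputStream(),
                             StandardCharsets.UTF_8))) {
            socket = s;
            String line;
            while (running && (line = reader.readLine()) != null) {
                handler.accept(line); // hand each line to the callback
            }
        } catch (IOException e) {
            if (running) { // stay quiet when stop() closed the socket
                System.err.println("Connection to " + host + ":" + port
                        + " failed: " + e);
            }
        } finally {
            running = false;
        }
    }
}
```

In the posted class, the equivalent change would presumably be for start()
to run init() and processConnection() on such a thread, so that start()
returns promptly instead of blocking whoever starts the component.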

-- 
View this message in context: 
http://www.nabble.com/Basic-TCP-component-tf1595103s12049.html#a13105397
Sent from the ServiceMix - User mailing list archive at Nabble.com.
