Has a standard JBI BC component been created for this yet?  I don't think
so, but I was curious.

-jeff


dfischer wrote:
> 
> Forget it, here is the code.  So sorry for so many emails.
> 
> 
> package org.apache.servicemix.components.tcp;
> 
> import org.apache.log4j.Logger;
> 
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.net.ConnectException;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.net.SocketException;
> import java.nio.channels.SocketChannel;
> 
> import javax.jbi.JBIException;
> import javax.jbi.messaging.InOnly;
> import javax.jbi.messaging.MessageExchange;
> import javax.jbi.messaging.MessagingException;
> import javax.jbi.messaging.NormalizedMessage;
> import javax.xml.transform.TransformerException;
> 
> import org.apache.servicemix.MessageExchangeListener;
> import org.apache.servicemix.components.util.ComponentSupport;
> import org.springframework.beans.factory.InitializingBean;
> 
> public abstract class TcpComponentSupport extends ComponentSupport
>         implements MessageExchangeListener, InitializingBean {
>     /**
>      * Logger for this class
>      */
>     private static final Logger log = Logger
>             .getLogger(TcpComponentSupport.class);
>     
>     private String host;
>     private int port = -1;
>     private boolean useNIO = false;
>     private int retryInterval = 10000;
>     protected SocketChannel socketChannel = null;
>     private Socket socket = null;
>     protected InputStream in = null;
>     protected OutputStream out = null;
>     protected boolean running = false;
>     private TcpMarshaler marshaler = new TcpMarshaler();
>     
>     protected abstract void processConnection()
>             throws JBIException, IOException;
>     protected abstract void processIncomingMessage(String tcpMessage);
>     
>     public void setHost(String host) {
>         this.host = host;
>     } // end of setHost(String)
>     
>     public void setPort(int port) {
>         this.port = port;
>     } // end of setPort(int)
>     
>     public void setUseNIO(boolean useNIO) {
>         this.useNIO = useNIO;
>     } // end of setUseNIO(boolean)
>     
>     public void setRetryInterval(int retryInterval) {
>         this.retryInterval = retryInterval;
>     } // end of setRetryInterval(int)
>     
>     protected boolean getUseNIO() {
>         return this.useNIO;
>     } // end of getUseNIO()
>     
>     protected String getHost() {
>         return this.host;
>     } // end of getHost()
>     
>     protected int getPort() {
>         return this.port;
>     } // end of getPort()
> 
>     public void onMessageExchange(MessageExchange exchange)
>             throws MessagingException {
>         String tcpMessage = null;
>         try {
>             tcpMessage = this.marshaler.fromNMS(exchange.getMessage("in"));
>             this.processIncomingMessage(tcpMessage);
>             this.done(exchange);
>         } catch (TransformerException e) {
>             throw new MessagingException(e);
>         } // end of try /catch
>     } // end of onMessageExchange(MessageExchange)
> 
>     public void afterPropertiesSet() throws Exception {
>         if (host == null) {
>             throw new IllegalArgumentException(
>                     "You must specify the host property.");
>         } // end of if
>         
>         if (this.port < 0) {
>             throw new IllegalArgumentException(
>                     "You must specify the port property.");
>         } // end of if
>     } // end of afterPropertiesSet()
>     
>     public void start() throws JBIException {
>         if (this.useNIO) {
>             // configure the NIO SocketChannel
>             this.startNIO();
>         } else {
>             // this is the default
>             // we will simply open a basic socket and create the Input and
>             // Output Streams
>             this.startNonNIO();
>         } // end of if / else
>     } // end of start()
>     
>     public void stop() throws JBIException {
>         if (useNIO) {
>             this.stopNIO();
>         } else {
>             this.stopNonNIO();
>         } // end of if / else
>     } // end of stop()
>     
>     protected void sendToBus(String message) {
>         log.debug("Inside sendToBus(), sending message: " + message);
>         try {
>             InOnly exchange = this.getExchangeFactory().createInOnlyExchange();
>             NormalizedMessage normalizedMessage = exchange.createMessage();
>             exchange.setInMessage(normalizedMessage);
>             this.marshaler.toNMS(normalizedMessage, message);
>             this.getDeliveryChannel().sendSync(exchange);
>             //done(exchange);
>         } catch (MessagingException me) {
>             throw new TcpConnectionException(me, message);
>         } // end of try / catch
>     } // end of sendToBus(String)
>     
>     /**
>      * This is an empty implementation of the init().
>      * This method exists so that the concrete class can override it
>      * if there is something that needs to be done to initialize the
>      * communication with the server.
>      * 
>      * This method is called after the connection has been made and after
>      * the Input and Output Streams have been assigned.
>      * 
>      * This method is called right before the processConnection().
>      */
>     protected void init() {
>         // nothing is done here
>         // if something needs to be done here in the concrete class
>         // this method is called after the connection is made and the
>         // Input and Output Streams have been set but before the
>         // processConnection()
>     } // end of init()
>     
>     /**
>      * This is an empty implementation of the cleanup().
>      * 
>      * This method exists so that the concrete class can override it
>      * if there is something that needs to be done to clean up resources
>      * before the connection is shut down.
>      * 
>      * This method is called while the connection is still active and the
>      * Input and Output Streams are still available.
>      */
>     protected void cleanup() {
>         // nothing is done here
>         // if something needs to be done here in the concrete class
>     } // end of cleanup()
>     
>     private void startNIO() throws JBIException {
>         if (log.isInfoEnabled()) {
>             log.info("Using NIO to connect to " + this.host + ":"
>                     + this.port);
>         } // end of if
> 
>         if (!this.running) {
>             this.connectNIO();
>             try {
>                 this.init();
>                 this.processConnection();
>             } catch (SocketException e) {
>                 this.running = false;
>                 this.startNIO();
>             } catch (IOException ioe) {
>                 throw new JBIException(ioe);
>             } // end of try / catch
>         } // end of if
>     } // end of startNIO()
>     
>     private void connectNIO() throws JBIException {
>         try {
>             this.socketChannel = SocketChannel.open();
>             this.socketChannel.configureBlocking(false);
>             
>             this.socketChannel.connect(
>                     new InetSocketAddress(this.host, this.port));
>             int count = 0;
>             while (!this.socketChannel.finishConnect()) {
>                 log.debug("Finishing connection: " + (count++) + " times");
>             } // end of while loop
>             
>             this.running = true;
>         } catch (ConnectException ce) {
>             log.warn(this.host + ":" + this.port + " unavailable, waiting "
>                     + this.retryInterval + " milliseconds before trying again.");
>             this.running = false;
>             try {
>                 Thread.sleep(this.retryInterval);
>             } catch (InterruptedException e) {
>                 // restore the interrupt flag and stop retrying
>                 Thread.currentThread().interrupt();
>                 throw new JBIException("Interrupted while waiting to reconnect to "
>                         + this.host + ":" + this.port, e);
>             }
>             this.connectNIO();
>         } catch (Exception e) {
>             // pass the exception to the logger so the stack trace is kept
>             log.error("Exception while trying to make a tcp connection to "
>                     + this.host + ":" + this.port, e);
>             this.running = false;
>             throw new JBIException("Failed to connect to " + this.host + ":"
>                     + this.port, e);
>         } // end of try / catch
>     } // end of connectNIO()
>     
>     private void startNonNIO() throws JBIException {
>         if (log.isInfoEnabled()) {
>             log.info("Using basic sockets to connect to " + this.host + ":"
>                     + this.port);
>         } // end of if
>         
>         if (!this.running) {
>             this.connectNonNIO();
>             try {
>                 this.init();
>                 this.processConnection();
>             } catch (SocketException e) {
>                 this.running = false;
>                 this.startNonNIO();
>             } catch (IOException ioe) {
>                 throw new JBIException(ioe);
>             } // end of try / catch
>         } // end of if
>     } // end of startNonNIO()
>     
>     private void connectNonNIO() throws JBIException {
>         try {
>             this.socket = new Socket(this.host, this.port);
>             
>             this.in = this.socket.getInputStream();
>             this.out = this.socket.getOutputStream();
>             
>             this.running = true;
>         } catch (ConnectException ce) {
>             log.warn(this.host + ":" + this.port + " unavailable, waiting "
>                     + this.retryInterval + " milliseconds before trying again.");
>             this.running = false;
>             try {
>                 Thread.sleep(this.retryInterval);
>             } catch (InterruptedException e) {
>                 // restore the interrupt flag and stop retrying
>                 Thread.currentThread().interrupt();
>                 throw new JBIException("Interrupted while waiting to reconnect to "
>                         + this.host + ":" + this.port, e);
>             }
>             this.connectNonNIO();
>         } catch (Exception e) {
>             // pass the exception to the logger so the stack trace is kept
>             log.error("Exception while trying to make a tcp connection to "
>                     + this.host + ":" + this.port, e);
>             this.running = false;
>             throw new JBIException("Failed to connect to " + this.host + ":"
>                     + this.port, e);
>         } // end of try / catch
>     } // end of connectNonNIO()
>     
>     private void stopNIO() throws JBIException {
>         log.debug("Inside stopNIO()");
>         if (this.socketChannel != null) {
>             this.cleanup();
>             try {
>                 this.socketChannel.close();
>                 this.socketChannel = null;
>                 this.running = false;
>             } catch (IOException e) {
>                 throw new JBIException("Unable to close connection", e);
>             } // end of try / catch
>         } // end of if
>     } // end of stopNIO()
>     
>     private void stopNonNIO() throws JBIException {
>         log.debug("Inside stopNonNIO()");
>         if (this.socket != null) {
>             if (this.socket.isConnected()) {
>                 this.cleanup();
>                 try {
>                     this.socket.close();
>                     this.in = null;
>                     this.out = null;
>                     this.socket = null;
>                     this.running = false;
>                 } catch (IOException e) {
>                     throw new JBIException("Unable to close connection", e);
>                 } // end of try / catch
>             } // end of if
>         } // end of if
>     } // end of stopNonNIO()
> } // end of class TcpComponentSupport
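
The connect methods above retry by calling themselves again after each sleep, so a long outage keeps adding stack frames. A loop is one alternative. The following is only a minimal sketch under stated assumptions: RetryingConnector, Connector and connectWithRetry are hypothetical names that appear nowhere in ServiceMix or in the posted class, and the attempt limit is an addition the original code does not have.

```java
import java.io.IOException;
import java.net.ConnectException;

/** Hypothetical helper (not part of ServiceMix): retries a connect attempt in a loop. */
final class RetryingConnector {

    /** Hypothetical callback that opens one connection, e.g. () -> new Socket(host, port). */
    interface Connector<T> {
        T connect() throws IOException;
    }

    private RetryingConnector() {
    }

    /**
     * Calls connector.connect() until it succeeds. After each ConnectException it
     * sleeps retryIntervalMillis; once maxAttempts attempts have failed it rethrows
     * the last ConnectException. Other IOExceptions are not retried.
     */
    static <T> T connectWithRetry(Connector<T> connector, int maxAttempts,
                                  long retryIntervalMillis)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConnectException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return connector.connect();
            } catch (ConnectException ce) {
                last = ce;
                if (attempt < maxAttempts) {
                    // InterruptedException propagates so a shutdown can end the wait
                    Thread.sleep(retryIntervalMillis);
                }
            }
        }
        throw last;
    }
}
```

In the posted class this could be called as connectWithRetry(() -> new Socket(host, port), attempts, retryInterval), with the caller wrapping any failure in a JBIException as the existing catch blocks do.
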
> 
> 
> 
> On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
> 
>> Sorry, let me try it again with just the .java file.
>> 
>> 
>> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>> 
>>> Hopefully this time the attachment will go through.  I am not exactly
>>> sure why it didn't before.  It is a zip file.
>>> 
>>> I guess I was thinking that when the component is deployed into the
>>> lwcontainer it would be running in its own thread anyway.  Apparently
>>> I was incorrect in my assumption.
>>> 
>>> I am still trying to get a grasp on the whole ServiceMix thing so please
>>> excuse my ignorance.
>>> 
>>> Thanks,
>>> Doug
>>> 
>>> 
>>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>> 
>>>> I think the attachment has been lost.
>>>> 
>>>> Btw, a tcp consumer that creates a server socket has to wait for
>>>> incoming connections, so how can you not start a thread?  The
>>>> jabber api uses a listener, so it is really different.
>>>> 
>>>> Cheers,
>>>> Guillaume Nodet
>>>> 
>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>> I have attached my TcpComponentSupport class to this email.  Could you
>>>>> be so kind as to take a look at it and let me know if it looks ok?
>>>>> You mentioned that the start method should start a Thread and then
>>>>> kill that Thread in the stop method.  I am not actually doing this
>>>>> right now; however, I could of course implement it that way if
>>>>> necessary.  I attempted to use the JabberComponentSupport class as a
>>>>> template.
>>>>> 
>>>>> Please let me know your thoughts.
>>>>> 
>>>>> Thanks,
>>>>> Doug
>>>>> 
>>>>> 
>>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>> 
>>>>>> I am not sure I understand the problem.
>>>>>> Usually a lightweight consumer BC will begin to receive messages when
>>>>>> the component is started.  It means that all the necessary
>>>>>> configuration parameters have been set.
>>>>>> The tcp component needs to start a thread inside the start method
>>>>>> (and kill it inside the stop method).
>>>>>> 
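
The start/stop advice above can be sketched in plain Java. This is a minimal sketch, not ServiceMix code: WorkerLifecycle and its methods are hypothetical names, and the Runnable stands in for whatever a component would do per iteration (for example, reading one message from the socket).

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical lifecycle helper: start() spawns a worker thread, stop() ends it. */
final class WorkerLifecycle {
    private final Runnable task; // one unit of work, run repeatedly on the worker thread
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread worker;

    WorkerLifecycle(Runnable task) {
        this.task = task;
    }

    /** Returns immediately; the work loop runs on its own thread. */
    synchronized void start() {
        if (!running.compareAndSet(false, true)) {
            return; // already started
        }
        worker = new Thread(() -> {
            while (running.get() && !Thread.currentThread().isInterrupted()) {
                task.run();
            }
        }, "tcp-worker");
        worker.setDaemon(true);
        worker.start();
    }

    /** Clears the running flag, interrupts the worker and waits up to one second for it. */
    synchronized void stop() throws InterruptedException {
        if (!running.compareAndSet(true, false)) {
            return; // not running
        }
        worker.interrupt();
        worker.join(1000);
        worker = null;
    }

    boolean isRunning() {
        return running.get();
    }
}
```

Because start() returns right away, the caller (a test, or the container starting a service unit) is not blocked by the read loop. One caveat: interrupt() does not unblock a read on a classic java.net.Socket stream, so a real stop() would also need to close the socket.
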
>>>>>> The second problem is related to the lifecycle of components and
>>>>>> service units.
>>>>>> When a lightweight component is deployed to the lwcontainer, it has a
>>>>>> specific lifecycle: when the service unit is started, the component
>>>>>> is started, and when the service unit is stopped, the component is
>>>>>> shut down.
>>>>>> 
>>>>>> But the problem should only occur when someone sends something on the
>>>>>> socket ...
>>>>>> 
>>>>>> Cheers,
>>>>>> Guillaume Nodet
>>>>>> 
>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>> Guillaume,
>>>>>>> 
>>>>>>> Yes, you are right, I did ask before (somehow I got sidetracked and
>>>>>>> forgot all about it).  Anyway, thank you for the reply.
>>>>>>> 
>>>>>>> I have been able to implement a TcpComponentSupport class, at least I
>>>>>>> think so anyway.  The problem that I am having now is that as soon as
>>>>>>> I deploy the component it starts collecting data and pushing it to
>>>>>>> the bus.  I have run into a couple of problems because of this.  The
>>>>>>> first problem arises while testing: I would like to extend
>>>>>>> TestSupport and use assertMessagesReceived(); however, when it loads
>>>>>>> the ApplicationContext, the concrete TcpComponentSupport class simply
>>>>>>> begins communicating over the socket and the test method with the
>>>>>>> assertMessagesReceived() call is never executed.  The test will just
>>>>>>> run forever.
>>>>>>> 
>>>>>>> The second question that I have is along the same lines.  I would
>>>>>>> like to create a service unit with simply this component in it,
>>>>>>> deploy it to ServiceMix, and then use something like eip to route the
>>>>>>> messages where I want them.  The same kind of problem occurs in this
>>>>>>> scenario: as soon as the service unit is deployed, it will start
>>>>>>> collecting data from the stream and trying to push it to the bus.
>>>>>>> The problem is that there is no destination assigned to the component
>>>>>>> yet, so it tries to send it to "service: null and interface null".
>>>>>>> 
>>>>>>> Do you have any ideas?
>>>>>>> 
>>>>>>> Thank you,
>>>>>>> Doug
>>>>>>> 
>>>>>>> 
>>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>> 
>>>>>>>> It seems there's none -- if I recall, you have already asked this
>>>>>>>> question ;)
>>>>>>>> However, it should not be very difficult to implement.
>>>>>>>> You have to extend
>>>>>>>> org.apache.servicemix.components.util.ComponentSupport and implement
>>>>>>>> MessageExchangeListener.  Then, just create your socket and read it :)
>>>>>>>> You may also need another lw component for sending over tcp.
>>>>>>>> 
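
For the "create your socket and read it" step, the part the thread leaves open is how the byte stream is split into messages. The following is a rough sketch that assumes newline-terminated UTF-8 text messages; that framing is purely an assumption for illustration, and LineMessageReader and readMessages are hypothetical names, not ServiceMix API.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical reader: splits a byte stream into newline-terminated text messages. */
final class LineMessageReader {

    private LineMessageReader() {
    }

    /**
     * Reads until end of stream and passes each non-empty line to the handler.
     * Assumes UTF-8 text with one message per line (an illustrative framing only).
     * Returns the number of messages delivered.
     */
    static int readMessages(InputStream in, Consumer<String> handler) throws IOException {
        BufferedReader reader =
                new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
        int delivered = 0;
        String line;
        while ((line = reader.readLine()) != null) {
            if (line.isEmpty()) {
                continue; // skip blank lines between messages
            }
            handler.accept(line);
            delivered++;
        }
        return delivered;
    }
}
```

A component built this way would pass the socket's input stream as in, and the handler would be whatever turns a String into a NormalizedMessage and sends it to the bus.
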
>>>>>>>> Cheers,
>>>>>>>> Guillaume Nodet
>>>>>>>> 
>>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>> Does anyone know if a basic TCP component is available?  What I
>>>>>>>>> would like to be able to do is simply extend from a TCP component,
>>>>>>>>> set the host and port in the servicemix.xml file, and have access to
>>>>>>>>> the input and output streams in my implementation class.  I would
>>>>>>>>> then want to be able to read from the input stream and create a
>>>>>>>>> NormalizedMessage to put on the bus and also be able to get a
>>>>>>>>> message from the bus and write it to the output stream.
>>>>>>>>> 
>>>>>>>>> Is there anything like this out there?
>>>>>>>>> 
>>>>>>>>> Thanks,
>>>>>>>>> Doug
>>>>>>>>> 
>>>>>>>>> 
>>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>>> 
>>>>>> 
>>>>> 
>>>>> 
>>>>> 
>>>> 
>>> 
>> 
> 
> 
> 
> 

-- 
View this message in context: 
http://www.nabble.com/Basic-TCP-component-tf1595103s12049.html#a13099933
Sent from the ServiceMix - User mailing list archive at Nabble.com.
