Hmmm... So maybe it makes more sense to reuse this in ServiceMix. It would be nice to have something bundled with ServiceMix that supports TCP, UDP, and similar protocols.
Andrea Zoppello-2 wrote:
>
> In Spagic we have a component that handles the low-level TCP protocol.
>
> Take a look at it: http://forge.objectweb.org/projects/spagic
>
> Andrea Zoppello
>
> jpuro wrote:
>> Has a standard JBI BC been created for this yet? I think not, but I
>> was curious.
>>
>> -jeff
>>
>>
>> dfischer wrote:
>>
>>> Forget it, here is the code. So sorry for so many emails.
>>>
>>>
>>> package org.apache.servicemix.components.tcp;
>>>
>>> import org.apache.log4j.Logger;
>>>
>>> import java.io.IOException;
>>> import java.io.InputStream;
>>> import java.io.OutputStream;
>>> import java.net.ConnectException;
>>> import java.net.InetSocketAddress;
>>> import java.net.Socket;
>>> import java.net.SocketException;
>>> import java.nio.channels.SocketChannel;
>>>
>>> import javax.jbi.JBIException;
>>> import javax.jbi.messaging.InOnly;
>>> import javax.jbi.messaging.MessageExchange;
>>> import javax.jbi.messaging.MessagingException;
>>> import javax.jbi.messaging.NormalizedMessage;
>>> import javax.xml.transform.TransformerException;
>>>
>>> import org.apache.servicemix.MessageExchangeListener;
>>> import org.apache.servicemix.components.util.ComponentSupport;
>>> import org.springframework.beans.factory.InitializingBean;
>>>
>>> public abstract class TcpComponentSupport extends ComponentSupport
>>>         implements MessageExchangeListener, InitializingBean {
>>>     /**
>>>      * Logger for this class
>>>      */
>>>     private static final Logger log = Logger
>>>             .getLogger(TcpComponentSupport.class);
>>>
>>>     private String host;
>>>     private int port = -1;
>>>     private boolean useNIO = false;
>>>     private int retryInterval = 10000;
>>>     protected SocketChannel socketChannel = null;
>>>     private Socket socket = null;
>>>     protected InputStream in = null;
>>>     protected OutputStream out = null;
>>>     protected boolean running = false;
>>>     private TcpMarshaler marshaler = new TcpMarshaler();
>>>
>>>     protected abstract void processConnection() throws JBIException, IOException;
>>>     protected abstract void processIncomingMessage(String tcpMessage);
>>>
>>>     public void setHost(String host) {
>>>         this.host = host;
>>>     } // end of setHost(String)
>>>
>>>     public void setPort(int port) {
>>>         this.port = port;
>>>     } // end of setPort(int)
>>>
>>>     public void setUseNIO(boolean useNIO) {
>>>         this.useNIO = useNIO;
>>>     } // end of setUseNIO(boolean)
>>>
>>>     public void setRetryInterval(int retryInterval) {
>>>         this.retryInterval = retryInterval;
>>>     } // end of setRetryInterval(int)
>>>
>>>     protected boolean getUseNIO() {
>>>         return this.useNIO;
>>>     } // end of getUseNIO()
>>>
>>>     protected String getHost() {
>>>         return this.host;
>>>     } // end of getHost()
>>>
>>>     protected int getPort() {
>>>         return this.port;
>>>     } // end of getPort()
>>>
>>>     public void onMessageExchange(MessageExchange exchange)
>>>             throws MessagingException {
>>>         String tcpMessage = null;
>>>         try {
>>>             tcpMessage = this.marshaler.fromNMS(exchange.getMessage("in"));
>>>             this.processIncomingMessage(tcpMessage);
>>>             this.done(exchange);
>>>         } catch (TransformerException e) {
>>>             throw new MessagingException(e);
>>>         } // end of try / catch
>>>     } // end of onMessageExchange(MessageExchange)
>>>
>>>     public void afterPropertiesSet() throws Exception {
>>>         if (host == null) {
>>>             throw new IllegalArgumentException("You must specify the host property.");
>>>         } // end of if
>>>
>>>         if (this.port < 0) {
>>>             throw new IllegalArgumentException("You must specify the port property.");
>>>         } // end of if
>>>     } // end of afterPropertiesSet()
>>>
>>>     public void start() throws JBIException {
>>>         if (this.useNIO) {
>>>             // configure the NIO SocketChannel
>>>             this.startNIO();
>>>         } else {
>>>             // this is the default
>>>             // we will simply open a basic socket and create the Input and Output Streams
>>>             this.startNonNIO();
>>>         } // end of if / else
>>>     } // end of start()
>>>
>>>     public void stop() throws JBIException {
>>>         if (useNIO) {
>>>             this.stopNIO();
>>>         } else {
>>>             this.stopNonNIO();
>>>         } // end of if / else
>>>     } // end of stop()
>>>
>>>     protected void sendToBus(String message) {
>>>         log.debug("Inside sendToBus(), sending message: " + message);
>>>         try {
>>>             InOnly exchange = this.getExchangeFactory().createInOnlyExchange();
>>>             NormalizedMessage normalizedMessage = exchange.createMessage();
>>>             exchange.setInMessage(normalizedMessage);
>>>             this.marshaler.toNMS(normalizedMessage, message);
>>>             this.getDeliveryChannel().sendSync(exchange);
>>>             //done(exchange);
>>>         } catch (MessagingException me) {
>>>             throw new TcpConnectionException(me, message);
>>>         } // end of try / catch
>>>     } // end of sendToBus(String)
>>>
>>>     /**
>>>      * This is an empty implementation of the init().
>>>      * This method exists so that the concrete class can override it
>>>      * if there is something that needs to be done to initialize the
>>>      * communication with the server.
>>>      *
>>>      * This method is called after the connection has been made and after
>>>      * the Input and Output Streams have been assigned.
>>>      *
>>>      * This method is called right before the processConnection().
>>>      */
>>>     protected void init() {
>>>         // nothing is done here
>>>         // if something needs to be done here in the concrete class
>>>         // this method is called after the connection is made and the
>>>         // Input and Output Streams have been set but before the
>>>         // processConnection()
>>>     } // end of init()
>>>
>>>     /**
>>>      * This is an empty implementation of the cleanup().
>>>      *
>>>      * This method exists so that the concrete class can override it
>>>      * if there is something that needs to be done to cleanup resources
>>>      * before the connection is shutdown.
>>>      *
>>>      * This method is called while the connection is still active and the
>>>      * Input and Output Streams are still available.
>>>      */
>>>     protected void cleanup() {
>>>         // nothing is done here
>>>         // if something needs to be done here in the concrete class
>>>     } // end of cleanup()
>>>
>>>     private void startNIO() throws JBIException {
>>>         if (log.isInfoEnabled()) {
>>>             log.info("Using NIO to connect to " + this.host + ":" + this.port);
>>>         } // end of if
>>>
>>>         if (!this.running) {
>>>             this.connectNIO();
>>>             try {
>>>                 this.init();
>>>                 this.processConnection();
>>>             } catch (SocketException e) {
>>>                 this.running = false;
>>>                 this.startNIO();
>>>             } catch (IOException ioe) {
>>>                 throw new JBIException(ioe);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of startNIO()
>>>
>>>     private void connectNIO() throws JBIException {
>>>         try {
>>>             this.socketChannel = SocketChannel.open();
>>>             this.socketChannel.configureBlocking(false);
>>>
>>>             this.socketChannel.connect(new InetSocketAddress(this.host, this.port));
>>>             int count = 0;
>>>             while (!this.socketChannel.finishConnect()) {
>>>                 log.debug("Finishing connection: " + (count++) + " times");
>>>             } // end of while loop
>>>
>>>             this.running = true;
>>>         } catch (ConnectException ce) {
>>>             log.warn(this.host + ":" + this.port + " unavailable, waiting "
>>>                     + this.retryInterval + " milliseconds before trying again.");
>>>             this.running = false;
>>>             try {
>>>                 Thread.sleep(this.retryInterval);
>>>             } catch (InterruptedException e) {}
>>>             this.connectNIO();
>>>         } catch (Exception e) {
>>>             log.error("Exception while trying to make a tcp connection to "
>>>                     + this.host + ":" + this.port);
>>>             log.error(e);
>>>             this.running = false;
>>>             throw new JBIException("Failed to connect to " + this.host + ":"
>>>                     + this.port, e);
>>>         } // end of try / catch
>>>     } // end of connectNIO()
>>>
>>>     private void startNonNIO() throws JBIException {
>>>         if (log.isInfoEnabled()) {
>>>             log.info("Using basic sockets to connect to " + this.host + ":"
>>>                     + this.port);
>>>         } // end of if
>>>
>>>         if (!this.running) {
>>>             this.connectNonNIO();
>>>             try {
>>>                 this.init();
>>>                 this.processConnection();
>>>             } catch (SocketException e) {
>>>                 this.running = false;
>>>                 this.startNonNIO();
>>>             } catch (IOException ioe) {
>>>                 throw new JBIException(ioe);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of startNonNIO()
>>>
>>>     private void connectNonNIO() throws JBIException {
>>>         try {
>>>             this.socket = new Socket(this.host, this.port);
>>>
>>>             this.in = this.socket.getInputStream();
>>>             this.out = this.socket.getOutputStream();
>>>
>>>             this.running = true;
>>>         } catch (ConnectException ce) {
>>>             log.warn(this.host + ":" + this.port + " unavailable, waiting "
>>>                     + this.retryInterval + " milliseconds before trying again.");
>>>             this.running = false;
>>>             try {
>>>                 Thread.sleep(this.retryInterval);
>>>             } catch (InterruptedException e) {}
>>>             this.connectNonNIO();
>>>         } catch (Exception e) {
>>>             log.error("Exception while trying to make a tcp connection to "
>>>                     + this.host + ":" + this.port);
>>>             log.error(e);
>>>             this.running = false;
>>>             throw new JBIException("Failed to connect to " + this.host + ":"
>>>                     + this.port, e);
>>>         } // end of try / catch
>>>     } // end of connectNonNIO()
>>>
>>>     private void stopNIO() throws JBIException {
>>>         log.debug("Inside stopNIO()");
>>>         if (this.socketChannel != null) {
>>>             this.cleanup();
>>>             try {
>>>                 this.socketChannel.close();
>>>                 this.socketChannel = null;
>>>                 this.running = false;
>>>             } catch (IOException e) {
>>>                 throw new JBIException("Unable to close connection", e);
>>>             } // end of try / catch
>>>         } // end of if
>>>     } // end of stopNIO()
>>>
>>>     private void stopNonNIO() throws JBIException {
>>>         log.debug("Inside stopNonNIO()");
>>>         if (this.socket != null) {
>>>             if (this.socket.isConnected()) {
>>>                 this.cleanup();
>>>                 try {
>>>                     this.socket.close();
>>>                     this.in = null;
>>>                     this.out = null;
>>>                     this.socket = null;
>>>                     this.running = false;
>>>                 } catch (IOException e) {
>>>                     throw new JBIException("Unable to close connection", e);
>>>                 } // end of try / catch
>>>             } // end of if
>>>         } // end of if
>>>     } // end of stopNonNIO()
>>> } // end of class TcpComponentSupport
>>>
>>>
>>> On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>>>
>>>> Sorry, let me try it again with just the .java file.
>>>>
>>>>
>>>> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>>>>
>>>>> Hopefully this time the attachment will go through. I am not exactly
>>>>> sure why it didn't before. It is a zip file.
>>>>>
>>>>> I guess I was thinking that when the component is deployed into the
>>>>> lwcontainer it would be running in its own thread anyway.
>>>>> Apparently I was incorrect in my assumption.
>>>>>
>>>>> I am still trying to get a grasp on the whole ServiceMix thing, so
>>>>> please excuse my ignorance.
>>>>>
>>>>> Thanks,
>>>>> Doug
>>>>>
>>>>>
>>>>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>
>>>>>> I think the attachment has been lost.
>>>>>>
>>>>>> Btw, a tcp consumer that creates a server socket has to wait for
>>>>>> incoming connections, so how can you not start a thread? The
>>>>>> jabber api uses a listener, so it is really different.
>>>>>>
>>>>>> Cheers,
>>>>>> Guillaume Nodet
>>>>>>
>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>
>>>>>>> I have attached my TcpComponentSupport class to this email. Could
>>>>>>> you be so kind as to take a look at it and let me know if it looks
>>>>>>> ok? You mentioned that the start method should start a Thread and
>>>>>>> then kill that Thread in the stop method. I am not actually doing
>>>>>>> this right now; however, I could of course implement it that way
>>>>>>> if necessary. I attempted to use the JabberComponentSupport class
>>>>>>> as a template.
>>>>>>>
>>>>>>> Please let me know your thoughts.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Doug
>>>>>>>
>>>>>>>
>>>>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>>
>>>>>>>> I'm not sure I understand the problem.
>>>>>>>> Usually a lightweight consumer BC will begin to receive messages
>>>>>>>> when the component is started. It means that all the necessary
>>>>>>>> configuration parameters have been set.
>>>>>>>> The tcp component needs to start a thread inside the start method
>>>>>>>> (and kill it inside the stop method).
>>>>>>>>
>>>>>>>> The second problem is related to the component / service unit
>>>>>>>> lifecycle. When a lightweight component is deployed to the
>>>>>>>> lwcontainer, it has a specific lifecycle: when the service unit
>>>>>>>> is started, the component is started, and when the service unit
>>>>>>>> is stopped, the component is shut down.
>>>>>>>>
>>>>>>>> But the problem should only occur when someone sends something on
>>>>>>>> the socket ...
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Guillaume Nodet
>>>>>>>>
>>>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>
>>>>>>>>> Guillaume,
>>>>>>>>>
>>>>>>>>> Yes, you are right, I did ask before (somehow I got sidetracked
>>>>>>>>> and forgot all about it). Anyway, thank you for the reply.
>>>>>>>>>
>>>>>>>>> I have been able to implement a TcpComponentSupport class, at
>>>>>>>>> least I think so anyway. The problem that I am having now is
>>>>>>>>> that as soon as I deploy the component it starts collecting data
>>>>>>>>> and pushing it to the bus. I have run into a couple of problems
>>>>>>>>> because of this. The first problem is while testing: I would
>>>>>>>>> like to extend from TestSupport and use the
>>>>>>>>> assertMessagesReceived(); however, when it loads the
>>>>>>>>> ApplicationContext, the concrete TcpComponentSupport class
>>>>>>>>> simply begins communicating over the socket and the test method
>>>>>>>>> with the assertMessagesReceived() call is never executed. The
>>>>>>>>> test will just run forever.
>>>>>>>>>
>>>>>>>>> The second question that I have is along the same lines. I would
>>>>>>>>> like to create a service unit with simply this component in it,
>>>>>>>>> deploy it to ServiceMix, and then use something like eip to
>>>>>>>>> route the messages where I want them. The same kind of problem
>>>>>>>>> occurs in this scenario: as soon as the service unit is
>>>>>>>>> deployed, it will start collecting data from the stream and
>>>>>>>>> trying to push it to the bus. The problem is that there is no
>>>>>>>>> destination assigned to the component yet, so it tries to send
>>>>>>>>> it to "service: null and interface null".
>>>>>>>>>
>>>>>>>>> Do you have any ideas?
>>>>>>>>>
>>>>>>>>> Thank you,
>>>>>>>>> Doug
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>>>>
>>>>>>>>>> It seems there's none -- if I recall, you have already asked
>>>>>>>>>> this question ;)
>>>>>>>>>> However, it should not be very difficult to implement.
>>>>>>>>>> You have to inherit from
>>>>>>>>>> org.apache.servicemix.components.util.ComponentSupport and
>>>>>>>>>> implement MessageExchangeListener. Then, just create your
>>>>>>>>>> socket and read it :)
>>>>>>>>>> You may also need another lw component for sending over tcp.
>>>>>>>>>>
>>>>>>>>>> Cheers,
>>>>>>>>>> Guillaume Nodet
>>>>>>>>>>
>>>>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>>>
>>>>>>>>>>> Does anyone know if a basic TCP component is available? What I
>>>>>>>>>>> would like to be able to do is simply extend from a TCP
>>>>>>>>>>> component, set the host and port in the servicemix.xml file,
>>>>>>>>>>> and have access to the input and output streams in my
>>>>>>>>>>> implementation class. I would then want to be able to read
>>>>>>>>>>> from the input stream and create a NormalizedMessage to put on
>>>>>>>>>>> the bus, and also be able to get a message from the bus and
>>>>>>>>>>> write it to the output stream.
>>>>>>>>>>>
>>>>>>>>>>> Is there anything like this out there?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Doug
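
For reference, the "start a thread inside the start method and kill it inside the stop method" advice from the thread could be sketched in plain Java roughly as follows. This is only an illustration under stated assumptions and uses no ServiceMix API: the class name TcpReaderLifecycle, the line-based framing, and the Consumer<String> callback (standing in for something like sendToBus(String)) are all hypothetical and are not part of the code posted above. It also retries the connection in a loop rather than by recursion.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical helper (not a ServiceMix class): reads a socket on its own thread. */
public class TcpReaderLifecycle {
    private final String host;
    private final int port;
    private final long retryIntervalMillis;
    private final Consumer<String> onLine; // assumption: stand-in for sendToBus(String)

    private volatile boolean running;
    private volatile Socket socket;
    private Thread worker;

    public TcpReaderLifecycle(String host, int port, long retryIntervalMillis,
                              Consumer<String> onLine) {
        this.host = host;
        this.port = port;
        this.retryIntervalMillis = retryIntervalMillis;
        this.onLine = onLine;
    }

    /** Spawns the reader thread and returns immediately. */
    public synchronized void start() {
        if (running) {
            return;
        }
        running = true;
        worker = new Thread(this::readLoop, "tcp-reader-" + host + ":" + port);
        worker.setDaemon(true);
        worker.start();
    }

    /** Closes the socket to unblock a pending read, then waits for the thread to exit. */
    public synchronized void stop() throws InterruptedException {
        running = false;
        Socket s = socket;
        if (s != null) {
            try {
                s.close();
            } catch (IOException ignored) {
                // closing is best effort during shutdown
            }
        }
        if (worker != null) {
            worker.interrupt();   // wakes the thread if it is sleeping between retries
            worker.join(5000);
            worker = null;
        }
    }

    /** Connects, reads lines until the peer closes or stop() is called, and retries on failure. */
    private void readLoop() {
        while (running) {
            try (Socket s = new Socket(host, port)) {
                socket = s;
                BufferedReader reader = new BufferedReader(
                        new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8));
                String line;
                while (running && (line = reader.readLine()) != null) {
                    onLine.accept(line);
                }
            } catch (IOException e) {
                // connection refused or dropped: fall through and retry while running
            } finally {
                socket = null;
            }
            if (running) {
                try {
                    Thread.sleep(retryIntervalMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }
    }
}
```

Because start() returns as soon as the thread is launched, a test that loads the component can go on to its assertions instead of blocking in the read loop, which is the symptom described earlier in the thread. In a real component the callback would hand each message to the bus, and start() and stop() would be invoked from the component's own lifecycle methods.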
