Problems with transport level protocols is that they are, well,
transport level.  So the application need to define an application
level protocol on top of these (such as http which sits on top of
tcp).  Mina has good support for such thing iirc, so it's really worth
taking a look at it.

On 10/8/07, jpuro <[EMAIL PROTECTED]> wrote:
>
> Hmmm...  So maybe it makes more sense to re-use this in ServiceMix.  It would
> be nice to have something that is bundled with servicemix that allows for
> tcp/udp/etc protocols.
>
>
> Andrea Zoppello-2 wrote:
> >
> > In spagic we've one component to handle low level tcp protocol:
> >
> > Take a look at it: http://forge.objectweb.org/projects/spagic
> >
> > Andrea Zoppello
> >
> > jpuro ha scritto:
> >> Has a standard jbi BC component been created for this yet?  I think not,
> >> but
> >> was curious.
> >>
> >> -jeff
> >>
> >>
> >> dfischer wrote:
> >>
> >>> Forget it, here is the code.  So sorry for so many emails.
> >>>
> >>>
> >>> package org.apache.servicemix.components.tcp;
> >>>
> >>> import org.apache.log4j.Logger;
> >>>
> >>> import java.io.IOException;
> >>> import java.io.InputStream;
> >>> import java.io.OutputStream;
> >>> import java.net.ConnectException;
> >>> import java.net.InetSocketAddress;
> >>> import java.net.Socket;
> >>> import java.net.SocketException;
> >>> import java.nio.channels.SocketChannel;
> >>>
> >>> import javax.jbi.JBIException;
> >>> import javax.jbi.messaging.InOnly;
> >>> import javax.jbi.messaging.MessageExchange;
> >>> import javax.jbi.messaging.MessagingException;
> >>> import javax.jbi.messaging.NormalizedMessage;
> >>> import javax.xml.transform.TransformerException;
> >>>
> >>> import org.apache.servicemix.MessageExchangeListener;
> >>> import org.apache.servicemix.components.util.ComponentSupport;
> >>> import org.springframework.beans.factory.InitializingBean;
> >>>
> >>> public abstract class TcpComponentSupport extends ComponentSupport
> >>> implements
> >>>         MessageExchangeListener, InitializingBean {
> >>>     /**
> >>>      * Logger for this class
> >>>      */
> >>>     private static final Logger log = Logger
> >>>             .getLogger(TcpComponentSupport.class);
> >>>
> >>>     private String host;
> >>>     private int port = -1;
> >>>     private boolean useNIO = false;
> >>>     private int retryInterval = 10000;
> >>>     protected SocketChannel socketChannel = null;
> >>>     private Socket socket = null;
> >>>     protected InputStream in = null;
> >>>     protected OutputStream out = null;
> >>>     protected boolean running = false;
> >>>     private TcpMarshaler marshaler = new TcpMarshaler();
> >>>
> >>>     protected abstract void processConnection() throws JBIException,
> >>> IOException;
> >>>     protected abstract void processIncomingMessage(String tcpMessage);
> >>>
> >>>     public void setHost(String host) {
> >>>         this.host = host;
> >>>     } // en dof setHost(String)(
> >>>
> >>>     public void setPort(int port) {
> >>>         this.port = port;
> >>>     } // en dof setPort(int)
> >>>
> >>>     public void setUseNIO(boolean useNIO) {
> >>>         this.useNIO = useNIO;
> >>>     } // end of setUserNIO(boolean)
> >>>
> >>>     public void setRetryInterval(int retryInterval) {
> >>>         this.retryInterval = retryInterval;
> >>>     } // end of setRetryInterval(int)
> >>>
> >>>     protected boolean getUseNIO() {
> >>>         return this.useNIO;
> >>>     } // end of getUseNIO()
> >>>
> >>>     protected String getHost() {
> >>>         return this.host;
> >>>     } // en dof getHost()
> >>>
> >>>     protected int getPort() {
> >>>         return this.port;
> >>>     } // en dof getPort()
> >>>
> >>>     public void onMessageExchange(MessageExchange exchange)
> >>>             throws MessagingException {
> >>>         String tcpMessage = null;
> >>>         try {
> >>>             tcpMessage =
> >>> this.marshaler.fromNMS(exchange.getMessage("in"));
> >>>             this.processIncomingMessage(tcpMessage);
> >>>             this.done(exchange);
> >>>         } catch (TransformerException e) {
> >>>             throw new MessagingException(e);
> >>>         } // end of try /catch
> >>>     } // end of onMessageExchange(MessageExchange)
> >>>
> >>>     public void afterPropertiesSet() throws Exception {
> >>>         if (host == null) {
> >>>             throw new IllegalArgumentException("You must specify the
> >>> host
> >>> property.");
> >>>         } // end of if
> >>>
> >>>         if (this.port < 0) {
> >>>             throw new IllegalArgumentException("You must specify the
> >>> port
> >>> property.");
> >>>         } // end of if
> >>>     } // end of afterPropertiesSet()
> >>>
> >>>     public void start() throws JBIException {
> >>>         if (this.useNIO) {
> >>>             // configure the NIO SocketChannel
> >>>             this.startNIO();
> >>>         } else {
> >>>             // this is the default
> >>>             // we will simply open a basic socket and create the Input
> >>> and
> >>> Ooutput Streams
> >>>             this.startNonNIO();
> >>>         } // end of if / else
> >>>     } // end of start()
> >>>
> >>>     public void stop() throws JBIException {
> >>>         if (useNIO) {
> >>>             this.stopNIO();
> >>>         } else {
> >>>             this.stopNonNIO();
> >>>         } // ne do if / else
> >>>     } // end of stop()
> >>>
> >>>     protected void sendToBus(String message) {
> >>>         log.debug("Inside sendToBus(), sending message: " + message);
> >>>         try {
> >>>             InOnly exchange =
> >>> this.getExchangeFactory().createInOnlyExchange();
> >>>             NormalizedMessage normalizedMessage =
> >>> exchange.createMessage();
> >>>             exchange.setInMessage(normalizedMessage);
> >>>             this.marshaler.toNMS(normalizedMessage, message);
> >>>             this.getDeliveryChannel().sendSync(exchange);
> >>>             //done(exchange);
> >>>         } catch (MessagingException me) {
> >>>             throw new TcpConnectionException(me, message);
> >>>         } // end of try / catch
> >>>     } // end of sendToBus(String)
> >>>
> >>>     /**
> >>>      * This is an empty implementation of the init().
> >>>      * This method exists so that the concrete class can override it
> >>>      * if there is something that needs to be done to initialize the
> >>> communication
> >>>      * with the server.
> >>>      *
> >>>      * This method is called after the connection has been made and
> >>> after
> >>> the
> >>>      * Input and Output Streams have been assigned.
> >>>      *
> >>>      * This method is called right before the processConnection().
> >>>      */
> >>>     protected void init() {
> >>>         // nothing is done here
> >>>         // if something needs to be done here in the concrete class
> >>>         // this method is called after the connection is made and the
> >>> Input
> >>> and Output Streams
> >>>         // have been set but before the processConnection()
> >>>     } // end of init()
> >>>
> >>>     /**
> >>>      * This is an empty implementation of the cleanup().
> >>>      *
> >>>      * This method exists so the the concrete class can override it
> >>>      * if there is something that needs to be done to cleanup resources
> >>> before the
> >>>      * connection is shutdown.
> >>>      *
> >>>      * This method is called while the connection is still active and
> >>> the
> >>> Input
> >>>      * and Output Streams are still available.
> >>>      */
> >>>     protected void cleanup() {
> >>>         // nothing is done here
> >>>         // if something needs to be done here in the concrete class
> >>>     } // end of init()
> >>>
> >>>     private void startNIO() throws JBIException {
> >>>         if (log.isInfoEnabled()) {
> >>>             log.info("Using NIO to connect to " + this.host + ":" +
> >>> this.port);
> >>>         } // en dof if
> >>>
> >>>         if (!this.running) {
> >>>             this.connectNIO();
> >>>             try {
> >>>                 this.init();
> >>>                 this.processConnection();
> >>>             } catch (SocketException e) {
> >>>                 this.running = false;
> >>>                 this.startNIO();
> >>>             } catch (IOException ioe) {
> >>>                 throw new JBIException(ioe);
> >>>             } // end of try / catch
> >>>         } // end of if
> >>>     } // end of startNIO()
> >>>
> >>>     private void connectNIO() throws JBIException {
> >>>         try {
> >>>             this.socketChannel = SocketChannel.open();
> >>>             this.socketChannel.configureBlocking(false);
> >>>
> >>>             this.socketChannel.connect(new InetSocketAddress(this.host,
> >>> this.port));
> >>>             int count = 0;
> >>>             while (!this.socketChannel.finishConnect()) {
> >>>                 log.debug("Finishing connection: " + (count++) + "
> >>> times");
> >>>             } // end of while loop
> >>>
> >>>             this.running = true;
> >>>         } catch (ConnectException ce) {
> >>>             log.warn(this.host + ":" + this.port + " unavailable,
> >>> waiting
> >>> "
> >>> + this.retryInterval + " milliseconds before trying again.");
> >>>             this.running = false;
> >>>             try {
> >>>                 Thread.sleep(this.retryInterval);
> >>>             } catch (InterruptedException e) {}
> >>>             this.connectNIO();
> >>>         } catch (Exception e) {
> >>>             log.error("Exception while trying to make a tcp connection
> >>> to
> >>> "
> >>> + this.host + ":" + this.port);
> >>>             log.error(e);
> >>>             this.running = false;
> >>>             throw new JBIException("Failed to connect to " + this.host +
> >>> ":"
> >>> + this.port, e);
> >>>         } // end of try / catch
> >>>     } // end of connectNIO()
> >>>
> >>>     private void startNonNIO() throws JBIException {
> >>>         if (log.isInfoEnabled()) {
> >>>             log.info("Using basic sockets to connect to " + this.host +
> >>> ":"
> >>> + this.port);
> >>>         } // en dof if
> >>>
> >>>         if (!this.running) {
> >>>             this.connectNonNIO();
> >>>             try {
> >>>                 this.init();
> >>>                 this.processConnection();
> >>>             } catch (SocketException e) {
> >>>                 this.running = false;
> >>>                 this.startNonNIO();
> >>>             } catch (IOException ioe) {
> >>>                 throw new JBIException(ioe);
> >>>             } // end of try / catch
> >>>         } // end of if
> >>>     } // end of startNonNIO()
> >>>
> >>>     private void connectNonNIO() throws JBIException {
> >>>         try {
> >>>             this.socket = new Socket(this.host, this.port);
> >>>
> >>>             this.in = this.socket.getInputStream();
> >>>             this.out = this.socket.getOutputStream();
> >>>
> >>>             this.running = true;
> >>>         } catch (ConnectException ce) {
> >>>             log.warn(this.host + ":" + this.port + " unavailable,
> >>> waiting
> >>> "
> >>> + this.retryInterval + " milliseconds before trying again.");
> >>>             this.running = false;
> >>>             try {
> >>>                 Thread.sleep(this.retryInterval);
> >>>             } catch (InterruptedException e) {}
> >>>             this.connectNonNIO();
> >>>         } catch (Exception e) {
> >>>             log.error("Exception while trying to make a tcp connection
> >>> to
> >>> "
> >>> + this.host + ":" + this.port);
> >>>             log.error(e);
> >>>             this.running = false;
> >>>             throw new JBIException("Failed to connect to " + this.host +
> >>> ":"
> >>> + this.port, e);
> >>>         } // end of try / catch
> >>>     } // end of connectNonNIO()
> >>>
> >>>     private void stopNIO() throws JBIException {
> >>>         log.debug("Inside stopNIO()");
> >>>         if (this.socketChannel != null) {
> >>>             this.cleanup();
> >>>             try {
> >>>                 this.socketChannel.close();
> >>>                 this.socketChannel = null;
> >>>                 this.running = false;
> >>>             } catch (IOException e) {
> >>>                 throw new JBIException("Unable to close connection", e);
> >>>             } // end of try / catch
> >>>         } // end of if
> >>>     } // end of stopNIO()
> >>>
> >>>     private void stopNonNIO() throws JBIException {
> >>>         log.debug("Inside stopNonNIO()");
> >>>         if (this.socket != null) {
> >>>             if (this.socket.isConnected()) {
> >>>                 this.cleanup();
> >>>                 try {
> >>>                     this.socket.close();
> >>>                     this.in = null;
> >>>                     this.out = null;
> >>>                     this.socket = null;
> >>>                     this.running = false;
> >>>                 } catch (IOException e) {
> >>>                     throw new JBIException("Unable to close connection",
> >>> e);
> >>>                 } // end of try / catch
> >>>             } // end of if
> >>>         } // end of if
> >>>     } // end of stopNonNIO()
> >>> } // end of class TcpComponentSupport
> >>>
> >>>
> >>>
> >>> On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
> >>>
> >>>
> >>>> Sorry, let me try it again with just the .java file.
> >>>>
> >>>>
> >>>> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
> >>>>
> >>>>
> >>>>> Hopefully this time the attachment will go through.  I am not exactly
> >>>>> sure
> >>>>> why it didn't before.  It is a zip file.
> >>>>>
> >>>>> I guess I was thinking that when the component is deployed into the
> >>>>> lwcontainer that it would be running in its own thread anyway.
> >>>>> Apparently I
> >>>>> was incorrect in my assumption.
> >>>>>
> >>>>> I am still trying to get a grasp on the whole ServiceMix thing so
> >>>>> please
> >>>>> excuse my ignorance.
> >>>>>
> >>>>> Thanks,
> >>>>> Doug
> >>>>>
> >>>>>
> >>>>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
> >>>>>
> >>>>>
> >>>>>> I think the attachment have been lost.
> >>>>>>
> >>>>>> Btw, for a tcp consumer that creates a server socket, it has to wait
> >>>>>> for incoming connections, so how can you not start a thread ? The
> >>>>>> jabber api uses a listener, so it is really different.
> >>>>>>
> >>>>>> Cheers,
> >>>>>> Guillaume Nodet
> >>>>>>
> >>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
> >>>>>>
> >>>>>>> I have attached my TcpComponentSupport class to this email.  If you
> >>>>>>> could
> >>>>>>> be
> >>>>>>> so kind to take a look at it and let me know if it looks ok.  You
> >>>>>>> mentioned
> >>>>>>> that the start method should start a Thread and then kill that
> >>>>>>> Thread
> >>>>>>> in
> >>>>>>> the
> >>>>>>> stop method.  I am not actually doing this right now however I of
> >>>>>>> course
> >>>>>>> could implement it that way if necessary.  I attempted to use the
> >>>>>>> JabberComponentSupport class as a template.
> >>>>>>>
> >>>>>>> Please let me know your thoughts.
> >>>>>>>
> >>>>>>> Thanks,
> >>>>>>> Doug
> >>>>>>>
> >>>>>>>
> >>>>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
> >>>>>>>
> >>>>>>>
> >>>>>>>> Not sure to understand the problem.
> >>>>>>>> Usually a lightweight consumer BC will begin to receive messages
> >>>>>>>> when
> >>>>>>>> the component is started.  It means that all the necessary
> >>>>>>>> configuration parameters have been set.
> >>>>>>>> The tcp component needs to start a thread inside the start method
> >>>>>>>> (and
> >>>>>>>> kill it inside the stop method).
> >>>>>>>>
> >>>>>>>> The second problem is related to components / service units
> >>>>>>>> lifecycle.
> >>>>>>>> When a lightweight component is deployed to the lwcontainer, he has
> >>>>>>>> a
> >>>>>>>> specific lifecycle: when the service unit is started, the component
> >>>>>>>> is
> >>>>>>>> started and when the service unit is stopped, the component is
> >>>>>>>> shutdown.
> >>>>>>>>
> >>>>>>>> But the problem should only occur when someone sends something on
> >>>>>>>> the
> >>>>>>>> socket
> >>>>>>>> ...
> >>>>>>>>
> >>>>>>>> Cheers,
> >>>>>>>> Guillaume Nodet
> >>>>>>>>
> >>>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
> >>>>>>>>
> >>>>>>>>> Guillaume,
> >>>>>>>>>
> >>>>>>>>> Yes you are right, I did ask before (somehow I got sidetracked an
> >>>>>>>>> forgot
> >>>>>>>>> all
> >>>>>>>>> about it).  Anyway, thank you for the reply though.
> >>>>>>>>>
> >>>>>>>>> I have been able to implement a TcpComponentSupport class, at
> >>>>>>>>> least
> >>>>>>>>> I
> >>>>>>>>> think
> >>>>>>>>> so anyway.  The problem that I am having now is that as soon as I
> >>>>>>>>> deploy
> >>>>>>>>> the
> >>>>>>>>> component it starts collecting data and pushing it to the bus.  I
> >>>>>>>>> have
> >>>>>>>>> run
> >>>>>>>>> into a couple of problems because of this.  The first problem is
> >>>>>>>>> while
> >>>>>>>>> testing, I would like to extend from TestSupport and use the
> >>>>>>>>> assertMessagesReceived(), however when it loads the
> >>>>>>>>> ApplicationContext,
> >>>>>>>>> the
> >>>>>>>>> concrete TcpComponentSupport class simply begins communicating
> >>>>>>>>> over
> >>>>>>>>> the
> >>>>>>>>> socket and the test method with the assertMessagesReceived() call
> >>>>>>>>> is
> >>>>>>>>> never
> >>>>>>>>> executed.  The test will just run forever.
> >>>>>>>>>
> >>>>>>>>> The second question that I have is along the same lines.  I would
> >>>>>>>>> like to
> >>>>>>>>> create a service unit, with simply this component in it and deploy
> >>>>>>>>> it to
> >>>>>>>>> ServiceMix and then use something like eip to route the messages
> >>>>>>>>> where I
> >>>>>>>>> want them.  The same kind of problem occurs in this scenario, as
> >>>>>>>>> soon as
> >>>>>>>>> the
> >>>>>>>>> service unit is deployed, it will start collecting data from the
> >>>>>>>>> stream
> >>>>>>>>> and
> >>>>>>>>> trying to push it to the bus, the problem is that there is no
> >>>>>>>>> destination
> >>>>>>>>> assigned to the the component yet so it tries to send it to
> >>>>>>>>> "service:
> >>>>>>>>> null
> >>>>>>>>> and interface null".
> >>>>>>>>>
> >>>>>>>>> Do you have any ideas?
> >>>>>>>>>
> >>>>>>>>> Thank you,
> >>>>>>>>> Doug
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>> It seems there's none -- if I recall, you have already asked this
> >>>>>>>>>> question
> >>>>>>>>>> ;)
> >>>>>>>>>> However, it should not be very difficult to implement.
> >>>>>>>>>> You have to inherit the
> >>>>>>>>>> org.apache.servicemix.components.util.ComponentSupport and
> >>>>>>>>>> implements
> >>>>>>>>>> MessageExchangeListener.  Then, just create your socket and read
> >>>>>>>>>> it
> >>>>>>>>>> :)
> >>>>>>>>>> You may need another lw component for sending over tcp, also.
> >>>>>>>>>>
> >>>>>>>>>> Cheers,
> >>>>>>>>>> Guillaume Nodet
> >>>>>>>>>>
> >>>>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
> >>>>>>>>>>
> >>>>>>>>>>> Does anyone know if a basic TCP component is available?  What I
> >>>>>>>>>>> would
> >>>>>>>>>>> like
> >>>>>>>>>>> to be able to do is simply extend from a TCP component, set the
> >>>>>>>>>>> host
> >>>>>>>>>>> and
> >>>>>>>>>>> port in the servicemix.xml file, and have access to the input
> >>>>>>>>>>> and
> >>>>>>>>>>> output
> >>>>>>>>>>> streams in my implementation class.  I would then want to be
> >>>>>>>>>>> able
> >>>>>>>>>>> to
> >>>>>>>>>>> read
> >>>>>>>>>>> from the input stream and create a NormalizedMessage to put on
> >>>>>>>>>>> the
> >>>>>>>>>>> bus
> >>>>>>>>>>> and
> >>>>>>>>>>> also be able to get a message from the bus and write it to the
> >>>>>>>>>>> output
> >>>>>>>>>>> stream.
> >>>>>>>>>>>
> >>>>>>>>>>> Is there anything like this out there?
> >>>>>>>>>>>
> >>>>>>>>>>> Thanks,
> >>>>>>>>>>> Doug
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>>>
> >>>>>>>>>
> >>>>>>>>>
> >>>>>>>
> >>>>>>>
> >>>
> >>>
> >>>
> >>
> >>
> >
> >
> >
>
> --
> View this message in context: 
> http://www.nabble.com/Basic-TCP-component-tf1595103s12049.html#a13105397
> Sent from the ServiceMix - User mailing list archive at Nabble.com.
>
>


-- 
Cheers,
Guillaume Nodet
------------------------
Blog: http://gnodet.blogspot.com/

Reply via email to