Has a standard jbi BC component been created for this yet? I think not, but
was curious.
-jeff
dfischer wrote:
>
> Forget it, here is the code. So sorry for so many emails.
>
>
> package org.apache.servicemix.components.tcp;
>
> import org.apache.log4j.Logger;
>
> import java.io.IOException;
> import java.io.InputStream;
> import java.io.OutputStream;
> import java.net.ConnectException;
> import java.net.InetSocketAddress;
> import java.net.Socket;
> import java.net.SocketException;
> import java.nio.channels.SocketChannel;
>
> import javax.jbi.JBIException;
> import javax.jbi.messaging.InOnly;
> import javax.jbi.messaging.MessageExchange;
> import javax.jbi.messaging.MessagingException;
> import javax.jbi.messaging.NormalizedMessage;
> import javax.xml.transform.TransformerException;
>
> import org.apache.servicemix.MessageExchangeListener;
> import org.apache.servicemix.components.util.ComponentSupport;
> import org.springframework.beans.factory.InitializingBean;
>
> public abstract class TcpComponentSupport extends ComponentSupport
> implements
> MessageExchangeListener, InitializingBean {
> /**
> * Logger for this class
> */
> private static final Logger log = Logger
> .getLogger(TcpComponentSupport.class);
>
> private String host;
> private int port = -1;
> private boolean useNIO = false;
> private int retryInterval = 10000;
> protected SocketChannel socketChannel = null;
> private Socket socket = null;
> protected InputStream in = null;
> protected OutputStream out = null;
> protected boolean running = false;
> private TcpMarshaler marshaler = new TcpMarshaler();
>
> protected abstract void processConnection() throws JBIException,
> IOException;
> protected abstract void processIncomingMessage(String tcpMessage);
>
> public void setHost(String host) {
> this.host = host;
> } // en dof setHost(String)(
>
> public void setPort(int port) {
> this.port = port;
> } // en dof setPort(int)
>
> public void setUseNIO(boolean useNIO) {
> this.useNIO = useNIO;
> } // end of setUserNIO(boolean)
>
> public void setRetryInterval(int retryInterval) {
> this.retryInterval = retryInterval;
> } // end of setRetryInterval(int)
>
> protected boolean getUseNIO() {
> return this.useNIO;
> } // end of getUseNIO()
>
> protected String getHost() {
> return this.host;
> } // en dof getHost()
>
> protected int getPort() {
> return this.port;
> } // en dof getPort()
>
> public void onMessageExchange(MessageExchange exchange)
> throws MessagingException {
> String tcpMessage = null;
> try {
> tcpMessage =
> this.marshaler.fromNMS(exchange.getMessage("in"));
> this.processIncomingMessage(tcpMessage);
> this.done(exchange);
> } catch (TransformerException e) {
> throw new MessagingException(e);
> } // end of try /catch
> } // end of onMessageExchange(MessageExchange)
>
> public void afterPropertiesSet() throws Exception {
> if (host == null) {
> throw new IllegalArgumentException("You must specify the host
> property.");
> } // end of if
>
> if (this.port < 0) {
> throw new IllegalArgumentException("You must specify the port
> property.");
> } // end of if
> } // end of afterPropertiesSet()
>
> public void start() throws JBIException {
> if (this.useNIO) {
> // configure the NIO SocketChannel
> this.startNIO();
> } else {
> // this is the default
> // we will simply open a basic socket and create the Input and
> Ooutput Streams
> this.startNonNIO();
> } // end of if / else
> } // end of start()
>
> public void stop() throws JBIException {
> if (useNIO) {
> this.stopNIO();
> } else {
> this.stopNonNIO();
> } // ne do if / else
> } // end of stop()
>
> protected void sendToBus(String message) {
> log.debug("Inside sendToBus(), sending message: " + message);
> try {
> InOnly exchange =
> this.getExchangeFactory().createInOnlyExchange();
> NormalizedMessage normalizedMessage =
> exchange.createMessage();
> exchange.setInMessage(normalizedMessage);
> this.marshaler.toNMS(normalizedMessage, message);
> this.getDeliveryChannel().sendSync(exchange);
> //done(exchange);
> } catch (MessagingException me) {
> throw new TcpConnectionException(me, message);
> } // end of try / catch
> } // end of sendToBus(String)
>
> /**
> * This is an empty implementation of the init().
> * This method exists so that the concrete class can override it
> * if there is something that needs to be done to initialize the
> communication
> * with the server.
> *
> * This method is called after the connection has been made and after
> the
> * Input and Output Streams have been assigned.
> *
> * This method is called right before the processConnection().
> */
> protected void init() {
> // nothing is done here
> // if something needs to be done here in the concrete class
> // this method is called after the connection is made and the
> Input
> and Output Streams
> // have been set but before the processConnection()
> } // end of init()
>
> /**
> * This is an empty implementation of the cleanup().
> *
> * This method exists so the the concrete class can override it
> * if there is something that needs to be done to cleanup resources
> before the
> * connection is shutdown.
> *
> * This method is called while the connection is still active and the
> Input
> * and Output Streams are still available.
> */
> protected void cleanup() {
> // nothing is done here
> // if something needs to be done here in the concrete class
> } // end of init()
>
> private void startNIO() throws JBIException {
> if (log.isInfoEnabled()) {
> log.info("Using NIO to connect to " + this.host + ":" +
> this.port);
> } // en dof if
>
> if (!this.running) {
> this.connectNIO();
> try {
> this.init();
> this.processConnection();
> } catch (SocketException e) {
> this.running = false;
> this.startNIO();
> } catch (IOException ioe) {
> throw new JBIException(ioe);
> } // end of try / catch
> } // end of if
> } // end of startNIO()
>
> private void connectNIO() throws JBIException {
> try {
> this.socketChannel = SocketChannel.open();
> this.socketChannel.configureBlocking(false);
>
> this.socketChannel.connect(new InetSocketAddress(this.host,
> this.port));
> int count = 0;
> while (!this.socketChannel.finishConnect()) {
> log.debug("Finishing connection: " + (count++) + "
> times");
> } // end of while loop
>
> this.running = true;
> } catch (ConnectException ce) {
> log.warn(this.host + ":" + this.port + " unavailable, waiting
> "
> + this.retryInterval + " milliseconds before trying again.");
> this.running = false;
> try {
> Thread.sleep(this.retryInterval);
> } catch (InterruptedException e) {}
> this.connectNIO();
> } catch (Exception e) {
> log.error("Exception while trying to make a tcp connection to
> "
> + this.host + ":" + this.port);
> log.error(e);
> this.running = false;
> throw new JBIException("Failed to connect to " + this.host +
> ":"
> + this.port, e);
> } // end of try / catch
> } // end of connectNIO()
>
> private void startNonNIO() throws JBIException {
> if (log.isInfoEnabled()) {
> log.info("Using basic sockets to connect to " + this.host +
> ":"
> + this.port);
> } // en dof if
>
> if (!this.running) {
> this.connectNonNIO();
> try {
> this.init();
> this.processConnection();
> } catch (SocketException e) {
> this.running = false;
> this.startNonNIO();
> } catch (IOException ioe) {
> throw new JBIException(ioe);
> } // end of try / catch
> } // end of if
> } // end of startNonNIO()
>
> private void connectNonNIO() throws JBIException {
> try {
> this.socket = new Socket(this.host, this.port);
>
> this.in = this.socket.getInputStream();
> this.out = this.socket.getOutputStream();
>
> this.running = true;
> } catch (ConnectException ce) {
> log.warn(this.host + ":" + this.port + " unavailable, waiting
> "
> + this.retryInterval + " milliseconds before trying again.");
> this.running = false;
> try {
> Thread.sleep(this.retryInterval);
> } catch (InterruptedException e) {}
> this.connectNonNIO();
> } catch (Exception e) {
> log.error("Exception while trying to make a tcp connection to
> "
> + this.host + ":" + this.port);
> log.error(e);
> this.running = false;
> throw new JBIException("Failed to connect to " + this.host +
> ":"
> + this.port, e);
> } // end of try / catch
> } // end of connectNonNIO()
>
> private void stopNIO() throws JBIException {
> log.debug("Inside stopNIO()");
> if (this.socketChannel != null) {
> this.cleanup();
> try {
> this.socketChannel.close();
> this.socketChannel = null;
> this.running = false;
> } catch (IOException e) {
> throw new JBIException("Unable to close connection", e);
> } // end of try / catch
> } // end of if
> } // end of stopNIO()
>
> private void stopNonNIO() throws JBIException {
> log.debug("Inside stopNonNIO()");
> if (this.socket != null) {
> if (this.socket.isConnected()) {
> this.cleanup();
> try {
> this.socket.close();
> this.in = null;
> this.out = null;
> this.socket = null;
> this.running = false;
> } catch (IOException e) {
> throw new JBIException("Unable to close connection",
> e);
> } // end of try / catch
> } // end of if
> } // end of if
> } // end of stopNonNIO()
> } // end of class TcpComponentSupport
>
>
>
> On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>
>> Sorry, let me try it again with just the .java file.
>>
>>
>> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>>
>>> Hopefully this time the attachment will go through. I am not exactly
>>> sure
>>> why it didn't before. It is a zip file.
>>>
>>> I guess I was thinking that when the component is deployed into the
>>> lwcontainer that it would be running in its own thread anyway.
>>> Apparently I
>>> was incorrect in my assumption.
>>>
>>> I am still trying to get a grasp on the whole ServiceMix thing so please
>>> excuse my ignorance.
>>>
>>> Thanks,
>>> Doug
>>>
>>>
>>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>
>>>> I think the attachment has been lost.
>>>>
>>>> Btw, for a tcp consumer that creates a server socket, it has to wait
>>>> for incoming connections, so how can you not start a thread ? The
>>>> jabber api uses a listener, so it is really different.
>>>>
>>>> Cheers,
>>>> Guillaume Nodet
>>>>
>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>> I have attached my TcpComponentSupport class to this email. If you
>>>>> could
>>>>> be
>>>>> so kind to take a look at it and let me know if it looks ok. You
>>>>> mentioned
>>>>> that the start method should start a Thread and then kill that Thread
>>>>> in
>>>>> the
>>>>> stop method. I am not actually doing this right now however I of
>>>>> course
>>>>> could implement it that way if necessary. I attempted to use the
>>>>> JabberComponentSupport class as a template.
>>>>>
>>>>> Please let me know your thoughts.
>>>>>
>>>>> Thanks,
>>>>> Doug
>>>>>
>>>>>
>>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>
>>>>>> I am not sure I understand the problem.
>>>>>> Usually a lightweight consumer BC will begin to receive messages when
>>>>>> the component is started. It means that all the necessary
>>>>>> configuration parameters have been set.
>>>>>> The tcp component needs to start a thread inside the start method
>>>>>> (and
>>>>>> kill it inside the stop method).
>>>>>>
>>>>>> The second problem is related to components / service units
>>>>>> lifecycle.
>>>>>> When a lightweight component is deployed to the lwcontainer, it has a
>>>>>> specific lifecycle: when the service unit is started, the component
>>>>>> is
>>>>>> started and when the service unit is stopped, the component is
>>>>>> shutdown.
>>>>>>
>>>>>> But the problem should only occur when someone sends something on the
>>>>>> socket
>>>>>> ...
>>>>>>
>>>>>> Cheers,
>>>>>> Guillaume Nodet
>>>>>>
>>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>> Guillaume,
>>>>>>>
>>>>>>> Yes you are right, I did ask before (somehow I got sidetracked and
>>>>>>> forgot
>>>>>>> all
>>>>>>> about it). Anyway, thank you for the reply though.
>>>>>>>
>>>>>>> I have been able to implement a TcpComponentSupport class, at least
>>>>>>> I
>>>>>>> think
>>>>>>> so anyway. The problem that I am having now is that as soon as I
>>>>>>> deploy
>>>>>>> the
>>>>>>> component it starts collecting data and pushing it to the bus. I
>>>>>>> have
>>>>>>> run
>>>>>>> into a couple of problems because of this. The first problem is
>>>>>>> while
>>>>>>> testing, I would like to extend from TestSupport and use the
>>>>>>> assertMessagesReceived(), however when it loads the
>>>>>>> ApplicationContext,
>>>>>>> the
>>>>>>> concrete TcpComponentSupport class simply begins communicating over
>>>>>>> the
>>>>>>> socket and the test method with the assertMessagesReceived() call is
>>>>>>> never
>>>>>>> executed. The test will just run forever.
>>>>>>>
>>>>>>> The second question that I have is along the same lines. I would
>>>>>>> like to
>>>>>>> create a service unit, with simply this component in it and deploy
>>>>>>> it to
>>>>>>> ServiceMix and then use something like eip to route the messages
>>>>>>> where I
>>>>>>> want them. The same kind of problem occurs in this scenario, as
>>>>>>> soon as
>>>>>>> the
>>>>>>> service unit is deployed, it will start collecting data from the
>>>>>>> stream
>>>>>>> and
>>>>>>> trying to push it to the bus, the problem is that there is no
>>>>>>> destination
>>>>>>> assigned to the component yet so it tries to send it to
>>>>>>> "service:
>>>>>>> null
>>>>>>> and interface null".
>>>>>>>
>>>>>>> Do you have any ideas?
>>>>>>>
>>>>>>> Thank you,
>>>>>>> Doug
>>>>>>>
>>>>>>>
>>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>>
>>>>>>>> It seems there's none -- if I recall, you have already asked this
>>>>>>>> question
>>>>>>>> ;)
>>>>>>>> However, it should not be very difficult to implement.
>>>>>>>> You have to inherit from
>>>>>>>> org.apache.servicemix.components.util.ComponentSupport and
>>>>>>>> implement
>>>>>>>> MessageExchangeListener. Then, just create your socket and read it
>>>>>>>> :)
>>>>>>>> You may need another lw component for sending over tcp, also.
>>>>>>>>
>>>>>>>> Cheers,
>>>>>>>> Guillaume Nodet
>>>>>>>>
>>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>>> Does anyone know if a basic TCP component is available? What I
>>>>>>>>> would
>>>>>>>>> like
>>>>>>>>> to be able to do is simply extend from a TCP component, set the
>>>>>>>>> host
>>>>>>>>> and
>>>>>>>>> port in the servicemix.xml file, and have access to the input and
>>>>>>>>> output
>>>>>>>>> streams in my implementation class. I would then want to be able
>>>>>>>>> to
>>>>>>>>> read
>>>>>>>>> from the input stream and create a NormalizedMessage to put on the
>>>>>>>>> bus
>>>>>>>>> and
>>>>>>>>> also be able to get a message from the bus and write it to the
>>>>>>>>> output
>>>>>>>>> stream.
>>>>>>>>>
>>>>>>>>> Is there anything like this out there?
>>>>>>>>>
>>>>>>>>> Thanks,
>>>>>>>>> Doug
>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
>
>
>
--
View this message in context:
http://www.nabble.com/Basic-TCP-component-tf1595103s12049.html#a13099933
Sent from the ServiceMix - User mailing list archive at Nabble.com.