Forget it, here is the code. So sorry for so many emails.
package org.apache.servicemix.components.tcp;
import org.apache.log4j.Logger;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;
import javax.jbi.JBIException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.transform.TransformerException;
import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.springframework.beans.factory.InitializingBean;
public abstract class TcpComponentSupport extends ComponentSupport
implements
MessageExchangeListener, InitializingBean {
/**
* Logger for this class
*/
private static final Logger log = Logger
.getLogger(TcpComponentSupport.class);
private String host;
private int port = -1;
private boolean useNIO = false;
private int retryInterval = 10000;
protected SocketChannel socketChannel = null;
private Socket socket = null;
protected InputStream in = null;
protected OutputStream out = null;
protected boolean running = false;
private TcpMarshaler marshaler = new TcpMarshaler();
protected abstract void processConnection() throws JBIException,
IOException;
protected abstract void processIncomingMessage(String tcpMessage);
public void setHost(String host) {
this.host = host;
} // en dof setHost(String)(
public void setPort(int port) {
this.port = port;
} // en dof setPort(int)
public void setUseNIO(boolean useNIO) {
this.useNIO = useNIO;
} // end of setUserNIO(boolean)
public void setRetryInterval(int retryInterval) {
this.retryInterval = retryInterval;
} // end of setRetryInterval(int)
protected boolean getUseNIO() {
return this.useNIO;
} // end of getUseNIO()
protected String getHost() {
return this.host;
} // en dof getHost()
protected int getPort() {
return this.port;
} // en dof getPort()
public void onMessageExchange(MessageExchange exchange)
throws MessagingException {
String tcpMessage = null;
try {
tcpMessage = this.marshaler.fromNMS(exchange.getMessage("in"));
this.processIncomingMessage(tcpMessage);
this.done(exchange);
} catch (TransformerException e) {
throw new MessagingException(e);
} // end of try /catch
} // end of onMessageExchange(MessageExchange)
public void afterPropertiesSet() throws Exception {
if (host == null) {
throw new IllegalArgumentException("You must specify the host
property.");
} // end of if
if (this.port < 0) {
throw new IllegalArgumentException("You must specify the port
property.");
} // end of if
} // end of afterPropertiesSet()
public void start() throws JBIException {
if (this.useNIO) {
// configure the NIO SocketChannel
this.startNIO();
} else {
// this is the default
// we will simply open a basic socket and create the Input and
Ooutput Streams
this.startNonNIO();
} // end of if / else
} // end of start()
public void stop() throws JBIException {
if (useNIO) {
this.stopNIO();
} else {
this.stopNonNIO();
} // ne do if / else
} // end of stop()
protected void sendToBus(String message) {
log.debug("Inside sendToBus(), sending message: " + message);
try {
InOnly exchange =
this.getExchangeFactory().createInOnlyExchange();
NormalizedMessage normalizedMessage = exchange.createMessage();
exchange.setInMessage(normalizedMessage);
this.marshaler.toNMS(normalizedMessage, message);
this.getDeliveryChannel().sendSync(exchange);
//done(exchange);
} catch (MessagingException me) {
throw new TcpConnectionException(me, message);
} // end of try / catch
} // end of sendToBus(String)
/**
* This is an empty implementation of the init().
* This method exists so that the concrete class can override it
* if there is something that needs to be done to initialize the
communication
* with the server.
*
* This method is called after the connection has been made and after
the
* Input and Output Streams have been assigned.
*
* This method is called right before the processConnection().
*/
protected void init() {
// nothing is done here
// if something needs to be done here in the concrete class
// this method is called after the connection is made and the Input
and Output Streams
// have been set but before the processConnection()
} // end of init()
/**
* This is an empty implementation of the cleanup().
*
* This method exists so the the concrete class can override it
* if there is something that needs to be done to cleanup resources
before the
* connection is shutdown.
*
* This method is called while the connection is still active and the
Input
* and Output Streams are still available.
*/
protected void cleanup() {
// nothing is done here
// if something needs to be done here in the concrete class
} // end of init()
private void startNIO() throws JBIException {
if (log.isInfoEnabled()) {
log.info("Using NIO to connect to " + this.host + ":" +
this.port);
} // en dof if
if (!this.running) {
this.connectNIO();
try {
this.init();
this.processConnection();
} catch (SocketException e) {
this.running = false;
this.startNIO();
} catch (IOException ioe) {
throw new JBIException(ioe);
} // end of try / catch
} // end of if
} // end of startNIO()
private void connectNIO() throws JBIException {
try {
this.socketChannel = SocketChannel.open();
this.socketChannel.configureBlocking(false);
this.socketChannel.connect(new InetSocketAddress(this.host,
this.port));
int count = 0;
while (!this.socketChannel.finishConnect()) {
log.debug("Finishing connection: " + (count++) + " times");
} // end of while loop
this.running = true;
} catch (ConnectException ce) {
log.warn(this.host + ":" + this.port + " unavailable, waiting "
+ this.retryInterval + " milliseconds before trying again.");
this.running = false;
try {
Thread.sleep(this.retryInterval);
} catch (InterruptedException e) {}
this.connectNIO();
} catch (Exception e) {
log.error("Exception while trying to make a tcp connection to "
+ this.host + ":" + this.port);
log.error(e);
this.running = false;
throw new JBIException("Failed to connect to " + this.host + ":"
+ this.port, e);
} // end of try / catch
} // end of connectNIO()
private void startNonNIO() throws JBIException {
if (log.isInfoEnabled()) {
log.info("Using basic sockets to connect to " + this.host + ":"
+ this.port);
} // en dof if
if (!this.running) {
this.connectNonNIO();
try {
this.init();
this.processConnection();
} catch (SocketException e) {
this.running = false;
this.startNonNIO();
} catch (IOException ioe) {
throw new JBIException(ioe);
} // end of try / catch
} // end of if
} // end of startNonNIO()
private void connectNonNIO() throws JBIException {
try {
this.socket = new Socket(this.host, this.port);
this.in = this.socket.getInputStream();
this.out = this.socket.getOutputStream();
this.running = true;
} catch (ConnectException ce) {
log.warn(this.host + ":" + this.port + " unavailable, waiting "
+ this.retryInterval + " milliseconds before trying again.");
this.running = false;
try {
Thread.sleep(this.retryInterval);
} catch (InterruptedException e) {}
this.connectNonNIO();
} catch (Exception e) {
log.error("Exception while trying to make a tcp connection to "
+ this.host + ":" + this.port);
log.error(e);
this.running = false;
throw new JBIException("Failed to connect to " + this.host + ":"
+ this.port, e);
} // end of try / catch
} // end of connectNonNIO()
private void stopNIO() throws JBIException {
log.debug("Inside stopNIO()");
if (this.socketChannel != null) {
this.cleanup();
try {
this.socketChannel.close();
this.socketChannel = null;
this.running = false;
} catch (IOException e) {
throw new JBIException("Unable to close connection", e);
} // end of try / catch
} // end of if
} // end of stopNIO()
private void stopNonNIO() throws JBIException {
log.debug("Inside stopNonNIO()");
if (this.socket != null) {
if (this.socket.isConnected()) {
this.cleanup();
try {
this.socket.close();
this.in = null;
this.out = null;
this.socket = null;
this.running = false;
} catch (IOException e) {
throw new JBIException("Unable to close connection", e);
} // end of try / catch
} // end of if
} // end of if
} // end of stopNonNIO()
} // end of class TcpComponentSupport
On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
> Sorry, let me try it again with just the .java file.
>
>
> On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:
>
>> Hopefully this time the attachment will go through. I am not exactly sure
>> why it didn't before. It is a zip file.
>>
>> I guess I was thinking that when the component is deployed into the
>> lwcontainer that it would be running in its own thread anyway. Apparently I
>> was incorrect in my assumption.
>>
>> I am still trying to get a grasp on the whole ServiceMix thing so please
>> excuse my ignorance.
>>
>> Thanks,
>> Doug
>>
>>
>> On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>
>>> I think the attachment has been lost.
>>>
>>> Btw, for a tcp consumer that creates a server socket, it has to wait
>>> for incoming connections, so how can you not start a thread ? The
>>> jabber api uses a listener, so it is really different.
>>>
>>> Cheers,
>>> Guillaume Nodet
>>>
>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>> I have attached my TcpComponentSupport class to this email. If you could
>>>> be
>>>> so kind to take a look at it and let me know if it looks ok. You mentioned
>>>> that the start method should start a Thread and then kill that Thread in
>>>> the
>>>> stop method. I am not actually doing this right now however I of course
>>>> could implement it that way if necessary. I attempted to use the
>>>> JabberComponentSupport class as a template.
>>>>
>>>> Please let me know your thoughts.
>>>>
>>>> Thanks,
>>>> Doug
>>>>
>>>>
>>>> On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>
>>>>> I am not sure I understand the problem.
>>>>> Usually a lightweight consumer BC will begin to receive messages when
>>>>> the component is started. It means that all the necessary
>>>>> configuration parameters have been set.
>>>>> The tcp component needs to start a thread inside the start method (and
>>>>> kill it inside the stop method).
>>>>>
>>>>> The second problem is related to components / service units lifecycle.
>>>>> When a lightweight component is deployed to the lwcontainer, it has a
>>>>> specific lifecycle: when the service unit is started, the component is
>>>>> started and when the service unit is stopped, the component is
>>>>> shutdown.
>>>>>
>>>>> But the problem should only occur when someone sends something on the
>>>>> socket
>>>>> ...
>>>>>
>>>>> Cheers,
>>>>> Guillaume Nodet
>>>>>
>>>>> On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>> Guillaume,
>>>>>>
>>>>>> Yes you are right, I did ask before (somehow I got sidetracked and forgot
>>>>>> all
>>>>>> about it). Anyway, thank you for the reply though.
>>>>>>
>>>>>> I have been able to implement a TcpComponentSupport class, at least I
>>>>>> think
>>>>>> so anyway. The problem that I am having now is that as soon as I deploy
>>>>>> the
>>>>>> component it starts collecting data and pushing it to the bus. I have
>>>>>> run
>>>>>> into a couple of problems because of this. The first problem is while
>>>>>> testing, I would like to extend from TestSupport and use the
>>>>>> assertMessagesReceived(), however when it loads the ApplicationContext,
>>>>>> the
>>>>>> concrete TcpComponentSupport class simply begins communicating over the
>>>>>> socket and the test method with the assertMessagesReceived() call is
>>>>>> never
>>>>>> executed. The test will just run forever.
>>>>>>
>>>>>> The second question that I have is along the same lines. I would like to
>>>>>> create a service unit, with simply this component in it and deploy it to
>>>>>> ServiceMix and then use something like eip to route the messages where I
>>>>>> want them. The same kind of problem occurs in this scenario, as soon as
>>>>>> the
>>>>>> service unit is deployed, it will start collecting data from the stream
>>>>>> and
>>>>>> trying to push it to the bus, the problem is that there is no destination
>>>>>> assigned to the component yet so it tries to send it to "service:
>>>>>> null
>>>>>> and interface null".
>>>>>>
>>>>>> Do you have any ideas?
>>>>>>
>>>>>> Thank you,
>>>>>> Doug
>>>>>>
>>>>>>
>>>>>> On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:
>>>>>>
>>>>>>> It seems there's none -- if I recall, you have already asked this
>>>>>>> question
>>>>>>> ;)
>>>>>>> However, it should not be very difficult to implement.
>>>>>>> You have to inherit the
>>>>>>> org.apache.servicemix.components.util.ComponentSupport and implement
>>>>>>> MessageExchangeListener. Then, just create your socket and read it :)
>>>>>>> You may need another lw component for sending over tcp, also.
>>>>>>>
>>>>>>> Cheers,
>>>>>>> Guillaume Nodet
>>>>>>>
>>>>>>> On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
>>>>>>>> Does anyone know if a basic TCP component is available? What I would
>>>>>>>> like
>>>>>>>> to be able to do is simply extend from a TCP component, set the host
>>>>>>>> and
>>>>>>>> port in the servicemix.xml file, and have access to the input and
>>>>>>>> output
>>>>>>>> streams in my implementation class. I would then want to be able to
>>>>>>>> read
>>>>>>>> from the input stream and create a NormalizedMessage to put on the bus
>>>>>>>> and
>>>>>>>> also be able to get a message from the bus and write it to the output
>>>>>>>> stream.
>>>>>>>>
>>>>>>>> Is there anything like this out there?
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Doug
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>>
>>>
>>
>