In Spagic we have a component that handles the low-level TCP protocol.

Take a look at it: http://forge.objectweb.org/projects/spagic

Andrea Zoppello

jpuro wrote:
Has a standard jbi BC component been created for this yet?  I think not, but
was curious.

-jeff


dfischer wrote:
Forget it, here is the code.  So sorry for so many emails.


package org.apache.servicemix.components.tcp;

import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.nio.channels.SocketChannel;

import javax.jbi.JBIException;
import javax.jbi.messaging.InOnly;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.NormalizedMessage;
import javax.xml.transform.TransformerException;

import org.apache.servicemix.MessageExchangeListener;
import org.apache.servicemix.components.util.ComponentSupport;
import org.springframework.beans.factory.InitializingBean;

public abstract class TcpComponentSupport extends ComponentSupport
        implements MessageExchangeListener, InitializingBean {

    /**
     * Logger for this class
     */
    private static final Logger log = Logger
            .getLogger(TcpComponentSupport.class);

    private String host;
    private int port = -1;
    private boolean useNIO = false;
    private int retryInterval = 10000;
    protected SocketChannel socketChannel = null;
    private Socket socket = null;
    protected InputStream in = null;
    protected OutputStream out = null;
    protected boolean running = false;
    private TcpMarshaler marshaler = new TcpMarshaler();

    protected abstract void processConnection() throws JBIException,
            IOException;

    protected abstract void processIncomingMessage(String tcpMessage);

    public void setHost(String host) {
        this.host = host;
    } // end of setHost(String)

    public void setPort(int port) {
        this.port = port;
    } // end of setPort(int)

    public void setUseNIO(boolean useNIO) {
        this.useNIO = useNIO;
    } // end of setUseNIO(boolean)

    public void setRetryInterval(int retryInterval) {
        this.retryInterval = retryInterval;
    } // end of setRetryInterval(int)

    protected boolean getUseNIO() {
        return this.useNIO;
    } // end of getUseNIO()

    protected String getHost() {
        return this.host;
    } // end of getHost()

    protected int getPort() {
        return this.port;
    } // end of getPort()

    public void onMessageExchange(MessageExchange exchange)
            throws MessagingException {
        String tcpMessage = null;
        try {
            tcpMessage = this.marshaler.fromNMS(exchange.getMessage("in"));
            this.processIncomingMessage(tcpMessage);
            this.done(exchange);
        } catch (TransformerException e) {
            throw new MessagingException(e);
        } // end of try /catch
    } // end of onMessageExchange(MessageExchange)

    public void afterPropertiesSet() throws Exception {
        if (host == null) {
            throw new IllegalArgumentException(
                    "You must specify the host property.");
        } // end of if

        if (this.port < 0) {
            throw new IllegalArgumentException(
                    "You must specify the port property.");
        } // end of if
    } // end of afterPropertiesSet()

    public void start() throws JBIException {
        if (this.useNIO) {
            // configure the NIO SocketChannel
            this.startNIO();
        } else {
            // this is the default
            // we will simply open a basic socket and create the Input and
            // Output Streams
            this.startNonNIO();
        } // end of if / else
    } // end of start()

    public void stop() throws JBIException {
        if (useNIO) {
            this.stopNIO();
        } else {
            this.stopNonNIO();
        } // end of if / else
    } // end of stop()

    protected void sendToBus(String message) {
        log.debug("Inside sendToBus(), sending message: " + message);
        try {
            InOnly exchange = this.getExchangeFactory().createInOnlyExchange();
            NormalizedMessage normalizedMessage = exchange.createMessage();
            exchange.setInMessage(normalizedMessage);
            this.marshaler.toNMS(normalizedMessage, message);
            this.getDeliveryChannel().sendSync(exchange);
            //done(exchange);
        } catch (MessagingException me) {
            throw new TcpConnectionException(me, message);
        } // end of try / catch
    } // end of sendToBus(String)

    /**
     * This is an empty implementation of the init().
     * This method exists so that the concrete class can override it
     * if there is something that needs to be done to initialize the
     * communication with the server.
     *
     * This method is called after the connection has been made and after the
     * Input and Output Streams have been assigned.
     *
     * This method is called right before the processConnection().
     */
    protected void init() {
        // nothing is done here
        // if something needs to be done here in the concrete class
        // this method is called after the connection is made and the Input
        // and Output Streams have been set but before the processConnection()
    } // end of init()

    /**
     * This is an empty implementation of the cleanup().
     *
     * This method exists so that the concrete class can override it
     * if there is something that needs to be done to clean up resources
     * before the connection is shut down.
     *
     * This method is called while the connection is still active and the
     * Input and Output Streams are still available.
     */
    protected void cleanup() {
        // nothing is done here
        // if something needs to be done here in the concrete class
    } // end of cleanup()

    private void startNIO() throws JBIException {
        if (log.isInfoEnabled()) {
            log.info("Using NIO to connect to " + this.host + ":" + this.port);
        } // end of if

        if (!this.running) {
            this.connectNIO();
            try {
                this.init();
                this.processConnection();
            } catch (SocketException e) {
                // connection dropped: mark as not running and reconnect
                this.running = false;
                this.startNIO();
            } catch (IOException ioe) {
                throw new JBIException(ioe);
            } // end of try / catch
        } // end of if
    } // end of startNIO()

    private void connectNIO() throws JBIException {
        try {
            this.socketChannel = SocketChannel.open();
            this.socketChannel.configureBlocking(false);
            this.socketChannel.connect(
                    new InetSocketAddress(this.host, this.port));
            int count = 0;
            while (!this.socketChannel.finishConnect()) {
                log.debug("Finishing connection: " + (count++) + " times");
            } // end of while loop
            this.running = true;
        } catch (ConnectException ce) {
            log.warn(this.host + ":" + this.port + " unavailable, waiting "
                    + this.retryInterval
                    + " milliseconds before trying again.");
            this.running = false;
            try {
                Thread.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                throw new JBIException(
                        "Interrupted while waiting to reconnect", e);
            }
            this.connectNIO();
        } catch (Exception e) {
            log.error("Exception while trying to make a tcp connection to "
                    + this.host + ":" + this.port);
            log.error(e);
            this.running = false;
            throw new JBIException("Failed to connect to " + this.host + ":"
                    + this.port, e);
        } // end of try / catch
    } // end of connectNIO()

    private void startNonNIO() throws JBIException {
        if (log.isInfoEnabled()) {
            log.info("Using basic sockets to connect to " + this.host + ":"
                    + this.port);
        } // end of if

        if (!this.running) {
            this.connectNonNIO();
            try {
                this.init();
                this.processConnection();
            } catch (SocketException e) {
                // connection dropped: mark as not running and reconnect
                this.running = false;
                this.startNonNIO();
            } catch (IOException ioe) {
                throw new JBIException(ioe);
            } // end of try / catch
        } // end of if
    } // end of startNonNIO()

    private void connectNonNIO() throws JBIException {
        try {
            this.socket = new Socket(this.host, this.port);
            this.in = this.socket.getInputStream();
            this.out = this.socket.getOutputStream();
            this.running = true;
        } catch (ConnectException ce) {
            log.warn(this.host + ":" + this.port + " unavailable, waiting "
                    + this.retryInterval
                    + " milliseconds before trying again.");
            this.running = false;
            try {
                Thread.sleep(this.retryInterval);
            } catch (InterruptedException e) {
                // restore the interrupt flag and stop retrying
                Thread.currentThread().interrupt();
                throw new JBIException(
                        "Interrupted while waiting to reconnect", e);
            }
            this.connectNonNIO();
        } catch (Exception e) {
            log.error("Exception while trying to make a tcp connection to "
                    + this.host + ":" + this.port);
            log.error(e);
            this.running = false;
            throw new JBIException("Failed to connect to " + this.host + ":"
                    + this.port, e);
        } // end of try / catch
    } // end of connectNonNIO()

    private void stopNIO() throws JBIException {
        log.debug("Inside stopNIO()");
        if (this.socketChannel != null) {
            this.cleanup();
            try {
                this.socketChannel.close();
                this.socketChannel = null;
                this.running = false;
            } catch (IOException e) {
                throw new JBIException("Unable to close connection", e);
            } // end of try / catch
        } // end of if
    } // end of stopNIO()

    private void stopNonNIO() throws JBIException {
        log.debug("Inside stopNonNIO()");
        if (this.socket != null) {
            if (this.socket.isConnected()) {
                this.cleanup();
                try {
                    this.socket.close();
                    this.in = null;
                    this.out = null;
                    this.socket = null;
                    this.running = false;
                } catch (IOException e) {
                    throw new JBIException("Unable to close connection", e);
                } // end of try / catch
            } // end of if
        } // end of if
    } // end of stopNonNIO()
} // end of class TcpComponentSupport
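
Both connect methods above retry by calling themselves after each
ConnectException, so a long outage deepens the call stack with every
attempt. Here is a minimal sketch of the same retry written as a loop. It is
an illustration only, not part of the posted class: RetryingConnector,
SocketOpener and the maxAttempts limit are hypothetical names introduced
here.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.Socket;

/** Hypothetical helper: retries a connect in a loop instead of recursing. */
final class RetryingConnector {

    /** Hypothetical hook so the actual connect call can be swapped out. */
    interface SocketOpener {
        Socket open(String host, int port) throws IOException;
    }

    static Socket connect(SocketOpener opener, String host, int port,
                          long retryIntervalMillis, int maxAttempts)
            throws IOException, InterruptedException {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        ConnectException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return opener.open(host, port);
            } catch (ConnectException ce) {
                // server not reachable yet: remember the failure and wait
                last = ce;
                if (attempt < maxAttempts) {
                    Thread.sleep(retryIntervalMillis);
                }
            }
        }
        // every attempt failed; report the most recent failure
        throw last;
    }
}
```

With a real socket the call could look like
RetryingConnector.connect(Socket::new, host, port, 10000, 5). Whether a
bounded number of attempts is appropriate depends on the deployment; the
posted code retries indefinitely.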



On 5/12/06 11:49 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:

Sorry, let me try it again with just the .java file.


On 5/12/06 11:47 AM, "Doug Fischer" <[EMAIL PROTECTED]> wrote:

Hopefully this time the attachment will go through.  I am not exactly sure
why it didn't before.  It is a zip file.

I guess I was thinking that when the component is deployed into the
lwcontainer that it would be running in its own thread anyway. Apparently I
was incorrect in my assumption.

I am still trying to get a grasp on the whole ServiceMix thing so please
excuse my ignorance.

Thanks,
Doug


On 5/12/06 11:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:

I think the attachment has been lost.

Btw, a TCP consumer that creates a server socket has to wait for
incoming connections, so how can you not start a thread?  The
Jabber API uses a listener, so it is really different.

Cheers,
Guillaume Nodet

On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
I have attached my TcpComponentSupport class to this email.  Would you be
so kind as to take a look at it and let me know if it looks OK?  You
mentioned that the start method should start a Thread and then kill that
Thread in the stop method.  I am not actually doing this right now, but I
could of course implement it that way if necessary.  I attempted to use the
JabberComponentSupport class as a template.

Please let me know your thoughts.

Thanks,
Doug


On 5/12/06 8:06 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:

I'm not sure I understand the problem.
Usually a lightweight consumer BC will begin to receive messages when
the component is started.  This means that all the necessary
configuration parameters have been set by then.
The TCP component needs to start a thread inside the start method (and
kill it inside the stop method).
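
As a rough illustration of the start/stop pattern described above, here is a
sketch in plain Java. SocketWorker and its readLoop argument are
hypothetical names and not part of ServiceMix; a real component would call
start() and stop() from its own lifecycle methods.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical helper: runs a blocking read loop on its own thread. */
final class SocketWorker {
    private final Runnable readLoop;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private Thread thread;

    SocketWorker(Runnable readLoop) {
        this.readLoop = readLoop;
    }

    /** Spawns the reader thread and returns immediately. */
    synchronized void start() {
        if (running.compareAndSet(false, true)) {
            thread = new Thread(() -> {
                try {
                    readLoop.run();
                } finally {
                    running.set(false);
                }
            }, "tcp-reader");
            thread.setDaemon(true);
            thread.start();
        }
    }

    /** Interrupts the reader thread and waits briefly for it to exit. */
    synchronized void stop() {
        if (thread == null) {
            return;
        }
        // Note: a thread blocked in a classic java.io socket read does not
        // react to interrupt(); closing the socket is what unblocks it.
        thread.interrupt();
        try {
            thread.join(2000); // bounded wait so stop() cannot hang forever
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        thread = null;
    }

    boolean isRunning() {
        return running.get();
    }
}
```

Because start() only spawns the thread, the container (or a test that loads
the ApplicationContext) gets control back right away instead of blocking in
the read loop.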

The second problem is related to the component / service unit lifecycle.
When a lightweight component is deployed to the lwcontainer, it has a
specific lifecycle: when the service unit is started, the component is
started, and when the service unit is stopped, the component is shut down.

But the problem should only occur when someone sends something on the
socket...

Cheers,
Guillaume Nodet

On 5/12/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
Guillaume,

Yes, you are right, I did ask before (somehow I got sidetracked and forgot
all about it).  Anyway, thank you for the reply.

I have been able to implement a TcpComponentSupport class, at least I think
so anyway.  The problem that I am having now is that as soon as I deploy the
component it starts collecting data and pushing it to the bus.  I have run
into a couple of problems because of this.  The first problem arises while
testing: I would like to extend from TestSupport and use
assertMessagesReceived(); however, when it loads the ApplicationContext, the
concrete TcpComponentSupport class simply begins communicating over the
socket and the test method with the assertMessagesReceived() call is never
executed.  The test will just run forever.

The second question that I have is along the same lines.  I would like to
create a service unit with just this component in it, deploy it to
ServiceMix, and then use something like eip to route the messages where I
want them.  The same kind of problem occurs in this scenario: as soon as the
service unit is deployed, it will start collecting data from the stream and
trying to push it to the bus.  The problem is that there is no destination
assigned to the component yet, so it tries to send it to "service: null
and interface null".

Do you have any ideas?

Thank you,
Doug


On 5/11/06 3:19 AM, "Guillaume Nodet" <[EMAIL PROTECTED]> wrote:

It seems there's none -- if I recall, you have already asked this
question ;)
However, it should not be very difficult to implement.
You have to inherit from
org.apache.servicemix.components.util.ComponentSupport and implement
MessageExchangeListener.  Then, just create your socket and read it :)
You may also need another lw component for sending over TCP.
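
A minimal sketch of the "read it" step, assuming newline-delimited messages
(nothing in this thread says how messages are framed, so that is an
assumption). LineReaderLoop is a hypothetical name; the handler stands in
for whatever pushes each message onto the bus.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.util.function.Consumer;

/** Hypothetical helper: reads text lines from a stream until it closes. */
final class LineReaderLoop {

    /**
     * Passes every line read from the stream to the handler and returns
     * the number of lines delivered. Assumes one message per line, UTF-8.
     */
    static int readLines(InputStream in, Consumer<String> handler)
            throws IOException {
        BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, StandardCharsets.UTF_8));
        int count = 0;
        String line;
        // readLine() returns null once the peer closes the connection
        while ((line = reader.readLine()) != null) {
            handler.accept(line);
            count++;
        }
        return count;
    }
}
```

In a component the stream would come from socket.getInputStream(), and the
loop blocks until the connection closes, so it belongs on its own thread.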

Cheers,
Guillaume Nodet

On 5/10/06, Doug Fischer <[EMAIL PROTECTED]> wrote:
Does anyone know if a basic TCP component is available?  What I would like
to be able to do is simply extend from a TCP component, set the host and
port in the servicemix.xml file, and have access to the input and output
streams in my implementation class.  I would then want to be able to read
from the input stream and create a NormalizedMessage to put on the bus, and
also be able to get a message from the bus and write it to the output
stream.

Is there anything like this out there?

Thanks,
Doug







