Hi

Do you mind creating a JIRA ticket about this issue, so we wont forget?
I have briefly looked into it, and I can also see a new port number is
assigned.

To fix this could potential be not as trivial to support asynchronous
routing engine, for replies coming back.

The Netty Pipeline may have to be stateless and an alternative way to
re-attach Exchange for the async replies would be needed.
Just my initial thoughts for looking at this briefly.


On Thu, Sep 29, 2011 at 11:42 AM, maccamlc
<matthew.mcma...@ultra-avalon.com> wrote:
> No worries. This is a small Unit Test that shows the problem. Hope it helps:
>
> --
>
> package netty;
>
> import java.util.Arrays;
> import java.util.Collection;
> import java.util.concurrent.TimeUnit;
> import java.util.concurrent.atomic.AtomicBoolean;
> import java.util.concurrent.atomic.AtomicInteger;
>
> import junit.framework.TestCase;
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.ExchangePattern;
> import org.apache.camel.Processor;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.junit.Before;
> import org.junit.BeforeClass;
> import org.junit.Test;
> import org.junit.runner.RunWith;
> import org.junit.runners.Parameterized;
> import org.junit.runners.Parameterized.Parameters;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> @RunWith(Parameterized.class)
> public class NettyTest extends TestCase
> {
>    private final static Logger logger =
> LoggerFactory.getLogger(NettyTest.class);
>    private final static CamelContext serverContext = new
> DefaultCamelContext();
>
>    private final CamelContext clientContext = new DefaultCamelContext();
>    private final AtomicInteger responseCounter = new AtomicInteger(0);
>    private final AtomicBoolean passedTen = new AtomicBoolean(false);
>
>    private Boolean disconnectClient;
>
>    public NettyTest(Boolean disconnectClient)
>    {
>        this.disconnectClient = disconnectClient;
>    }
>
>    @Parameters
>    public static Collection&lt;Object[]&gt; configs()
>    {
>        return Arrays.asList(new Object[][] { { true }, { false } });
>    }
>
>    @BeforeClass
>    public static void createServer() throws Exception
>    {
>        serverContext.addRoutes(new RouteBuilder()
>        {
>            @Override
>            public void configure() throws Exception
>            {
>
> from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
>                        .setExchangePattern(ExchangePattern.InOut)
>                        .process(new Processor() {
>
>                            @Override
>                            public void process(Exchange exchange) throws
> Exception
>                            {
>                                Object body = exchange.getIn().getBody();
>                                logger.info("Request received : Value = {}",
> body);
>                            }
>
>                        })
>                        .transform(constant(3)).stop();
>            }
>        });
>
>        serverContext.start();
>    }
>
>    @Before
>    public void createClient() throws Exception
>    {
>        clientContext.addRoutes(new RouteBuilder()
>        {
>            @Override
>            public void configure() throws Exception
>            {
>                // Generate an Echo message and ensure a Response is sent
>                from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
>                        .setExchangePattern(ExchangePattern.InOut)
>                        .transform()
>                        .constant(2)
>                        .to(ExchangePattern.InOut,
> "netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect="
> + disconnectClient.toString())
>                        .process(new Processor()
>                        {
>                            @Override
>                            public void process(Exchange exchange) throws
> Exception
>                            {
>                                Object body = exchange.getIn().getBody();
>                                logger.info("Response number {} : Value =
> {}",
>                                        responseCounter.incrementAndGet(),
> body);
>
>                                if (responseCounter.get() > 10) {
>                                    passedTen.set(true);
>                                }
>                            }
>
>                        }).stop();
>            }
>        });
>    }
>
>    @Test
>    public void test() throws Exception
>    {
>        clientContext.getShutdownStrategy().setTimeout(1);
>
>        clientContext.start();
>
>        logger.info("Disconnect = {}", this.disconnectClient);
>
>        Thread.sleep(TimeUnit.SECONDS.toMillis(15));
>
>        clientContext.stop();
>
>        assertTrue("More than 10 responses have been received",
> passedTen.get());
>    }
> }
>
>
> --
> View this message in context: 
> http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-tp4844805p4852442.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cib...@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Reply via email to