No worries. Here is a small unit test that reproduces the problem. I hope it helps:

--

package netty;

import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import junit.framework.TestCase;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@RunWith(Parameterized.class)
public class NettyTest extends TestCase
{
    private final static Logger logger =
LoggerFactory.getLogger(NettyTest.class);
    private final static CamelContext serverContext = new
DefaultCamelContext();

    private final CamelContext clientContext = new DefaultCamelContext();
    private final AtomicInteger responseCounter = new AtomicInteger(0);
    private final AtomicBoolean passedTen = new AtomicBoolean(false);

    private Boolean disconnectClient;

    public NettyTest(Boolean disconnectClient)
    {
        this.disconnectClient = disconnectClient;
    }

    @Parameters
    public static Collection<Object[]> configs()
    {
        return Arrays.asList(new Object[][] { { true }, { false } });
    }

    @BeforeClass
    public static void createServer() throws Exception
    {
        serverContext.addRoutes(new RouteBuilder()
        {
            @Override
            public void configure() throws Exception
            {
               
from("netty:tcp://localhost:9000?sync=true&disconnectOnNoReply=false&allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false")
                        .setExchangePattern(ExchangePattern.InOut)
                        .process(new Processor() {

                            @Override
                            public void process(Exchange exchange) throws
Exception
                            {
                                Object body = exchange.getIn().getBody();
                                logger.info("Request received : Value = {}",
body);
                            }
                            
                        })
                        .transform(constant(3)).stop();
            }
        });

        serverContext.start();
    }

    @Before
    public void createClient() throws Exception
    {
        clientContext.addRoutes(new RouteBuilder()
        {
            @Override
            public void configure() throws Exception
            {
                // Generate an Echo message and ensure a Response is sent
                from("timer://echoTimer?delay=1s&fixedRate=true&period=1s")
                        .setExchangePattern(ExchangePattern.InOut)
                        .transform()
                        .constant(2)
                        .to(ExchangePattern.InOut,
"netty:tcp://localhost:9000?allowDefaultCodec=true&tcpNoDelay=true&reuseAddress=true&keepAlive=false&sync=true&disconnect="
+ disconnectClient.toString())
                        .process(new Processor()
                        {
                            @Override
                            public void process(Exchange exchange) throws
Exception
                            {
                                Object body = exchange.getIn().getBody();
                                logger.info("Response number {} : Value =
{}",
                                        responseCounter.incrementAndGet(),
body);

                                if (responseCounter.get() > 10) {
                                    passedTen.set(true);
                                }
                            }

                        }).stop();
            }
        });
    }

    @Test
    public void test() throws Exception
    {
        clientContext.getShutdownStrategy().setTimeout(1);

        clientContext.start();

        logger.info("Disconnect = {}", this.disconnectClient);

        Thread.sleep(TimeUnit.SECONDS.toMillis(15));

        clientContext.stop();

        assertTrue("More than 10 responses have been received",
passedTen.get());
    }
}


--
View this message in context: 
http://camel.465427.n5.nabble.com/Camel-Netty-Producer-creating-new-connection-on-every-message-tp4844805p4852442.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Reply via email to