This is an automated email from the ASF dual-hosted git repository. lburgazzoli pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push: new 1ad92f0 CAMEL-14570: camel-zookeeper-master - Use testcontainers for testing 1ad92f0 is described below commit 1ad92f06b880995f2266fa40842cb50d8f8b00a0 Author: lburgazzoli <lburgazz...@gmail.com> AuthorDate: Mon Feb 17 12:07:17 2020 +0100 CAMEL-14570: camel-zookeeper-master - Use testcontainers for testing --- components/camel-zookeeper-master/pom.xml | 93 +++++- .../MasterEndpointFailoverTest.java | 16 +- .../zookeepermaster/MasterEndpointTest.java | 36 -- .../zookeepermaster/MasterQuartzEndpointTest.java | 35 -- .../component/zookeepermaster/ZKContainer.java | 65 ++++ .../zookeepermaster/ZKServerFactoryBean.java | 230 ------------- .../component/zookeepermaster/group/GroupTest.java | 370 ++++++++++++--------- .../zookeepermaster/MasterEndpointTest-context.xml | 20 +- .../MasterQuartzEndpointTest-context.xml | 20 +- 9 files changed, 395 insertions(+), 490 deletions(-) diff --git a/components/camel-zookeeper-master/pom.xml b/components/camel-zookeeper-master/pom.xml index 3e35c2d..bf629f9 100644 --- a/components/camel-zookeeper-master/pom.xml +++ b/components/camel-zookeeper-master/pom.xml @@ -115,11 +115,13 @@ <scope>test</scope> </dependency> + <!-- testing --> <dependency> <groupId>org.apache.camel</groupId> - <artifactId>camel-test-spring</artifactId> + <artifactId>camel-testcontainers-spring</artifactId> <scope>test</scope> </dependency> + <dependency> <groupId>org.apache.logging.log4j</groupId> <artifactId>log4j-api</artifactId> @@ -138,20 +140,81 @@ </dependencies> - <build> - <plugins> - <plugin> - <artifactId>maven-surefire-plugin</artifactId> - <configuration> - <childDelegation>false</childDelegation> - <useFile>true</useFile> - <forkCount>1</forkCount> - <reuseForks>true</reuseForks> - <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> - </configuration> - </plugin> - </plugins> - </build> + <profiles> + <profile> + <id>zookeeper-master-skip-tests</id> + 
<activation> + <activeByDefault>true</activeByDefault> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>true</skipTests> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <!-- activate test if the docker socket file is accessible --> + <profile> + <id>zookeeper-master-tests-docker-file</id> + <activation> + <file> + <exists>/var/run/docker.sock</exists> + </file> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>${skipTests}</skipTests> + <systemPropertyVariables> + <visibleassertions.silence>true</visibleassertions.silence> + </systemPropertyVariables> + <childDelegation>false</childDelegation> + <useFile>true</useFile> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + <!-- activate test if the DOCKER_HOST env var is set --> + <profile> + <id>zookeeper-master-tests-docker-env</id> + <activation> + <property> + <name>env.DOCKER_HOST</name> + </property> + </activation> + <build> + <plugins> + <plugin> + <artifactId>maven-surefire-plugin</artifactId> + <configuration> + <skipTests>${skipTests}</skipTests> + <systemPropertyVariables> + <visibleassertions.silence>true</visibleassertions.silence> + </systemPropertyVariables> + <childDelegation>false</childDelegation> + <useFile>true</useFile> + <forkCount>1</forkCount> + <reuseForks>true</reuseForks> + <forkedProcessTimeoutInSeconds>600</forkedProcessTimeoutInSeconds> + </configuration> + </plugin> + </plugins> + </build> + </profile> + + </profiles> </project> diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointFailoverTest.java 
b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointFailoverTest.java index 85366fb..0fbefa7 100644 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointFailoverTest.java +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointFailoverTest.java @@ -42,17 +42,15 @@ public class MasterEndpointFailoverTest { protected MockEndpoint result1Endpoint; protected MockEndpoint result2Endpoint; protected AtomicInteger messageCounter = new AtomicInteger(1); - protected ZKServerFactoryBean serverFactoryBean = new ZKServerFactoryBean(); + protected ZKContainer zkContainer = new ZKContainer(); protected CuratorFactoryBean zkClientBean = new CuratorFactoryBean(); @Before public void beforeRun() throws Exception { - System.out.println("Starting ZK server!"); - serverFactoryBean.setPort(9004); - serverFactoryBean.afterPropertiesSet(); + zkContainer.start(); // Create the zkClientBean - zkClientBean.setConnectString("localhost:9004"); + zkClientBean.setConnectString(zkContainer.getConnectionString()); CuratorFramework client = zkClientBean.getObject(); // Need to bind the zookeeper client with the name "curator" @@ -101,21 +99,21 @@ public class MasterEndpointFailoverTest { ServiceHelper.stopService(consumerContext2); ServiceHelper.stopService(producerContext); zkClientBean.destroy(); - serverFactoryBean.destroy(); + zkContainer.stop(); } @Test public void testEndpoint() throws Exception { - System.out.println("Starting consumerContext1"); + LOG.info("Starting consumerContext1"); ServiceHelper.startService(consumerContext1); assertMessageReceived(result1Endpoint, result2Endpoint); - System.out.println("Starting consumerContext2"); + LOG.info("Starting consumerContext2"); ServiceHelper.startService(consumerContext2); assertMessageReceivedLoop(result1Endpoint, result2Endpoint, 3); - System.out.println("Stopping 
consumerContext1"); + LOG.info("Stopping consumerContext1"); ServiceHelper.stopService(consumerContext1); assertMessageReceivedLoop(result2Endpoint, result1Endpoint, 3); } diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java index e70cfd4..eba3ee5 100644 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterEndpointTest.java @@ -26,10 +26,6 @@ import org.apache.camel.Route; import org.apache.camel.component.file.remote.SftpEndpoint; import org.apache.camel.component.mock.MockEndpoint; import org.apache.camel.impl.DefaultCamelContext; -import org.apache.camel.support.service.ServiceHelper; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; @@ -39,11 +35,6 @@ import static org.junit.Assert.assertEquals; @ContextConfiguration public class MasterEndpointTest extends AbstractJUnit4SpringContextTests { - - protected static ZKServerFactoryBean lastServerBean; - - protected static CuratorFactoryBean lastClientBean; - @Autowired protected CamelContext camelContext; @@ -53,33 +44,6 @@ public class MasterEndpointTest extends AbstractJUnit4SpringContextTests { @Produce("seda:bar") protected ProducerTemplate template; - // Yeah this sucks.. why does the spring context not get shutdown - // after each test case? Not sure! 
- @Autowired - protected ZKServerFactoryBean zkServerBean; - - @Autowired - protected CuratorFactoryBean zkClientBean; - - @Before - public void startService() throws Exception { - ServiceHelper.startService(camelContext); - ServiceHelper.startService(template); - } - - @After - public void afterRun() throws Exception { - lastServerBean = zkServerBean; - lastClientBean = zkClientBean; - ServiceHelper.stopService(camelContext); - } - - @AfterClass - public static void shutDownZK() throws Exception { - lastClientBean.destroy(); - lastServerBean.destroy(); - } - @Test public void testEndpoint() throws Exception { // check the endpoint configuration diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest.java index a6d7a1f..ee7941d 100644 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest.java +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest.java @@ -19,10 +19,6 @@ package org.apache.camel.component.zookeepermaster; import org.apache.camel.CamelContext; import org.apache.camel.EndpointInject; import org.apache.camel.component.mock.MockEndpoint; -import org.apache.camel.support.service.ServiceHelper; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; import org.junit.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.test.context.ContextConfiguration; @@ -30,43 +26,12 @@ import org.springframework.test.context.junit4.AbstractJUnit4SpringContextTests; @ContextConfiguration public class MasterQuartzEndpointTest extends AbstractJUnit4SpringContextTests { - - protected static ZKServerFactoryBean lastServerBean; - - protected static CuratorFactoryBean lastClientBean; 
- @Autowired protected CamelContext camelContext; @EndpointInject("mock:results") protected MockEndpoint resultEndpoint; - // Yeah this sucks.. why does the spring context not get shutdown - // after each test case? Not sure! - @Autowired - protected ZKServerFactoryBean zkServerBean; - - @Autowired - protected CuratorFactoryBean zkClientBean; - - @Before - public void startService() throws Exception { - ServiceHelper.startService(camelContext); - } - - @After - public void afterRun() throws Exception { - lastServerBean = zkServerBean; - lastClientBean = zkClientBean; - ServiceHelper.stopService(camelContext); - } - - @AfterClass - public static void shutDownZK() throws Exception { - lastClientBean.destroy(); - lastServerBean.destroy(); - } - @Test public void testEndpoint() throws Exception { resultEndpoint.expectedMinimumMessageCount(2); diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKContainer.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKContainer.java new file mode 100644 index 0000000..7e2aabf --- /dev/null +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKContainer.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.component.zookeepermaster; + +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.output.Slf4jLogConsumer; +import org.testcontainers.containers.wait.strategy.Wait; + +public class ZKContainer extends GenericContainer { + public static final String CONTAINER_IMAGE = "zookeeper:3.5"; + public static final String CONTAINER_NAME = "zookeeper"; + public static final int CLIENT_PORT = 2181; + + public ZKContainer() { + this(CONTAINER_NAME, -1); + } + + public ZKContainer(int clientPort) { + this(CONTAINER_NAME, clientPort); + } + + public ZKContainer(String name) { + this(name, -1); + + setWaitStrategy(Wait.forListeningPort()); + + withNetworkAliases(name); + withExposedPorts(CLIENT_PORT); + withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(ZKContainer.class))); + } + + public ZKContainer(String name, int clientPort) { + super(CONTAINER_IMAGE); + + setWaitStrategy(Wait.forListeningPort()); + + withNetworkAliases(name); + withLogConsumer(new Slf4jLogConsumer(LoggerFactory.getLogger(ZKContainer.class))); + + if (clientPort > 0) { + addFixedExposedPort(clientPort, CLIENT_PORT); + } else { + withExposedPorts(CLIENT_PORT); + } + } + + public String getConnectionString() { + return getContainerIpAddress() + ":" + getMappedPort(CLIENT_PORT); + } +} diff --git a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKServerFactoryBean.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKServerFactoryBean.java deleted file mode 100644 index 38f7157..0000000 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/ZKServerFactoryBean.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or 
more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.component.zookeepermaster; - -import java.io.File; -import java.net.InetSocketAddress; - -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; -import org.springframework.beans.factory.DisposableBean; -import org.springframework.beans.factory.FactoryBean; -import org.springframework.beans.factory.InitializingBean; - -/** - * A simple ZK server for testing ZK related code in unit tests - */ -public class ZKServerFactoryBean implements FactoryBean<ZooKeeperServer>, InitializingBean, DisposableBean { - private ZooKeeperServer zooKeeperServer = new ZooKeeperServer(); - private NIOServerCnxnFactory connectionFactory; - private File dataLogDir; - private File dataDir; - private boolean purge; - private int tickTime = ZooKeeperServer.DEFAULT_TICK_TIME; - - /** - * defaults to -1 if not set explicitly - */ - private int minSessionTimeout = -1; - - /** - * defaults to -1 if not set explicitly - */ - private int maxSessionTimeout = -1; - private InetSocketAddress clientPortAddress; - private int maxClientConnections; - private int port = 2181; - - @Override - public ZooKeeperServer getObject() throws 
Exception { - return zooKeeperServer; - } - - @Override - public Class<ZooKeeperServer> getObjectType() { - return ZooKeeperServer.class; - } - - @Override - public boolean isSingleton() { - return true; - } - - @Override - public void afterPropertiesSet() throws Exception { - if (purge) { - deleteFilesInDir(getDataLogDir()); - deleteFilesInDir(getDataDir()); - } - FileTxnSnapLog ftxn = new FileTxnSnapLog(getDataLogDir(), getDataDir()); - zooKeeperServer.setTxnLogFactory(ftxn); - zooKeeperServer.setTickTime(getTickTime()); - zooKeeperServer.setMinSessionTimeout(getMinSessionTimeout()); - zooKeeperServer.setMaxSessionTimeout(getMaxSessionTimeout()); - connectionFactory = new NIOServerCnxnFactory(); - connectionFactory.configure(getClientPortAddress(), getMaxClientConnections()); - connectionFactory.startup(zooKeeperServer); - } - - private void deleteFilesInDir(File dir) { - File[] files = dir.listFiles(); - if (files != null) { - for (File file : files) { - if (file.isDirectory()) { - deleteFilesInDir(file); - } else { - file.delete(); - } - } - } - } - - @Override - public void destroy() throws Exception { - shutdown(); - } - - protected void shutdown() { - if (connectionFactory != null) { - connectionFactory.shutdown(); - try { - connectionFactory.join(); - } catch (InterruptedException e) { - // Ignore - } - connectionFactory = null; - } - if (zooKeeperServer != null) { - zooKeeperServer.shutdown(); - zooKeeperServer = null; - } - } - - // Properties - //------------------------------------------------------------------------- - - public ZooKeeperServer getZooKeeperServer() { - return zooKeeperServer; - } - - public NIOServerCnxnFactory getConnectionFactory() { - return connectionFactory; - } - - public File getDataLogDir() { - if (dataLogDir == null) { - dataLogDir = new File(getZKOutputDir(), "log"); - dataLogDir.mkdirs(); - } - return dataLogDir; - } - - public File getDataDir() { - if (dataDir == null) { - dataDir = new File(getZKOutputDir(), "data"); - 
dataDir.mkdirs(); - } - return dataDir; - } - - public int getTickTime() { - return tickTime; - } - - public int getMinSessionTimeout() { - return minSessionTimeout; - } - - public int getMaxSessionTimeout() { - return maxSessionTimeout; - } - - public int getPort() { - return port; - } - - public InetSocketAddress getClientPortAddress() { - if (clientPortAddress == null) { - clientPortAddress = new InetSocketAddress(port); - } - return clientPortAddress; - } - - public int getMaxClientConnections() { - return maxClientConnections; - } - - public void setPort(int port) { - this.port = port; - } - - public void setClientPortAddress(InetSocketAddress clientPortAddress) { - this.clientPortAddress = clientPortAddress; - } - - public void setConnectionFactory(NIOServerCnxnFactory connectionFactory) { - this.connectionFactory = connectionFactory; - } - - public void setDataDir(File dataDir) { - this.dataDir = dataDir; - } - - public void setDataLogDir(File dataLogDir) { - this.dataLogDir = dataLogDir; - } - - public void setMaxClientConnections(int maxClientConnections) { - this.maxClientConnections = maxClientConnections; - } - - public void setMaxSessionTimeout(int maxSessionTimeout) { - this.maxSessionTimeout = maxSessionTimeout; - } - - public void setMinSessionTimeout(int minSessionTimeout) { - this.minSessionTimeout = minSessionTimeout; - } - - public void setTickTime(int tickTime) { - this.tickTime = tickTime; - } - - public void setZooKeeperServer(ZooKeeperServer zooKeeperServer) { - this.zooKeeperServer = zooKeeperServer; - } - - public boolean isPurge() { - return purge; - } - - public void setPurge(boolean purge) { - this.purge = purge; - } - - // Implementation methods - //------------------------------------------------------------------------- - protected File getZKOutputDir() { - String baseDir = System.getProperty("basedir", "."); - File dir = new File(baseDir + "/target/zk"); - dir.mkdirs(); - return dir; - } - - -} diff --git 
a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/group/GroupTest.java b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/group/GroupTest.java index 99bc1bb..075a364 100644 --- a/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/group/GroupTest.java +++ b/components/camel-zookeeper-master/src/test/java/org/apache/camel/component/zookeepermaster/group/GroupTest.java @@ -16,30 +16,34 @@ */ package org.apache.camel.component.zookeepermaster.group; -import java.io.File; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.ArrayList; import java.util.Collection; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import org.apache.camel.component.zookeepermaster.ZKContainer; import org.apache.camel.component.zookeepermaster.group.internal.ChildData; import org.apache.camel.component.zookeepermaster.group.internal.ZooKeeperGroup; import org.apache.camel.test.AvailablePortFinder; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; import org.apache.curator.retry.RetryNTimes; -import org.apache.zookeeper.server.NIOServerCnxnFactory; -import org.apache.zookeeper.server.ServerConfig; -import org.apache.zookeeper.server.ZooKeeperServer; -import org.apache.zookeeper.server.persistence.FileTxnSnapLog; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testcontainers.containers.BindMode; +import org.testcontainers.containers.SelinuxContext; +import org.testcontainers.shaded.org.apache.commons.io.FileUtils; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotEquals; import static org.junit.Assert.assertTrue; +import static org.springframework.test.util.AssertionErrors.assertNotEquals; public class GroupTest { + private static final Logger LOGGER 
= LoggerFactory.getLogger(GroupTest.class); private GroupListener listener = new GroupListener<NodeState>() { @Override @@ -48,27 +52,46 @@ public class GroupTest { boolean master = group.isMaster(); if (connected) { Collection<NodeState> members = group.members().values(); - System.err.println("GroupEvent: " + event + " (connected=" + connected + ", master=" + master + ", members=" + members + ")"); + LOGGER.info("GroupEvent: " + event + " (connected=" + connected + ", master=" + master + ", members=" + members + ")"); } else { - System.err.println("GroupEvent: " + event + " (connected=" + connected + ", master=false)"); + LOGGER.info("GroupEvent: " + event + " (connected=" + connected + ", master=false)"); } } }; - private NIOServerCnxnFactory startZooKeeper(int port) throws Exception { - ServerConfig cfg = new ServerConfig(); - cfg.parse(new String[] {Integer.toString(port), "target/zk/data"}); - - ZooKeeperServer zkServer = new ZooKeeperServer(); - FileTxnSnapLog ftxn = new FileTxnSnapLog(new File(cfg.getDataLogDir()), new File(cfg.getDataDir())); - zkServer.setTxnLogFactory(ftxn); - zkServer.setTickTime(cfg.getTickTime()); - zkServer.setMinSessionTimeout(6000); - zkServer.setMaxSessionTimeout(9000); - NIOServerCnxnFactory cnxnFactory = new NIOServerCnxnFactory(); - cnxnFactory.configure(cfg.getClientPortAddress(), cfg.getMaxClientCnxns()); - cnxnFactory.startup(zkServer); - return cnxnFactory; + private ZKContainer startZooKeeper(int port, Path root) throws Exception { + LOGGER.info("****************************************"); + LOGGER.info("* Starting ZooKeeper container *"); + LOGGER.info("****************************************"); + + ZKContainer container = new ZKContainer(port); + container.withNetworkAliases("zk-" + port); + + if (root != null) { + Path data = root.resolve("data"); + Path datalog = root.resolve("datalog"); + + if (!Files.exists(data)) { + Files.createDirectories(data); + } + if (!Files.exists(datalog)) { + 
Files.createDirectories(datalog); + } + + LOGGER.debug("data: {}", data); + LOGGER.debug("datalog: {}", datalog); + + container.addFileSystemBind(data.toAbsolutePath().toString(), "/data", BindMode.READ_WRITE, SelinuxContext.SHARED); + container.addFileSystemBind(datalog.toAbsolutePath().toString(), "/datalog", BindMode.READ_WRITE, SelinuxContext.SHARED); + } + + container.start(); + + LOGGER.info("****************************************"); + LOGGER.info("* ZooKeeper container started *"); + LOGGER.info("****************************************"); + + return container; } @@ -96,33 +119,42 @@ public class GroupTest { assertFalse(group.isMaster()); } - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + try { + container = startZooKeeper(port, dataDir); - // first to start should be master if members are ordered... - int i = 0; - for (ZooKeeperGroup group : members) { - group.start(); - group.update(new NodeState("foo" + i)); - i++; + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + + // first to start should be master if members are ordered... 
+ int i = 0; + for (ZooKeeperGroup group : members) { + group.start(); + group.update(new NodeState("foo" + i)); + i++; - // wait for registration - while (group.getId() == null) { - TimeUnit.MILLISECONDS.sleep(100); + // wait for registration + while (group.getId() == null) { + TimeUnit.MILLISECONDS.sleep(100); + } } - } - boolean firsStartedIsMaster = members.get(0).isMaster(); + boolean firsStartedIsMaster = members.get(0).isMaster(); - for (ZooKeeperGroup group : members) { - group.close(); - } - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + for (ZooKeeperGroup group : members) { + group.close(); + } + curator.close(); - assertTrue("first started is master", firsStartedIsMaster); + assertTrue("first started is master", firsStartedIsMaster); + } finally { + if (container != null) { + container.stop(); + } + + FileUtils.deleteDirectory(dataDir.toFile()); + } } @@ -146,21 +178,30 @@ public class GroupTest { GroupCondition groupCondition = new GroupCondition(); group.add(groupCondition); - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + try { + container = startZooKeeper(port, dataDir); - assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); - assertFalse(group.isMaster()); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - group.update(new NodeState("foo")); - assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); + assertFalse(group.isMaster()); + + group.update(new NodeState("foo")); + assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); - group.close(); - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + group.close(); + curator.close(); + } finally { + if (container != null) { + container.stop(); + } + + FileUtils.deleteDirectory(dataDir.toFile()); + } } 
@Test @@ -184,18 +225,26 @@ public class GroupTest { assertFalse(group.isMaster()); group.update(new NodeState("foo")); - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + try { + container = startZooKeeper(port, dataDir); - assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); - assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); + assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); - group.close(); - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + group.close(); + curator.close(); + } finally { + if (container != null) { + container.stop(); + } + + FileUtils.deleteDirectory(dataDir.toFile()); + } } @Test @@ -208,43 +257,50 @@ public class GroupTest { .build(); curator.start(); - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); - Group<NodeState> group = new ZooKeeperGroup<>(curator, "/singletons/test" + System.currentTimeMillis(), NodeState.class); - group.add(listener); - group.update(new NodeState("foo")); - group.start(); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); - GroupCondition groupCondition = new GroupCondition(); - group.add(groupCondition); + try { + container = startZooKeeper(port, dataDir); + Group<NodeState> group = new ZooKeeperGroup<>(curator, "/singletons/test" + System.currentTimeMillis(), NodeState.class); + group.add(listener); + group.update(new NodeState("foo")); + group.start(); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); - assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + GroupCondition groupCondition = new GroupCondition(); + group.add(groupCondition); - 
cnxnFactory.shutdown(); - cnxnFactory.join(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); + assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); - groupCondition.waitForDisconnected(5, TimeUnit.SECONDS); - group.remove(groupCondition); + container.stop(); - assertFalse(group.isConnected()); - assertFalse(group.isMaster()); + groupCondition.waitForDisconnected(5, TimeUnit.SECONDS); + group.remove(groupCondition); - groupCondition = new GroupCondition(); - group.add(groupCondition); + assertFalse(group.isConnected()); + assertFalse(group.isMaster()); + + groupCondition = new GroupCondition(); + group.add(groupCondition); - cnxnFactory = startZooKeeper(port); + container = startZooKeeper(port, dataDir); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); - assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); + assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + group.close(); + curator.close(); + } finally { + if (container != null) { + container.stop(); + } - group.close(); - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + FileUtils.deleteDirectory(dataDir.toFile()); + } } //Tests that if close() is executed right after start(), there are no left over entries. 
@@ -252,95 +308,113 @@ public class GroupTest { @Test public void testGroupClose() throws Exception { int port = AvailablePortFinder.getNextAvailable(); - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); + + try { + container = startZooKeeper(port, dataDir); - CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() + CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() .connectString("localhost:" + port) .connectionTimeoutMs(6000) .sessionTimeoutMs(6000) .retryPolicy(new RetryNTimes(10, 100)); - CuratorFramework curator = builder.build(); - curator.start(); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - String groupNode = "/singletons/test" + System.currentTimeMillis(); - curator.create().creatingParentsIfNeeded().forPath(groupNode); + CuratorFramework curator = builder.build(); + curator.start(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + String groupNode = "/singletons/test" + System.currentTimeMillis(); + curator.create().creatingParentsIfNeeded().forPath(groupNode); + + for (int i = 0; i < 100; i++) { + ZooKeeperGroup<NodeState> group = new ZooKeeperGroup<>(curator, groupNode, NodeState.class); + group.add(listener); + group.update(new NodeState("foo")); + group.start(); + group.close(); + List<String> entries = curator.getChildren().forPath(groupNode); + assertTrue(entries.isEmpty() || group.isUnstable()); + if (group.isUnstable()) { + // let's wait for session timeout + curator.close(); + curator = builder.build(); + curator.start(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + } + } - for (int i = 0; i < 100; i++) { - ZooKeeperGroup<NodeState> group = new ZooKeeperGroup<>(curator, groupNode, NodeState.class); - group.add(listener); - group.update(new NodeState("foo")); - group.start(); - group.close(); - List<String> entries = 
curator.getChildren().forPath(groupNode); - assertTrue(entries.isEmpty() || group.isUnstable()); - if (group.isUnstable()) { - // let's wait for session timeout - curator.close(); - curator = builder.build(); - curator.start(); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + curator.close(); + } finally { + if (container != null) { + container.stop(); } - } - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + FileUtils.deleteDirectory(dataDir.toFile()); + } } @Test public void testAddFieldIgnoredOnParse() throws Exception { int port = AvailablePortFinder.getNextAvailable(); - NIOServerCnxnFactory cnxnFactory = startZooKeeper(port); + ZKContainer container = null; + Path dataDir = Files.createTempDirectory("zk-"); - CuratorFramework curator = CuratorFrameworkFactory.builder() - .connectString("localhost:" + port) - .retryPolicy(new RetryNTimes(10, 100)) - .build(); - curator.start(); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - String groupNode = "/singletons/test" + System.currentTimeMillis(); - curator.create().creatingParentsIfNeeded().forPath(groupNode); + try { + container = startZooKeeper(port, dataDir); - curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + CuratorFramework curator = CuratorFrameworkFactory.builder() + .connectString("localhost:" + port) + .retryPolicy(new RetryNTimes(10, 100)) + .build(); + curator.start(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); + String groupNode = "/singletons/test" + System.currentTimeMillis(); + curator.create().creatingParentsIfNeeded().forPath(groupNode); - final ZooKeeperGroup<NodeState> group = new ZooKeeperGroup<>(curator, groupNode, NodeState.class); - group.add(listener); - group.start(); + curator.getZookeeperClient().blockUntilConnectedOrTimedOut(); - GroupCondition groupCondition = new GroupCondition(); - group.add(groupCondition); + final ZooKeeperGroup<NodeState> group = new ZooKeeperGroup<>(curator, groupNode, 
NodeState.class); + group.add(listener); + group.start(); - group.update(new NodeState("foo")); + GroupCondition groupCondition = new GroupCondition(); + group.add(groupCondition); - assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); - assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); + group.update(new NodeState("foo")); - ChildData currentData = group.getCurrentData().get(0); - final int version = currentData.getStat().getVersion(); + assertTrue(groupCondition.waitForConnected(5, TimeUnit.SECONDS)); + assertTrue(groupCondition.waitForMaster(5, TimeUnit.SECONDS)); - NodeState lastState = group.getLastState(); - String json = lastState.toString(); - System.err.println("JSON:" + json); + ChildData currentData = group.getCurrentData().get(0); + final int version = currentData.getStat().getVersion(); - String newValWithNewField = json.substring(0, json.lastIndexOf('}')) + ",\"Rubbish\":\"Rubbish\"}"; - curator.getZookeeperClient().getZooKeeper().setData(group.getId(), newValWithNewField.getBytes(), version); + NodeState lastState = group.getLastState(); + String json = lastState.toString(); + LOGGER.info("JSON:" + json); - assertTrue(group.isMaster()); + String newValWithNewField = json.substring(0, json.lastIndexOf('}')) + ",\"Rubbish\":\"Rubbish\"}"; + curator.getZookeeperClient().getZooKeeper().setData(group.getId(), newValWithNewField.getBytes(), version); - int attempts = 0; - while (attempts++ < 5 && version == group.getCurrentData().get(0).getStat().getVersion()) { - TimeUnit.SECONDS.sleep(1); - } + assertTrue(group.isMaster()); + + int attempts = 0; + while (attempts++ < 5 && version == group.getCurrentData().get(0).getStat().getVersion()) { + TimeUnit.SECONDS.sleep(1); + } - assertNotEquals("We see the updated version", version, group.getCurrentData().get(0).getStat().getVersion()); + assertNotEquals("We see the updated version", version, group.getCurrentData().get(0).getStat().getVersion()); - System.err.println("CurrentData:" + 
group.getCurrentData()); + LOGGER.info("CurrentData:" + group.getCurrentData()); - group.close(); - curator.close(); - cnxnFactory.shutdown(); - cnxnFactory.join(); + group.close(); + curator.close(); + } finally { + if (container != null) { + container.stop(); + } + + FileUtils.deleteDirectory(dataDir.toFile()); + } } private class GroupCondition implements GroupListener<NodeState> { diff --git a/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterEndpointTest-context.xml b/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterEndpointTest-context.xml index 6d10500..1974d25 100644 --- a/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterEndpointTest-context.xml +++ b/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterEndpointTest-context.xml @@ -22,24 +22,28 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <!-- Since this is a test case lets run a local ZK server --> - <bean id="zkServer" class="org.apache.camel.component.zookeepermaster.ZKServerFactoryBean"> - <property name="port" value="9003"/> + <bean id="zkContainer" class="org.apache.camel.component.zookeepermaster.ZKContainer" init-method="start" destroy-method="stop"> + <constructor-arg> + <value>zookeeper</value> + </constructor-arg> </bean> - <bean id="curator" class="org.apache.camel.component.zookeepermaster.CuratorFactoryBean" depends-on="zkServer"> + <bean id="zkConnectString" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> + <property name="targetObject" ref="zkContainer"/> + <property name="targetMethod" value="getConnectionString"> + </property> + </bean> + + <bean id="curator" class="org.apache.camel.component.zookeepermaster.CuratorFactoryBean" depends-on="zkContainer"> <property name="timeout" value="3000"/> - <property 
name="connectString" value="localhost:9003"/> + <property name="connectString" ref="zkConnectString"/> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring" depends-on="curator"> - <route> <from uri="zookeeper-master:master-000:seda:bar"/> <to uri="mock:results"/> </route> </camelContext> - - <!-- some other stuff here... --> - </beans> diff --git a/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest-context.xml b/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest-context.xml index ec6358e..0ffe65f 100644 --- a/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest-context.xml +++ b/components/camel-zookeeper-master/src/test/resources/org/apache/camel/component/zookeepermaster/MasterQuartzEndpointTest-context.xml @@ -22,26 +22,28 @@ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> <!-- Since this is a test case lets run a local ZK server --> - <bean id="zkServer" class="org.apache.camel.component.zookeepermaster.ZKServerFactoryBean"> - <property name="port" value="9003"/> + <bean id="zkContainer" class="org.apache.camel.component.zookeepermaster.ZKContainer" init-method="start" destroy-method="stop"> + <constructor-arg> + <value>zookeeper</value> + </constructor-arg> </bean> + <bean id="zkConnectString" class="org.springframework.beans.factory.config.MethodInvokingFactoryBean"> + <property name="targetObject" ref="zkContainer"/> + <property name="targetMethod" value="getConnectionString"> + </property> + </bean> - <bean id="curator" class="org.apache.camel.component.zookeepermaster.CuratorFactoryBean" depends-on="zkServer"> + <bean id="curator" class="org.apache.camel.component.zookeepermaster.CuratorFactoryBean" depends-on="zkContainer"> <property name="timeout" value="3000"/> - <property 
name="connectString" value="localhost:9003"/> + <property name="connectString" ref="zkConnectString"/> </bean> <camelContext xmlns="http://camel.apache.org/schema/spring" depends-on="curator"> - <route> <from uri="zookeeper-master:master-000:quartz://masterTest?cron=0/2+0/1+*+1/1+*+?+*"/> - <to uri="mock:results"/> </route> </camelContext> - - <!-- some other stuff here... --> - </beans>