This is an automated email from the ASF dual-hosted git repository. nnag pushed a commit to branch feature/restructuring in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git
commit 3e12e9ed322c5d5dfb9baa55533eb738f671cdc3 Author: Naburun Nag <n...@cs.wisc.edu> AuthorDate: Tue Feb 18 18:51:31 2020 -0800 adding the sink tests --- ConfigDiskDir_locator1/BACKUPcluster_config.if | Bin 0 -> 1048576 bytes ConfigDiskDir_locator1/BACKUPcluster_config_1.crf | Bin 0 -> 200 bytes ConfigDiskDir_locator1/BACKUPcluster_config_1.drf | Bin 0 -> 34 bytes ConfigDiskDir_locator1/BACKUPcluster_config_1.krf | Bin 0 -> 68 bytes ConfigDiskDir_locator1/BACKUPcluster_config_3.crf | Bin 0 -> 41 bytes ConfigDiskDir_locator1/BACKUPcluster_config_3.drf | Bin 0 -> 36 bytes build.gradle | 5 +- dunit/locator/locator63530view.dat | Bin 0 -> 198 bytes locator10334view.dat | Bin 0 -> 299 bytes .../java/org/geode/kafka/GeodeAsSinkDUnitTest.java | 165 ++++++++++++++++++++ .../org/geode/kafka/GeodeAsSourceDUnitTest.java | 171 +++++++++++++++++++++ .../geode/kafka/GeodeKafkaConnectorTestBase.java | 171 --------------------- .../org/geode/kafka/GeodeKafkaTestCluster.java | 1 - .../java/org/geode/kafka/GeodeKafkaTestUtils.java | 132 +++++++--------- src/test/java/org/geode/kafka/JavaProcess.java | 12 +- .../org/geode/kafka/LocatorLauncherWrapper.java | 2 +- .../org/geode/kafka/ServerLauncherWrapper.java | 4 +- .../org/geode/kafka/WorkerAndHerderCluster.java | 7 + .../org/geode/kafka/WorkerAndHerderWrapper.java | 31 +++- .../org/geode/kafka/ZooKeeperLocalCluster.java | 1 + 20 files changed, 443 insertions(+), 259 deletions(-) diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config.if b/ConfigDiskDir_locator1/BACKUPcluster_config.if new file mode 100644 index 0000000..8282543 Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config.if differ diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf new file mode 100644 index 0000000..6dd9f70 Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf differ diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf new file mode 100644 index 0000000..f2b08ee Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf differ diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf new file mode 100644 index 0000000..9151588 Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf differ diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf b/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf new file mode 100644 index 0000000..319260f Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf differ diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf b/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf new file mode 100644 index 0000000..0d6be92 Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf differ diff --git a/build.gradle b/build.gradle index 33f9b9c..add42c7 100644 --- a/build.gradle +++ b/build.gradle @@ -73,10 +73,11 @@ dependencies { testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1') testCompile(group: 'junit', name: 'junit', version: '4.12') +// testCompile("org.junit.jupiter:junit-jupiter-params:5.4.2") testCompile('org.mockito:mockito-core:3.2.4') testCompile('pl.pragmatists:JUnitParams:1.1.1') - testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: '1.11.0') - testImplementation 'org.awaitility:awaitility:4.0.2' + testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: '1.9.0') + testImplementation 'org.awaitility:awaitility:3.1.6' } shadowJar { diff --git a/dunit/locator/locator63530view.dat b/dunit/locator/locator63530view.dat new file mode 100644 index 0000000..10a68c2 Binary files /dev/null and b/dunit/locator/locator63530view.dat differ diff --git a/locator10334view.dat b/locator10334view.dat new file mode 100644 index 0000000..93a5f6e Binary files /dev/null and b/locator10334view.dat differ diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java new file mode 100644 index 0000000..5c94575 --- /dev/null +++ b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java @@ -0,0 +1,165 @@ +package org.geode.kafka; + +import static org.awaitility.Awaitility.await; +import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer; +import static org.geode.kafka.GeodeKafkaTestUtils.createProducer; +import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; +import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; +import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig; +import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties; +import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; +import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; +import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; +import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed; +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.concurrent.TimeUnit; + +import com.fasterxml.jackson.module.scala.ser.SymbolSerializerModule; +import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.clients.producer.Producer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.utils.Time; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +//@RunWith(Parameterized.class) +public class GeodeAsSinkDUnitTest { + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + + private static MemberVM locator, server; + private static ClientVM client; + + @Rule + public TestName testName = new TestName(); + + @ClassRule + public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); + + @Rule + public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); + + @BeforeClass + public static void setup() + throws Exception { + startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper)); + } + + @AfterClass + public static void cleanUp() { + KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", + false, + 200000, + 15000, + 10, + Time.SYSTEM, + "myGroup", + "myMetricType", + null); + + zkClient.close(); + } + + @Parameterized.Parameters(name = "tasks_{0}_partitions_{1}") + public static Iterable<Object[]> getParams() { + return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}}); + } + + private /*final*/ int numTask = 1; + private /*final*/ int numPartition = 1; + +// public GeodeAsSinkDUnitTest(int numTask, int numPartition) { +// this.numTask = numTask; +// this.numPartition = numPartition; +// } + + @Test + public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception { + + locator = clusterStartupRule.startLocatorVM(0, 10334); + int locatorPort = locator.getPort(); + server = clusterStartupRule.startServerVM(1, locatorPort); + client = + clusterStartupRule + .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); + int NUM_EVENT = 10; + + // Set unique names for all the different components + String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", ""); + String sourceRegion = "SOURCE_REGION_" + testIdentifier; + String sinkRegion = "SINK_REGION_" + testIdentifier; + String sinkTopic = "SINK_TOPIC_" + testIdentifier; + String sourceTopic = "SOURCE_TOPIC_" + testIdentifier; + + /** + * Start the Apache Geode cluster and create the source and sink regions. + * Create a Apache Geode client which inserts data into the source + */ + server.invoke(() -> { + ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION) + .create(sourceRegion); + ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION) + .create(sinkRegion); + }); + + + /** + * Start the Kafka Cluster, workers and the topic to which the Apache Geode will connect as + * a source + */ + WorkerAndHerderCluster workerAndHerderCluster = null; + KafkaLocalCluster kafkaLocalCluster = null; + try { + kafkaLocalCluster = startKafka( + getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath())); + createTopic(sinkTopic, numPartition, 1); + // Create workers and herder cluster + workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion, + sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(), + "localhost[" + locatorPort + "]"); + client.invoke(() -> { + ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) + .create(sinkRegion); + }); + // Create the producer + Producer<String, String> producer = createProducer(); + + for (int i = 0; i < NUM_EVENT; i++) { + producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i)); + } + + client.invoke(() -> { + Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion); + await().atMost(10, TimeUnit.SECONDS) + .untilAsserted(() -> assertEquals(10, region.sizeOnServer())); + }); + + + } finally { + // Clean up by deleting the sink topic + deleteTopic(sinkTopic); + if (workerAndHerderCluster != null) { + workerAndHerderCluster.stop(); + } + kafkaLocalCluster.stop(); + } + + } +} diff --git a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java new file mode 100644 index 0000000..d300b55 --- /dev/null +++ b/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java @@ -0,0 +1,171 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more contributor license + * agreements. See the NOTICE file distributed with this work for additional information regarding + * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. You may obtain a + * copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License + * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express + * or implied. See the License for the specific language governing permissions and limitations under + * the License. + */ +package org.geode.kafka; + +import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer; +import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; +import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; +import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig; +import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties; +import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; +import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; +import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; +import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed; + +import java.util.Arrays; + +import kafka.zk.KafkaZkClient; +import org.apache.kafka.clients.consumer.Consumer; +import org.apache.kafka.common.utils.Time; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.rules.TestName; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; + +import org.apache.geode.cache.Region; +import org.apache.geode.cache.RegionShortcut; +import org.apache.geode.cache.client.ClientRegionShortcut; +import org.apache.geode.test.dunit.rules.ClientVM; +import org.apache.geode.test.dunit.rules.ClusterStartupRule; +import org.apache.geode.test.dunit.rules.MemberVM; + +@RunWith(Parameterized.class) +public class GeodeAsSourceDUnitTest { + @Rule + public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3); + + private static MemberVM locator, server; + private static ClientVM client; + + + @Rule + public TestName testName = new TestName(); + + @ClassRule + public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder(); + + @Rule + public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder(); + + @BeforeClass + public static void setup() + throws Exception { + startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper)); + } + + @AfterClass + public static void cleanUp() { + KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", + false, + 200000, + 15000, + 10, + Time.SYSTEM, + "myGroup", + "myMetricType", + null); + + zkClient.close(); + } + + @Parameters(name = "tasks_{0}_partitions_{1}") + public static Iterable<Object[]> getParams() { + return Arrays.asList(new Object[][] {{1, 1}, {1, 2}, {5, 2}}); + } + + private final int numTask; + private final int numPartition; + + public GeodeAsSourceDUnitTest(int numTask, int numPartition) { + this.numTask = numTask; + this.numPartition = numPartition; + } + + @Test + public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception { + locator = clusterStartupRule.startLocatorVM(0, 10334); + int locatorPort = locator.getPort(); + server = clusterStartupRule.startServerVM(1, locatorPort); + client = + clusterStartupRule + .startClientVM(2, client -> client.withLocatorConnection(locatorPort)); + int NUM_EVENT = 10; + + // Set unique names for all the different components + String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", ""); + String sourceRegion = "SOURCE_REGION_" + testIdentifier; + String sinkRegion = "SINK_REGION_" + testIdentifier; + String sinkTopic = "SINK_TOPIC_" + testIdentifier; + String sourceTopic = "SOURCE_TOPIC_" + testIdentifier; + + /** + * Start the Apache Geode cluster and create the source and sink regions. + * Create a Apache Geode client which inserts data into the source + */ + server.invoke(() -> { + ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION) + .create(sourceRegion); + ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION) + .create(sinkRegion); + }); + client.invoke(() -> { + ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY) + .create(sourceRegion); + }); + + /** + * Start the Kafka Cluster, workers and the topic to which the Apache Geode will connect as + * a source + */ + WorkerAndHerderCluster workerAndHerderCluster = null; + KafkaLocalCluster kafkaLocalCluster = null; + try { + kafkaLocalCluster = startKafka( + getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath())); + createTopic(sourceTopic, numPartition, 1); + // Create workers and herder cluster + workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion, + sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(), + "localhost[" + locatorPort + "]"); + + // Create the consumer to consume from the source topic + Consumer<String, String> consumer = createConsumer(sourceTopic); + + // Insert data into the Apache Geode source from the client + client.invoke(() -> { + Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion); + for (int i = 0; i < NUM_EVENT; i++) { + region.put("KEY" + i, "VALUE" + i); + } + }); + + // Assert that all the data inserted in Apache Geode source is received by the consumer + verifyEventsAreConsumed(consumer, NUM_EVENT); + } finally { + // Clean up by deleting the source topic + deleteTopic(sourceTopic); + if (workerAndHerderCluster != null) { + workerAndHerderCluster.stop(); + } + kafkaLocalCluster.stop(); + } + } +} diff --git a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java deleted file mode 100644 index 43723a9..0000000 --- a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java +++ /dev/null @@ -1,171 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more contributor license - * agreements. See the NOTICE file distributed with this work for additional information regarding - * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance with the License. You may obtain a - * copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software distributed under the License - * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express - * or implied. See the License for the specific language governing permissions and limitations under - * the License. - */ -package org.geode.kafka; - -import static org.awaitility.Awaitility.await; -import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer; -import static org.geode.kafka.GeodeKafkaTestUtils.createTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic; -import static org.geode.kafka.GeodeKafkaTestUtils.putDataIntoGeodeCluster; -import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeClientAndRegion; -import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeLocator; -import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeServerAndCreateSourceRegion; -import static org.geode.kafka.GeodeKafkaTestUtils.startKafka; -import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster; -import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper; - -import java.io.IOException; -import java.time.Duration; -import java.util.Properties; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import kafka.zk.KafkaZkClient; -import org.apache.kafka.clients.consumer.Consumer; -import org.apache.kafka.clients.consumer.ConsumerRecord; -import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.common.utils.Time; -import org.junit.After; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; -import org.junit.rules.TestName; - -import org.apache.geode.test.dunit.Host; -import org.apache.geode.test.dunit.VM; -import org.apache.geode.test.dunit.rules.ClusterStartupRule; -import org.apache.geode.test.version.VersionManager; - -public class GeodeKafkaConnectorTestBase { - @Rule - public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(); - - @Rule - public TestName testName = new TestName(); - - @ClassRule - public static TemporaryFolder temporaryFolder = new TemporaryFolder(); - private static boolean debug = true; - - public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]"; - public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]"; - - public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource"; - public static String TEST_REGION_FOR_SOURCE = "someRegionForSource"; - public static String TEST_TOPIC_FOR_SINK = "someTopicForSink"; - public static String TEST_REGION_FOR_SINK = "someRegionForSink"; - - private static ZooKeeperLocalCluster zooKeeperLocalCluster; - private static KafkaLocalCluster kafkaLocalCluster; - private static GeodeLocalCluster geodeLocalCluster; - private static WorkerAndHerderCluster workerAndHerderCluster; - private static Consumer<String, String> consumer; - - private static Properties getZooKeeperProperties() throws IOException { - Properties properties = new Properties(); - properties.setProperty("dataDir", - (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath()); - properties.setProperty("clientPort", "2181"); - properties.setProperty("tickTime", "2000"); - return properties; - } - - private static Properties getKafkaConfig() throws IOException { - int BROKER_PORT = 9092; - Properties props = new Properties(); - - props.put("broker.id", "0"); - props.put("zookeeper.connect", "localhost:2181"); - props.put("host.name", "localHost"); - props.put("port", BROKER_PORT); - props.put("offsets.topic.replication.factor", "1"); - - // Specifically GeodeKafka connector configs - return props; - } - - - // @Before - // public void setup() - // throws IOException, QuorumPeerConfig.ConfigException, InterruptedException { - // - // System.out.println("NABA : kafka started"); - // } - - @After - public void cleanUp() { - KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181", - false, - 200000, - 15000, - 10, - Time.SYSTEM, - "myGroup", - "myMetricType", - null); - - zkClient.close(); - kafkaLocalCluster.stop(); - } - - @Test - public void doNothing() { - - } - - @Test - public void endToEndSourceTest2() throws Exception { - startZooKeeper(getZooKeeperProperties()); - startKafka(getKafkaConfig()); - Host host = Host.getHost(0); - VM server = host.getVM(VersionManager.CURRENT_VERSION, 0); - VM locator = host.getVM(VersionManager.CURRENT_VERSION, 1); - VM client = host.getVM(VersionManager.CURRENT_VERSION, 2); - WorkerAndHerderCluster workerAndHerderCluster = null; - // Start the Apache Geode locator and server and create the source region - startGeodeLocator(locator); - // Topic Name and all region names must be the same - String topicName = testName.getMethodName(); - startGeodeServerAndCreateSourceRegion(server, topicName); - startGeodeClientAndRegion(client, topicName); - - try { - createTopic(topicName, 1, 1); - // Create workers and herder cluster - workerAndHerderCluster = startWorkerAndHerderCluster(1); - - consumer = createConsumer(topicName); - - // Insert data into the Apache Geode source - putDataIntoGeodeCluster(client, topicName, 10); - // Assert that all the data inserted in Apache Geode source is received by the consumer - AtomicInteger valueReceived = new AtomicInteger(0); - await().atMost(10, TimeUnit.SECONDS).until(() -> { - ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); - for (ConsumerRecord<String, String> record : records) { - valueReceived.incrementAndGet(); - } - return valueReceived.get() == 10; - }); - } finally { - deleteTopic(topicName); - if (workerAndHerderCluster != null) { - workerAndHerderCluster.stop(); - } - } - } - -} diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java index f620caa..57d576d 100644 --- a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java +++ b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java @@ -176,7 +176,6 @@ public class GeodeKafkaTestCluster { props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); - // Create the consumer using props. final Consumer<String, String> consumer = new KafkaConsumer<>(props); diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java index a979e3b..27f9391 100644 --- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java +++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java @@ -14,15 +14,22 @@ */ package org.geode.kafka; +import static org.awaitility.Awaitility.await; + import java.io.IOException; +import java.time.Duration; import java.util.Collections; import java.util.Properties; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import kafka.admin.RackAwareMode; import kafka.zk.AdminZkClient; import kafka.zk.KafkaZkClient; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; @@ -32,18 +39,7 @@ import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.common.utils.Time; import org.apache.kafka.connect.runtime.WorkerConfig; import org.apache.zookeeper.server.quorum.QuorumPeerConfig; - -import org.apache.geode.cache.Cache; -import org.apache.geode.cache.CacheFactory; -import org.apache.geode.cache.Region; -import org.apache.geode.cache.RegionShortcut; -import org.apache.geode.cache.client.ClientCache; -import org.apache.geode.cache.client.ClientCacheFactory; -import org.apache.geode.cache.client.ClientRegionShortcut; -import org.apache.geode.cache.server.CacheServer; -import org.apache.geode.distributed.ConfigurationProperties; -import org.apache.geode.distributed.Locator; -import org.apache.geode.test.dunit.VM; +import org.junit.rules.TemporaryFolder; public class GeodeKafkaTestUtils { protected static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties) @@ -54,7 +50,7 @@ public class GeodeKafkaTestUtils { } protected static KafkaLocalCluster startKafka(Properties kafkaProperties) - throws IOException, InterruptedException, QuorumPeerConfig.ConfigException { + throws IOException, InterruptedException { KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties); kafkaLocalCluster.start(); return kafkaLocalCluster; @@ -78,6 +74,41 @@ public class GeodeKafkaTestUtils { adminZkClient.deleteTopic(topicName); } + protected static Producer<String, String> createProducer() { + final Properties props = new Properties(); + props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + StringSerializer.class.getName()); + + // Create the producer using props. + final Producer<String, String> producer = + new KafkaProducer<>(props); + return producer; + } + + protected static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder) + throws IOException { + Properties properties = new Properties(); + properties.setProperty("dataDir", temporaryFolder.newFolder("zookeeper").getAbsolutePath()); + properties.setProperty("clientPort", "2181"); + properties.setProperty("tickTime", "2000"); + return properties; + } + + protected static Properties getKafkaConfig(String logPath) { + int BROKER_PORT = 9092; + Properties props = new Properties(); + props.put("broker.id", "0"); + props.put("zookeeper.connect", "localhost:2181"); + props.put("host.name", "localHost"); + props.put("port", BROKER_PORT); + props.put("offsets.topic.replication.factor", "1"); + props.put("log.dirs", logPath); + return props; + } + // consumer props, less important, just for testing? public static Consumer<String, String> createConsumer(String testTopicForSource) { final Properties props = new Properties(); @@ -98,50 +129,13 @@ public class GeodeKafkaTestUtils { return consumer; } - // consumer props, less important, just for testing? - public static Producer<String, String> createProducer() { - final Properties props = new Properties(); - props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, - StringSerializer.class.getName()); - - // Create the producer using props. - final Producer<String, String> producer = - new KafkaProducer<>(props); - return producer; - - } - - public static void startGeodeLocator(VM locatorVM) { - locatorVM.invoke(() -> { - Properties properties = new Properties(); - properties.setProperty(ConfigurationProperties.NAME, "locator1"); - Locator.startLocatorAndDS(10334, - null, properties); - }); - } - - public static void startGeodeServerAndCreateSourceRegion(VM serverVM, String regionName) { - serverVM.invoke(() -> { - Properties properties = new Properties(); - Cache cache = new CacheFactory(properties) - .set(ConfigurationProperties.LOCATORS, "localhost[10334]") - .set(ConfigurationProperties.NAME, "server-1") - .create(); - CacheServer cacheServer = cache.addCacheServer(); - cacheServer.setPort(0); - cacheServer.start(); - - cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName); - }); - } - - protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks) { + protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks, + String sourceRegion, String sinkRegion, String sourceTopic, String sinkTopic, + String offsetPath, String locatorString) { WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster(); try { - workerAndHerderCluster.start(String.valueOf(maxTasks)); + workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic, + sinkTopic, offsetPath, locatorString); Thread.sleep(20000); } catch (Exception e) { throw new RuntimeException("Could not start the worker and herder cluster" + e); @@ -149,28 +143,16 @@ public class GeodeKafkaTestUtils { return workerAndHerderCluster; } - protected static void startGeodeClientAndRegion(VM client, String regionName) { - client.invoke(() -> { - ClientCache clientCache = new ClientCacheFactory() - .addPoolLocator("localhost", 10334) - .create(); - - clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName); - }); - } - - protected static void putDataIntoGeodeCluster(VM client, String regionName, int num) { - client.invoke(() -> { - ClientCache clientCache = new ClientCacheFactory() - .addPoolLocator("localhost", 10334) - .create(); - Region region = clientCache.getRegion(regionName); - for (int i = 0; i < num; i++) { - region.put("KEY" + i, "VALUE" + i); + protected static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) { + AtomicInteger valueReceived = new AtomicInteger(0); + await().atMost(10, TimeUnit.SECONDS).until(() -> { + ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2)); + for (ConsumerRecord<String, String> record : records) { + System.out.println("NABA :: " + record); + valueReceived.incrementAndGet(); } + return valueReceived.get() == numEvents; }); } - - } diff --git a/src/test/java/org/geode/kafka/JavaProcess.java b/src/test/java/org/geode/kafka/JavaProcess.java index 5355c75..a88638b 100644 --- a/src/test/java/org/geode/kafka/JavaProcess.java +++ b/src/test/java/org/geode/kafka/JavaProcess.java @@ -31,9 +31,17 @@ public class JavaProcess { System.getProperty("java.home") + File.separator + "bin" + File.separator + "java"; String classpath = System.getProperty("java.class.path"); String className = classWithMain.getName(); - + int commandLength = args.length + 4; + String[] processBuilderCommand = new String[commandLength]; + processBuilderCommand[0] = java; + processBuilderCommand[1] = "-cp"; + processBuilderCommand[2] = classpath; + processBuilderCommand[3] = className; + for (int i = 0; i < args.length; i++) { + processBuilderCommand[4 + i] = args[i]; + } ProcessBuilder builder = new ProcessBuilder( - java, "-cp", classpath, className, convertArgsToString(args)); + processBuilderCommand); process = builder.inheritIO().start(); } diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java index bda962e..f0a0a1d 100644 --- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java +++ b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java @@ -30,7 +30,7 @@ public class LocatorLauncherWrapper { properties.setProperty(ConfigurationProperties.NAME, "locator1"); Locator.startLocatorAndDS(10334, - new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties); + null/*new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log")*/, properties); while (true) { } diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java index 1bc63fe..026012c 100644 --- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java +++ b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java @@ -48,8 +48,8 @@ public class ServerLauncherWrapper { .set(ConfigurationProperties.LOCATORS, locatorString) .set(ConfigurationProperties.NAME, "server-1") - .set(ConfigurationProperties.LOG_FILE, - "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") +// .set(ConfigurationProperties.LOG_FILE, +// "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log") .set(ConfigurationProperties.LOG_LEVEL, "info") // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile) .create(); diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java index 824edab..022e381 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java +++ b/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java @@ -26,7 +26,14 @@ public class WorkerAndHerderCluster { public void start(String maxTasks) throws IOException, InterruptedException { workerAndHerder.exec(maxTasks); + } + public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic, + String sinkTopic, String offsetPath, String locatorString) + throws IOException, InterruptedException { + String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic, + offsetPath, locatorString}; + workerAndHerder.exec(args); } public void stop() { diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java index 3afcde7..0d8ad40 100644 --- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java +++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java @@ -18,6 +18,7 @@ import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BIND import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS; import java.io.IOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Map; @@ -39,10 +40,27 @@ public class WorkerAndHerderWrapper { public static void main(String[] args) throws IOException { String maxTasks = args[0]; + String offsetPath = "/tmp/connect.offsets"; + String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS; + String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS; + String testTopicForSink = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK; + String locatorString = null; + System.out.println("MaxTask " + maxTasks); + if (args.length == 7) { + String sourceRegion = args[1]; + String sinkRegion = args[2]; + String sourceTopic = args[3]; + String sinkTopic = args[4]; + offsetPath = args[5]; + regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]"; + topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]"; + locatorString = args[6]; + System.out.println("NABA args = " + Arrays.deepToString(args)); + } Map props = new HashMap(); props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); - props.put("offset.storage.file.filename", "/tmp/connect.offsets"); + props.put("offset.storage.file.filename", offsetPath); // fast flushing for testing. props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10"); @@ -54,7 +72,7 @@ public class WorkerAndHerderWrapper { "org.apache.kafka.connect.storage.StringConverter"); props.put("key.converter.schemas.enable", "false"); props.put("value.converter.schemas.enable", "false"); - + props.put(GeodeConnectorConfig.LOCATORS, locatorString); WorkerConfig workerCfg = new StandaloneConfig(props); MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore(); @@ -72,7 +90,8 @@ public class WorkerAndHerderWrapper { sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName()); sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector"); sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks); - sourceProps.put(REGION_TO_TOPIC_BINDINGS, GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS); + sourceProps.put(GeodeConnectorConfig.LOCATORS, locatorString); + sourceProps.put(REGION_TO_TOPIC_BINDINGS, regionToTopicBinding); herder.putConnectorConfig( sourceProps.get(ConnectorConfig.NAME_CONFIG), @@ -83,8 +102,10 @@ public class WorkerAndHerderWrapper { sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName()); sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector"); sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks); - sinkProps.put(TOPIC_TO_REGION_BINDINGS, GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS); - sinkProps.put("topics", GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK); + sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding); + sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString); + System.out.println("NABA : binding " + topicToRegionBinding); + sinkProps.put("topics", testTopicForSink); herder.putConnectorConfig( sinkProps.get(ConnectorConfig.NAME_CONFIG), diff --git a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java index 65ff819..d7cb99a 100644 --- a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java +++ b/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java @@ -52,5 +52,6 @@ public class ZooKeeperLocalCluster { }; zooKeeperThread.start(); System.out.println("ZooKeeper thread started"); + } }