This is an automated email from the ASF dual-hosted git repository.

nnag pushed a commit to branch feature/restructuring
in repository https://gitbox.apache.org/repos/asf/geode-kafka-connector.git

commit 3e12e9ed322c5d5dfb9baa55533eb738f671cdc3
Author: Naburun Nag <n...@cs.wisc.edu>
AuthorDate: Tue Feb 18 18:51:31 2020 -0800

    adding the sink tests
---
 ConfigDiskDir_locator1/BACKUPcluster_config.if     | Bin 0 -> 1048576 bytes
 ConfigDiskDir_locator1/BACKUPcluster_config_1.crf  | Bin 0 -> 200 bytes
 ConfigDiskDir_locator1/BACKUPcluster_config_1.drf  | Bin 0 -> 34 bytes
 ConfigDiskDir_locator1/BACKUPcluster_config_1.krf  | Bin 0 -> 68 bytes
 ConfigDiskDir_locator1/BACKUPcluster_config_3.crf  | Bin 0 -> 41 bytes
 ConfigDiskDir_locator1/BACKUPcluster_config_3.drf  | Bin 0 -> 36 bytes
 build.gradle                                       |   5 +-
 dunit/locator/locator63530view.dat                 | Bin 0 -> 198 bytes
 locator10334view.dat                               | Bin 0 -> 299 bytes
 .../java/org/geode/kafka/GeodeAsSinkDUnitTest.java | 165 ++++++++++++++++++++
 .../org/geode/kafka/GeodeAsSourceDUnitTest.java    | 171 +++++++++++++++++++++
 .../geode/kafka/GeodeKafkaConnectorTestBase.java   | 171 ---------------------
 .../org/geode/kafka/GeodeKafkaTestCluster.java     |   1 -
 .../java/org/geode/kafka/GeodeKafkaTestUtils.java  | 132 +++++++---------
 src/test/java/org/geode/kafka/JavaProcess.java     |  12 +-
 .../org/geode/kafka/LocatorLauncherWrapper.java    |   2 +-
 .../org/geode/kafka/ServerLauncherWrapper.java     |   4 +-
 .../org/geode/kafka/WorkerAndHerderCluster.java    |   7 +
 .../org/geode/kafka/WorkerAndHerderWrapper.java    |  31 +++-
 .../org/geode/kafka/ZooKeeperLocalCluster.java     |   1 +
 20 files changed, 443 insertions(+), 259 deletions(-)

diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config.if b/ConfigDiskDir_locator1/BACKUPcluster_config.if
new file mode 100644
index 0000000..8282543
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config.if differ
diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf
new file mode 100644
index 0000000..6dd9f70
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.crf differ
diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf
new file mode 100644
index 0000000..f2b08ee
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.drf differ
diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf b/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf
new file mode 100644
index 0000000..9151588
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_1.krf differ
diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf b/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf
new file mode 100644
index 0000000..319260f
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_3.crf differ
diff --git a/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf b/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf
new file mode 100644
index 0000000..0d6be92
Binary files /dev/null and b/ConfigDiskDir_locator1/BACKUPcluster_config_3.drf differ
diff --git a/build.gradle b/build.gradle
index 33f9b9c..add42c7 100644
--- a/build.gradle
+++ b/build.gradle
@@ -73,10 +73,11 @@ dependencies {
     testCompile(group: 'org.apache.kafka', name: 'connect-runtime', version: '2.3.1')
 
     testCompile(group: 'junit', name: 'junit', version: '4.12')
+//    testCompile("org.junit.jupiter:junit-jupiter-params:5.4.2")
     testCompile('org.mockito:mockito-core:3.2.4')
     testCompile('pl.pragmatists:JUnitParams:1.1.1')
-    testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: '1.11.0')
-    testImplementation 'org.awaitility:awaitility:4.0.2'
+    testCompile(group: 'org.apache.geode', name: 'geode-dunit', version: '1.9.0')
+    testImplementation 'org.awaitility:awaitility:3.1.6'
 }
 
 shadowJar {
diff --git a/dunit/locator/locator63530view.dat b/dunit/locator/locator63530view.dat
new file mode 100644
index 0000000..10a68c2
Binary files /dev/null and b/dunit/locator/locator63530view.dat differ
diff --git a/locator10334view.dat b/locator10334view.dat
new file mode 100644
index 0000000..93a5f6e
Binary files /dev/null and b/locator10334view.dat differ
diff --git a/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
new file mode 100644
index 0000000..5c94575
--- /dev/null
+++ b/src/test/java/org/geode/kafka/GeodeAsSinkDUnitTest.java
@@ -0,0 +1,165 @@
+package org.geode.kafka;
+
+import static org.awaitility.Awaitility.await;
+import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
+import static org.geode.kafka.GeodeKafkaTestUtils.createProducer;
+import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
+import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
+import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed;
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.concurrent.TimeUnit;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.utils.Time;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+//@RunWith(Parameterized.class)
+public class GeodeAsSinkDUnitTest {
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  private static MemberVM locator, server;
+  private static ClientVM client;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+        false,
+        200000,
+        15000,
+        10,
+        Time.SYSTEM,
+        "myGroup",
+        "myMetricType",
+        null);
+
+    zkClient.close();
+  }
+
+  @Parameterized.Parameters(name = "tasks_{0}_partitions_{1}")
+  public static Iterable<Object[]> getParams() {
+    return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
+  }
+
+  private /*final*/ int numTask = 1;
+  private /*final*/ int numPartition = 1;
+
+//  public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
+//    this.numTask = numTask;
+//    this.numPartition = numPartition;
+//  }
+
+  @Test
+  public void whenKafkaProducerProducesEventsThenGeodeMustReceiveTheseEvents() throws Exception {
+
+    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    int locatorPort = locator.getPort();
+    server = clusterStartupRule.startServerVM(1, locatorPort);
+    client =
+        clusterStartupRule
+            .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+    int NUM_EVENT = 10;
+
+    // Set unique names for all the different components
+    String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", "");
+    String sourceRegion = "SOURCE_REGION_" + testIdentifier;
+    String sinkRegion = "SINK_REGION_" + testIdentifier;
+    String sinkTopic = "SINK_TOPIC_" + testIdentifier;
+    String sourceTopic = "SOURCE_TOPIC_" + testIdentifier;
+
+    /**
+     * Start the Apache Geode cluster and create the source and sink regions.
+     * Create an Apache Geode client which inserts data into the source region.
+     */
+    server.invoke(() -> {
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sourceRegion);
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sinkRegion);
+    });
+
+
+    /**
+     * Start the Kafka cluster and workers, and create the topic to which Apache Geode
+     * will connect as a sink.
+     */
+    WorkerAndHerderCluster workerAndHerderCluster = null;
+    KafkaLocalCluster kafkaLocalCluster = null;
+    try {
+      kafkaLocalCluster = startKafka(
+          getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+      createTopic(sinkTopic, numPartition, 1);
+      // Create workers and herder cluster
+      workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion,
+          sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(),
+          "localhost[" + locatorPort + "]");
+      client.invoke(() -> {
+        ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+            .create(sinkRegion);
+      });
+      // Create the producer
+      Producer<String, String> producer = createProducer();
+
+      for (int i = 0; i < NUM_EVENT; i++) {
+        producer.send(new ProducerRecord(sinkTopic, "KEY" + i, "VALUE" + i));
+      }
+
+      client.invoke(() -> {
+        Region region = ClusterStartupRule.getClientCache().getRegion(sinkRegion);
+        await().atMost(10, TimeUnit.SECONDS)
+            .untilAsserted(() -> assertEquals(10, region.sizeOnServer()));
+      });
+
+
+    } finally {
+      // Clean up by deleting the sink topic
+      deleteTopic(sinkTopic);
+      if (workerAndHerderCluster != null) {
+        workerAndHerderCluster.stop();
+      }
+      if (kafkaLocalCluster != null) {
+        kafkaLocalCluster.stop();
+      }
+    }
+  }
+}
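Note: the sink test above keeps the Parameterized runner commented out and pins
numTask = numPartition = 1, even though getParams() is already defined. A minimal
sketch of the re-enabled wiring, mirroring GeodeAsSourceDUnitTest below (names taken
from the file itself):

    @RunWith(Parameterized.class)
    public class GeodeAsSinkDUnitTest {

      @Parameterized.Parameters(name = "tasks_{0}_partitions_{1}")
      public static Iterable<Object[]> getParams() {
        return Arrays.asList(new Object[][] {{1, 1}, {5, 10}, {15, 10}});
      }

      private final int numTask;
      private final int numPartition;

      // JUnit 4 runs the test once per {tasks, partitions} pair
      public GeodeAsSinkDUnitTest(int numTask, int numPartition) {
        this.numTask = numTask;
        this.numPartition = numPartition;
      }

      // ... rest of the class unchanged ...
    }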
diff --git a/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java b/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java
new file mode 100644
index 0000000..d300b55
--- /dev/null
+++ b/src/test/java/org/geode/kafka/GeodeAsSourceDUnitTest.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+ * agreements. See the NOTICE file distributed with this work for additional information regarding
+ * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License. You may obtain a
+ * copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License
+ * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+ * or implied. See the License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.geode.kafka;
+
+import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
+import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
+import static org.geode.kafka.GeodeKafkaTestUtils.getKafkaConfig;
+import static org.geode.kafka.GeodeKafkaTestUtils.getZooKeeperProperties;
+import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
+import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
+import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
+import static org.geode.kafka.GeodeKafkaTestUtils.verifyEventsAreConsumed;
+
+import java.util.Arrays;
+
+import kafka.zk.KafkaZkClient;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.utils.Time;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+import org.apache.geode.cache.Region;
+import org.apache.geode.cache.RegionShortcut;
+import org.apache.geode.cache.client.ClientRegionShortcut;
+import org.apache.geode.test.dunit.rules.ClientVM;
+import org.apache.geode.test.dunit.rules.ClusterStartupRule;
+import org.apache.geode.test.dunit.rules.MemberVM;
+
+@RunWith(Parameterized.class)
+public class GeodeAsSourceDUnitTest {
+  @Rule
+  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule(3);
+
+  private static MemberVM locator, server;
+  private static ClientVM client;
+
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @ClassRule
+  public static TemporaryFolder temporaryFolderForZooKeeper = new TemporaryFolder();
+
+  @Rule
+  public TemporaryFolder temporaryFolderForOffset = new TemporaryFolder();
+
+  @BeforeClass
+  public static void setup()
+      throws Exception {
+    startZooKeeper(getZooKeeperProperties(temporaryFolderForZooKeeper));
+  }
+
+  @AfterClass
+  public static void cleanUp() {
+    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
+        false,
+        200000,
+        15000,
+        10,
+        Time.SYSTEM,
+        "myGroup",
+        "myMetricType",
+        null);
+
+    zkClient.close();
+  }
+
+  @Parameters(name = "tasks_{0}_partitions_{1}")
+  public static Iterable<Object[]> getParams() {
+    return Arrays.asList(new Object[][] {{1, 1}, {1, 2}, {5, 2}});
+  }
+
+  private final int numTask;
+  private final int numPartition;
+
+  public GeodeAsSourceDUnitTest(int numTask, int numPartition) {
+    this.numTask = numTask;
+    this.numPartition = numPartition;
+  }
+
+  @Test
+  public void whenDataIsInsertedInGeodeSourceThenKafkaConsumerMustReceiveEvents() throws Exception {
+    locator = clusterStartupRule.startLocatorVM(0, 10334);
+    int locatorPort = locator.getPort();
+    server = clusterStartupRule.startServerVM(1, locatorPort);
+    client =
+        clusterStartupRule
+            .startClientVM(2, client -> client.withLocatorConnection(locatorPort));
+    int NUM_EVENT = 10;
+
+    // Set unique names for all the different components
+    String testIdentifier = testName.getMethodName().replaceAll("\\[|\\]", "");
+    String sourceRegion = "SOURCE_REGION_" + testIdentifier;
+    String sinkRegion = "SINK_REGION_" + testIdentifier;
+    String sinkTopic = "SINK_TOPIC_" + testIdentifier;
+    String sourceTopic = "SOURCE_TOPIC_" + testIdentifier;
+
+    /**
+     * Start the Apache Geode cluster and create the source and sink regions.
+     * Create an Apache Geode client which inserts data into the source region.
+     */
+    server.invoke(() -> {
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sourceRegion);
+      ClusterStartupRule.getCache().createRegionFactory(RegionShortcut.PARTITION)
+          .create(sinkRegion);
+    });
+    client.invoke(() -> {
+      ClusterStartupRule.getClientCache().createClientRegionFactory(ClientRegionShortcut.PROXY)
+          .create(sourceRegion);
+    });
+
+    /**
+     * Start the Kafka cluster and workers, and create the topic to which Apache Geode
+     * will connect as a source.
+     */
+    WorkerAndHerderCluster workerAndHerderCluster = null;
+    KafkaLocalCluster kafkaLocalCluster = null;
+    try {
+      kafkaLocalCluster = startKafka(
+          getKafkaConfig(temporaryFolderForOffset.newFolder("kafkaLogs").getAbsolutePath()));
+      createTopic(sourceTopic, numPartition, 1);
+      // Create workers and herder cluster
+      workerAndHerderCluster = startWorkerAndHerderCluster(numTask, sourceRegion, sinkRegion,
+          sourceTopic, sinkTopic, temporaryFolderForOffset.getRoot().getAbsolutePath(),
+          "localhost[" + locatorPort + "]");
+
+      // Create the consumer to consume from the source topic
+      Consumer<String, String> consumer = createConsumer(sourceTopic);
+
+      // Insert data into the Apache Geode source from the client
+      client.invoke(() -> {
+        Region region = ClusterStartupRule.getClientCache().getRegion(sourceRegion);
+        for (int i = 0; i < NUM_EVENT; i++) {
+          region.put("KEY" + i, "VALUE" + i);
+        }
+      });
+
+      // Assert that all the data inserted in Apache Geode source is received by the consumer
+      verifyEventsAreConsumed(consumer, NUM_EVENT);
+    } finally {
+      // Clean up by deleting the source topic
+      deleteTopic(sourceTopic);
+      if (workerAndHerderCluster != null) {
+        workerAndHerderCluster.stop();
+      }
+      if (kafkaLocalCluster != null) {
+        kafkaLocalCluster.stop();
+      }
+    }
+  }
+}
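Note: both tests derive region and topic names from the JUnit method name. Under the
Parameterized runner the method name carries a bracketed parameter suffix, which the
replaceAll("\\[|\\]", "") call strips so the identifier stays legal as a Geode region
name and a Kafka topic name. A worked example (the method name is illustrative):

    String methodName = "whenDataIsInsertedInGeodeSource[tasks_5_partitions_2]";
    String testIdentifier = methodName.replaceAll("\\[|\\]", "");
    // -> "whenDataIsInsertedInGeodeSourcetasks_5_partitions_2"
    String sourceTopic = "SOURCE_TOPIC_" + testIdentifier;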
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java b/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java
deleted file mode 100644
index 43723a9..0000000
--- a/src/test/java/org/geode/kafka/GeodeKafkaConnectorTestBase.java
+++ /dev/null
@@ -1,171 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license
- * agreements. See the NOTICE file distributed with this work for additional information regarding
- * copyright ownership. The ASF licenses this file to You under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance with the License. You may obtain a
- * copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License
- * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
- * or implied. See the License for the specific language governing permissions and limitations under
- * the License.
- */
-package org.geode.kafka;
-
-import static org.awaitility.Awaitility.await;
-import static org.geode.kafka.GeodeKafkaTestUtils.createConsumer;
-import static org.geode.kafka.GeodeKafkaTestUtils.createTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.deleteTopic;
-import static org.geode.kafka.GeodeKafkaTestUtils.putDataIntoGeodeCluster;
-import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeClientAndRegion;
-import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeLocator;
-import static org.geode.kafka.GeodeKafkaTestUtils.startGeodeServerAndCreateSourceRegion;
-import static org.geode.kafka.GeodeKafkaTestUtils.startKafka;
-import static org.geode.kafka.GeodeKafkaTestUtils.startWorkerAndHerderCluster;
-import static org.geode.kafka.GeodeKafkaTestUtils.startZooKeeper;
-
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Properties;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import kafka.zk.KafkaZkClient;
-import org.apache.kafka.clients.consumer.Consumer;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.utils.Time;
-import org.junit.After;
-import org.junit.ClassRule;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-import org.junit.rules.TestName;
-
-import org.apache.geode.test.dunit.Host;
-import org.apache.geode.test.dunit.VM;
-import org.apache.geode.test.dunit.rules.ClusterStartupRule;
-import org.apache.geode.test.version.VersionManager;
-
-public class GeodeKafkaConnectorTestBase {
-  @Rule
-  public ClusterStartupRule clusterStartupRule = new ClusterStartupRule();
-
-  @Rule
-  public TestName testName = new TestName();
-
-  @ClassRule
-  public static TemporaryFolder temporaryFolder = new TemporaryFolder();
-  private static boolean debug = true;
-
-  public static String TEST_REGION_TO_TOPIC_BINDINGS = "[someRegionForSource:someTopicForSource]";
-  public static String TEST_TOPIC_TO_REGION_BINDINGS = "[someTopicForSink:someRegionForSink]";
-
-  public static String TEST_TOPIC_FOR_SOURCE = "someTopicForSource";
-  public static String TEST_REGION_FOR_SOURCE = "someRegionForSource";
-  public static String TEST_TOPIC_FOR_SINK = "someTopicForSink";
-  public static String TEST_REGION_FOR_SINK = "someRegionForSink";
-
-  private static ZooKeeperLocalCluster zooKeeperLocalCluster;
-  private static KafkaLocalCluster kafkaLocalCluster;
-  private static GeodeLocalCluster geodeLocalCluster;
-  private static WorkerAndHerderCluster workerAndHerderCluster;
-  private static Consumer<String, String> consumer;
-
-  private static Properties getZooKeeperProperties() throws IOException {
-    Properties properties = new Properties();
-    properties.setProperty("dataDir",
-        (debug) ? "/tmp/zookeeper" : temporaryFolder.newFolder("zookeeper").getAbsolutePath());
-    properties.setProperty("clientPort", "2181");
-    properties.setProperty("tickTime", "2000");
-    return properties;
-  }
-
-  private static Properties getKafkaConfig() throws IOException {
-    int BROKER_PORT = 9092;
-    Properties props = new Properties();
-
-    props.put("broker.id", "0");
-    props.put("zookeeper.connect", "localhost:2181");
-    props.put("host.name", "localHost");
-    props.put("port", BROKER_PORT);
-    props.put("offsets.topic.replication.factor", "1");
-
-    // Specifically GeodeKafka connector configs
-    return props;
-  }
-
-
-  // @Before
-  // public void setup()
-  // throws IOException, QuorumPeerConfig.ConfigException, InterruptedException {
-  //
-  // System.out.println("NABA : kafka started");
-  // }
-
-  @After
-  public void cleanUp() {
-    KafkaZkClient zkClient = KafkaZkClient.apply("localhost:2181",
-        false,
-        200000,
-        15000,
-        10,
-        Time.SYSTEM,
-        "myGroup",
-        "myMetricType",
-        null);
-
-    zkClient.close();
-    kafkaLocalCluster.stop();
-  }
-
-  @Test
-  public void doNothing() {
-
-  }
-
-  @Test
-  public void endToEndSourceTest2() throws Exception {
-    startZooKeeper(getZooKeeperProperties());
-    startKafka(getKafkaConfig());
-    Host host = Host.getHost(0);
-    VM server = host.getVM(VersionManager.CURRENT_VERSION, 0);
-    VM locator = host.getVM(VersionManager.CURRENT_VERSION, 1);
-    VM client = host.getVM(VersionManager.CURRENT_VERSION, 2);
-    WorkerAndHerderCluster workerAndHerderCluster = null;
-    // Start the Apache Geode locator and server and create the source region
-    startGeodeLocator(locator);
-    // Topic Name and all region names must be the same
-    String topicName = testName.getMethodName();
-    startGeodeServerAndCreateSourceRegion(server, topicName);
-    startGeodeClientAndRegion(client, topicName);
-
-    try {
-      createTopic(topicName, 1, 1);
-      // Create workers and herder cluster
-      workerAndHerderCluster = startWorkerAndHerderCluster(1);
-
-      consumer = createConsumer(topicName);
-
-      // Insert data into the Apache Geode source
-      putDataIntoGeodeCluster(client, topicName, 10);
-      // Assert that all the data inserted in Apache Geode source is received by the consumer
-      AtomicInteger valueReceived = new AtomicInteger(0);
-      await().atMost(10, TimeUnit.SECONDS).until(() -> {
-        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
-        for (ConsumerRecord<String, String> record : records) {
-          valueReceived.incrementAndGet();
-        }
-        return valueReceived.get() == 10;
-      });
-    } finally {
-      deleteTopic(topicName);
-      if (workerAndHerderCluster != null) {
-        workerAndHerderCluster.stop();
-      }
-    }
-  }
-
-}
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
index f620caa..57d576d 100644
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestCluster.java
@@ -176,7 +176,6 @@ public class GeodeKafkaTestCluster {
     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
         StringDeserializer.class.getName());
     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
-
     // Create the consumer using props.
     final Consumer<String, String> consumer =
         new KafkaConsumer<>(props);
diff --git a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
index a979e3b..27f9391 100644
--- a/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
+++ b/src/test/java/org/geode/kafka/GeodeKafkaTestUtils.java
@@ -14,15 +14,22 @@
  */
 package org.geode.kafka;
 
+import static org.awaitility.Awaitility.await;
+
 import java.io.IOException;
+import java.time.Duration;
 import java.util.Collections;
 import java.util.Properties;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import kafka.admin.RackAwareMode;
 import kafka.zk.AdminZkClient;
 import kafka.zk.KafkaZkClient;
 import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.Producer;
@@ -32,18 +39,7 @@ import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
-
-import org.apache.geode.cache.Cache;
-import org.apache.geode.cache.CacheFactory;
-import org.apache.geode.cache.Region;
-import org.apache.geode.cache.RegionShortcut;
-import org.apache.geode.cache.client.ClientCache;
-import org.apache.geode.cache.client.ClientCacheFactory;
-import org.apache.geode.cache.client.ClientRegionShortcut;
-import org.apache.geode.cache.server.CacheServer;
-import org.apache.geode.distributed.ConfigurationProperties;
-import org.apache.geode.distributed.Locator;
-import org.apache.geode.test.dunit.VM;
+import org.junit.rules.TemporaryFolder;
 
 public class GeodeKafkaTestUtils {
   protected static ZooKeeperLocalCluster startZooKeeper(Properties zookeeperProperties)
@@ -54,7 +50,7 @@ public class GeodeKafkaTestUtils {
   }
 
   protected static KafkaLocalCluster startKafka(Properties kafkaProperties)
-      throws IOException, InterruptedException, QuorumPeerConfig.ConfigException {
+      throws IOException, InterruptedException {
     KafkaLocalCluster kafkaLocalCluster = new KafkaLocalCluster(kafkaProperties);
     kafkaLocalCluster.start();
     return kafkaLocalCluster;
@@ -78,6 +74,41 @@ public class GeodeKafkaTestUtils {
     adminZkClient.deleteTopic(topicName);
   }
 
+  protected static Producer<String, String> createProducer() {
+    final Properties props = new Properties();
+    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getName());
+    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+        StringSerializer.class.getName());
+
+    // Create the producer using props.
+    final Producer<String, String> producer =
+        new KafkaProducer<>(props);
+    return producer;
+  }
+
+  protected static Properties getZooKeeperProperties(TemporaryFolder temporaryFolder)
+      throws IOException {
+    Properties properties = new Properties();
+    properties.setProperty("dataDir", temporaryFolder.newFolder("zookeeper").getAbsolutePath());
+    properties.setProperty("clientPort", "2181");
+    properties.setProperty("tickTime", "2000");
+    return properties;
+  }
+
+  protected static Properties getKafkaConfig(String logPath) {
+    int BROKER_PORT = 9092;
+    Properties props = new Properties();
+    props.put("broker.id", "0");
+    props.put("zookeeper.connect", "localhost:2181");
+    props.put("host.name", "localHost");
+    props.put("port", BROKER_PORT);
+    props.put("offsets.topic.replication.factor", "1");
+    props.put("log.dirs", logPath);
+    return props;
+  }
+
   // consumer props, less important, just for testing?
   public static Consumer<String, String> createConsumer(String testTopicForSource) {
     final Properties props = new Properties();
@@ -98,50 +129,13 @@ public class GeodeKafkaTestUtils {
     return consumer;
   }
 
-  // consumer props, less important, just for testing?
-  public static Producer<String, String> createProducer() {
-    final Properties props = new Properties();
-    props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
-        StringSerializer.class.getName());
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
-        StringSerializer.class.getName());
-
-    // Create the producer using props.
-    final Producer<String, String> producer =
-        new KafkaProducer<>(props);
-    return producer;
-
-  }
-
-  public static void startGeodeLocator(VM locatorVM) {
-    locatorVM.invoke(() -> {
-      Properties properties = new Properties();
-      properties.setProperty(ConfigurationProperties.NAME, "locator1");
-      Locator.startLocatorAndDS(10334,
-          null, properties);
-    });
-  }
-
-  public static void startGeodeServerAndCreateSourceRegion(VM serverVM, String regionName) {
-    serverVM.invoke(() -> {
-      Properties properties = new Properties();
-      Cache cache = new CacheFactory(properties)
-          .set(ConfigurationProperties.LOCATORS, "localhost[10334]")
-          .set(ConfigurationProperties.NAME, "server-1")
-          .create();
-      CacheServer cacheServer = cache.addCacheServer();
-      cacheServer.setPort(0);
-      cacheServer.start();
-
-      cache.createRegionFactory(RegionShortcut.PARTITION).create(regionName);
-    });
-  }
-
-  protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks) {
+  protected static WorkerAndHerderCluster startWorkerAndHerderCluster(int maxTasks,
+      String sourceRegion, String sinkRegion, String sourceTopic, String sinkTopic,
+      String offsetPath, String locatorString) {
     WorkerAndHerderCluster workerAndHerderCluster = new WorkerAndHerderCluster();
     try {
-      workerAndHerderCluster.start(String.valueOf(maxTasks));
+      workerAndHerderCluster.start(String.valueOf(maxTasks), sourceRegion, sinkRegion, sourceTopic,
+          sinkTopic, offsetPath, locatorString);
       Thread.sleep(20000);
     } catch (Exception e) {
      throw new RuntimeException("Could not start the worker and herder cluster" + e);
@@ -149,28 +143,16 @@ public class GeodeKafkaTestUtils {
     return workerAndHerderCluster;
   }
 
-  protected static void startGeodeClientAndRegion(VM client, String regionName) {
-    client.invoke(() -> {
-      ClientCache clientCache = new ClientCacheFactory()
-          .addPoolLocator("localhost", 10334)
-          .create();
-
-      clientCache.createClientRegionFactory(ClientRegionShortcut.PROXY).create(regionName);
-    });
-  }
-
-  protected static void putDataIntoGeodeCluster(VM client, String regionName, int num) {
-    client.invoke(() -> {
-      ClientCache clientCache = new ClientCacheFactory()
-          .addPoolLocator("localhost", 10334)
-          .create();
-      Region region = clientCache.getRegion(regionName);
-      for (int i = 0; i < num; i++) {
-        region.put("KEY" + i, "VALUE" + i);
+  protected static void verifyEventsAreConsumed(Consumer<String, String> consumer, int numEvents) {
+    AtomicInteger valueReceived = new AtomicInteger(0);
+    await().atMost(10, TimeUnit.SECONDS).until(() -> {
+      ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(2));
+      for (ConsumerRecord<String, String> record : records) {
+        System.out.println("NABA :: " + record);
+        valueReceived.incrementAndGet();
       }
+      return valueReceived.get() == numEvents;
     });
 
   }
-
-
 }
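Note: after this restructuring, GeodeKafkaTestUtils is the single entry point the two
DUnit tests build on. A condensed sketch of the flow (the folder and name arguments
are illustrative; the real tests use TemporaryFolder rules and test-derived names):

    Properties zkProps = getZooKeeperProperties(temporaryFolder); // dataDir under a temp dir
    startZooKeeper(zkProps);                                      // listens on localhost:2181
    KafkaLocalCluster kafka = startKafka(getKafkaConfig("/tmp/kafkaLogs"));
    createTopic("SOURCE_TOPIC_demo", /* partitions */ 1, /* replication */ 1);
    WorkerAndHerderCluster workers = startWorkerAndHerderCluster(1,
        "SOURCE_REGION_demo", "SINK_REGION_demo",
        "SOURCE_TOPIC_demo", "SINK_TOPIC_demo",
        "/tmp/offsets", "localhost[10334]");
    // ... put entries into the source region through a Geode client ...
    verifyEventsAreConsumed(createConsumer("SOURCE_TOPIC_demo"), 10); // polls for up to 10 s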
diff --git a/src/test/java/org/geode/kafka/JavaProcess.java b/src/test/java/org/geode/kafka/JavaProcess.java
index 5355c75..a88638b 100644
--- a/src/test/java/org/geode/kafka/JavaProcess.java
+++ b/src/test/java/org/geode/kafka/JavaProcess.java
@@ -31,9 +31,17 @@ public class JavaProcess {
         System.getProperty("java.home") + File.separator + "bin" + File.separator + "java";
     String classpath = System.getProperty("java.class.path");
     String className = classWithMain.getName();
-
+    int commandLength = args.length + 4;
+    String[] processBuilderCommand = new String[commandLength];
+    processBuilderCommand[0] = java;
+    processBuilderCommand[1] = "-cp";
+    processBuilderCommand[2] = classpath;
+    processBuilderCommand[3] = className;
+    for (int i = 0; i < args.length; i++) {
+      processBuilderCommand[4 + i] = args[i];
+    }
     ProcessBuilder builder = new ProcessBuilder(
-        java, "-cp", classpath, className, convertArgsToString(args));
+        processBuilderCommand);
 
     process = builder.inheritIO().start();
   }
diff --git a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
index bda962e..f0a0a1d 100644
--- a/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/LocatorLauncherWrapper.java
@@ -30,7 +30,7 @@ public class LocatorLauncherWrapper {
     properties.setProperty(ConfigurationProperties.NAME, "locator1");
 
     Locator.startLocatorAndDS(10334,
-        new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log"), properties);
+        null/*new File("/Users/jhuynh/Pivotal/geode-kafka-connector/locator.log")*/, properties);
     while (true) {
 
     }
diff --git a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
index 1bc63fe..026012c 100644
--- a/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
+++ b/src/test/java/org/geode/kafka/ServerLauncherWrapper.java
@@ -48,8 +48,8 @@ public class ServerLauncherWrapper {
         .set(ConfigurationProperties.LOCATORS, locatorString)
         .set(ConfigurationProperties.NAME,
             "server-1")
-        .set(ConfigurationProperties.LOG_FILE,
-            "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
+//        .set(ConfigurationProperties.LOG_FILE,
+//            "/Users/jhuynh/Pivotal/geode-kafka-connector/server.log")
         .set(ConfigurationProperties.LOG_LEVEL, "info")
         // .set(ConfigurationProperties.STATISTIC_ARCHIVE_FILE, statsFile)
         .create();
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java b/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
index 824edab..022e381 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderCluster.java
@@ -26,7 +26,14 @@ public class WorkerAndHerderCluster {
 
   public void start(String maxTasks) throws IOException, InterruptedException {
     workerAndHerder.exec(maxTasks);
+  }
 
+  public void start(String maxTasks, String sourceRegion, String sinkRegion, String sourceTopic,
+      String sinkTopic, String offsetPath, String locatorString)
+      throws IOException, InterruptedException {
+    String[] args = new String[] {maxTasks, sourceRegion, sinkRegion, sourceTopic, sinkTopic,
+        offsetPath, locatorString};
+    workerAndHerder.exec(args);
   }
 
   public void stop() {
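Note: the argument order in the new start(...) overload is a positional contract with
WorkerAndHerderWrapper.main (next diff): maxTasks, sourceRegion, sinkRegion,
sourceTopic, sinkTopic, offsetPath, locatorString. A usage sketch with illustrative
values:

    new WorkerAndHerderCluster().start("5",
        "SOURCE_REGION_t", "SINK_REGION_t",
        "SOURCE_TOPIC_t", "SINK_TOPIC_t",
        "/tmp/offsets", "localhost[10334]");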
diff --git a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
index 3afcde7..0d8ad40 100644
--- a/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
+++ b/src/test/java/org/geode/kafka/WorkerAndHerderWrapper.java
@@ -18,6 +18,7 @@ import static org.geode.kafka.sink.GeodeSinkConnectorConfig.TOPIC_TO_REGION_BIND
 import static org.geode.kafka.source.GeodeSourceConnectorConfig.REGION_TO_TOPIC_BINDINGS;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,10 +40,27 @@ public class WorkerAndHerderWrapper {
 
   public static void main(String[] args) throws IOException {
     String maxTasks = args[0];
+    String offsetPath = "/tmp/connect.offsets";
+    String regionToTopicBinding = GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS;
+    String topicToRegionBinding = GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS;
+    String testTopicForSink = GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK;
+    String locatorString = null;
+    System.out.println("MaxTask " + maxTasks);
+    if (args.length == 7) {
+      String sourceRegion = args[1];
+      String sinkRegion = args[2];
+      String sourceTopic = args[3];
+      String sinkTopic = args[4];
+      offsetPath = args[5];
+      regionToTopicBinding = "[" + sourceRegion + ":" + sourceTopic + "]";
+      topicToRegionBinding = "[" + sinkTopic + ":" + sinkRegion + "]";
+      locatorString = args[6];
+      System.out.println("NABA args = " + Arrays.deepToString(args));
+    }
 
     Map props = new HashMap();
     props.put(WorkerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
-    props.put("offset.storage.file.filename", "/tmp/connect.offsets");
+    props.put("offset.storage.file.filename", offsetPath);
     // fast flushing for testing.
     props.put(WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG, "10");
 
@@ -54,7 +72,7 @@ public class WorkerAndHerderWrapper {
         "org.apache.kafka.connect.storage.StringConverter");
     props.put("key.converter.schemas.enable", "false");
     props.put("value.converter.schemas.enable", "false");
-
+    props.put(GeodeConnectorConfig.LOCATORS, locatorString);
     WorkerConfig workerCfg = new StandaloneConfig(props);
 
     MemoryOffsetBackingStore offBackingStore = new MemoryOffsetBackingStore();
@@ -72,7 +90,8 @@ public class WorkerAndHerderWrapper {
     sourceProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSource.class.getName());
     sourceProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-source-connector");
     sourceProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-    sourceProps.put(REGION_TO_TOPIC_BINDINGS, GeodeKafkaTestCluster.TEST_REGION_TO_TOPIC_BINDINGS);
+    sourceProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+    sourceProps.put(REGION_TO_TOPIC_BINDINGS, regionToTopicBinding);
 
     herder.putConnectorConfig(
         sourceProps.get(ConnectorConfig.NAME_CONFIG),
@@ -83,8 +102,10 @@ public class WorkerAndHerderWrapper {
     sinkProps.put(ConnectorConfig.CONNECTOR_CLASS_CONFIG, GeodeKafkaSink.class.getName());
     sinkProps.put(ConnectorConfig.NAME_CONFIG, "geode-kafka-sink-connector");
     sinkProps.put(ConnectorConfig.TASKS_MAX_CONFIG, maxTasks);
-    sinkProps.put(TOPIC_TO_REGION_BINDINGS, GeodeKafkaTestCluster.TEST_TOPIC_TO_REGION_BINDINGS);
-    sinkProps.put("topics", GeodeKafkaTestCluster.TEST_TOPIC_FOR_SINK);
+    sinkProps.put(TOPIC_TO_REGION_BINDINGS, topicToRegionBinding);
+    sinkProps.put(GeodeConnectorConfig.LOCATORS, locatorString);
+    System.out.println("NABA : binding " + topicToRegionBinding);
+    sinkProps.put("topics", testTopicForSink);
 
     herder.putConnectorConfig(
         sinkProps.get(ConnectorConfig.NAME_CONFIG),
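Note: the binding strings assembled above follow the connector's "[name:name]" format,
with the direction reversed between source and sink. For the illustrative args
{"5", "regionA", "regionB", "topicA", "topicB", "/tmp/offsets", "localhost[10334]"}
the wrapper derives:

    String regionToTopicBinding = "[regionA:topicA]"; // Geode source region -> Kafka topic
    String topicToRegionBinding = "[topicB:regionB]"; // Kafka topic -> Geode sink region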
diff --git a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java b/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
index 65ff819..d7cb99a 100644
--- a/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
+++ b/src/test/java/org/geode/kafka/ZooKeeperLocalCluster.java
@@ -52,5 +52,6 @@ public class ZooKeeperLocalCluster {
     };
     zooKeeperThread.start();
     System.out.println("ZooKeeper thread started");
+
   }
 }
