[hotfix][Kafka] Refactor properties for KafkaTestEnvironment setup

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e11a5919
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e11a5919
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e11a5919

Branch: refs/heads/master
Commit: e11a591956ba608308ccf81e13030291f150739b
Parents: b7f96f7
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
Authored: Mon Aug 7 15:53:35 2017 +0200
Committer: Stefan Richter <s.rich...@data-artisans.com>
Committed: Tue Aug 8 10:13:02 2017 +0200

----------------------------------------------------------------------
 .../kafka/KafkaTestEnvironmentImpl.java         | 33 ++++++-------
 .../kafka/KafkaTestEnvironmentImpl.java         | 19 ++++----
 .../kafka/KafkaTestEnvironmentImpl.java         | 35 +++++++-------
 .../kafka/KafkaShortRetentionTestBase.java      |  2 +-
 .../connectors/kafka/KafkaTestBase.java         |  2 +-
 .../connectors/kafka/KafkaTestEnvironment.java  | 49 ++++++++++++++++++--
 6 files changed, 88 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index d3b45a9..9f1d379 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -79,8 +79,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-       private Properties additionalServerProperties;
-       private boolean secureMode = false;
+       private Config config;
        // 6 seconds is default. Seems to be too small for travis. 30 seconds
        private int zkTimeout = 30000;
 
@@ -96,7 +95,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        @Override
        public Properties getSecureProperties() {
                Properties prop = new Properties();
-               if (secureMode) {
+               if (config.isSecureMode()) {
                        prop.put("security.inter.broker.protocol", 
"SASL_PLAINTEXT");
                        prop.put("security.protocol", "SASL_PLAINTEXT");
                        prop.put("sasl.kerberos.service.name", "kafka");
@@ -215,26 +214,24 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
+       public void prepare(Config config) {
                //increase the timeout since in Travis ZK connection takes long 
time for secure connection.
-               if (secureMode) {
+               if (config.isSecureMode()) {
                        //run only one kafka server to avoid multiple ZK 
connections from many instances - Travis timeout
-                       numKafkaServers = 1;
+                       config.setKafkaServersNumber(1);
                        zkTimeout = zkTimeout * 15;
                }
+               this.config = config;
 
-               this.additionalServerProperties = additionalServerProperties;
-               this.secureMode = secureMode;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
                tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
                assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
 
                tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + 
(UUID.randomUUID().toString()));
                assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
 
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
+               tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+               for (int i = 0; i < config.getKafkaServersNumber(); i++) {
                        File tmpDir = new File(tmpKafkaParent, "server-" + i);
                        assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
                        tmpKafkaDirs.add(tmpDir);
@@ -249,12 +246,12 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        LOG.info("Starting Zookeeper with 
zookeeperConnectionString: {}", zookeeperConnectionString);
 
                        LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
+                       brokers = new 
ArrayList<>(config.getKafkaServersNumber());
 
-                       for (int i = 0; i < numKafkaServers; i++) {
+                       for (int i = 0; i < config.getKafkaServersNumber(); 
i++) {
                                brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
 
-                               if (secureMode) {
+                               if (config.isSecureMode()) {
                                        brokerConnectionString += 
hostAndPortToUrlString(
                                                        
KafkaTestEnvironment.KAFKA_HOST,
                                                        
brokers.get(i).socketServer().boundPort(
@@ -347,7 +344,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                final long deadline = System.nanoTime() + 30_000_000_000L;
                do {
                        try {
-                               if (secureMode) {
+                               if (config.isSecureMode()) {
                                        //increase wait time since in Travis ZK 
timeout occurs frequently
                                        int wait = zkTimeout / 100;
                                        LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
@@ -407,8 +404,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                // for CI stability, increase zookeeper session timeout
                kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
                kafkaProperties.put("zookeeper.connection.timeout.ms", 
zkTimeout);
-               if (additionalServerProperties != null) {
-                       kafkaProperties.putAll(additionalServerProperties);
+               if (config.getKafkaServerProperties() != null) {
+                       
kafkaProperties.putAll(config.getKafkaServerProperties());
                }
 
                final int numTries = 5;
@@ -418,7 +415,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
 
                        //to support secure kafka cluster
-                       if (secureMode) {
+                       if (config.isSecureMode()) {
                                LOG.info("Adding Kafka secure configurations");
                                kafkaProperties.put("listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
                                kafkaProperties.put("advertised.listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index ab976e1..af5ad67 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -84,7 +84,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-       private Properties additionalServerProperties;
+
+       private Config config;
 
        public String getBrokerConnectionString() {
                return brokerConnectionString;
@@ -206,8 +207,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
-               this.additionalServerProperties = additionalServerProperties;
+       public void prepare(Config config) {
+               this.config = config;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
 
                tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
@@ -224,8 +225,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        fail("cannot create kafka temp dir: " + e.getMessage());
                }
 
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
+               tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+               for (int i = 0; i < config.getKafkaServersNumber(); i++) {
                        File tmpDir = new File(tmpKafkaParent, "server-" + i);
                        assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
                        tmpKafkaDirs.add(tmpDir);
@@ -240,9 +241,9 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        zookeeperConnectionString = 
zookeeper.getConnectString();
 
                        LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
+                       brokers = new 
ArrayList<>(config.getKafkaServersNumber());
 
-                       for (int i = 0; i < numKafkaServers; i++) {
+                       for (int i = 0; i < config.getKafkaServersNumber(); 
i++) {
                                brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
                                SocketServer socketServer = 
brokers.get(i).socketServer();
 
@@ -391,8 +392,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                // for CI stability, increase zookeeper session timeout
                kafkaProperties.put("zookeeper.session.timeout.ms", "30000");
                kafkaProperties.put("zookeeper.connection.timeout.ms", "30000");
-               if (additionalServerProperties != null) {
-                       kafkaProperties.putAll(additionalServerProperties);
+               if (config.getKafkaServerProperties() != null) {
+                       
kafkaProperties.putAll(config.getKafkaServerProperties());
                }
 
                final int numTries = 5;

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
index df95420..517f096 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironmentImpl.java
@@ -78,11 +78,11 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        private String zookeeperConnectionString;
        private String brokerConnectionString = "";
        private Properties standardProps;
-       private Properties additionalServerProperties;
-       private boolean secureMode = false;
        // 6 seconds is default. Seems to be too small for travis. 30 seconds
        private String zkTimeout = "30000";
 
+       private Config config;
+
        public String getBrokerConnectionString() {
                return brokerConnectionString;
        }
@@ -200,27 +200,24 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
        }
 
        @Override
-       public void prepare(int numKafkaServers, Properties 
additionalServerProperties, boolean secureMode) {
-
+       public void prepare(Config config) {
                //increase the timeout since in Travis ZK connection takes long 
time for secure connection.
-               if (secureMode) {
+               if (config.isSecureMode()) {
                        //run only one kafka server to avoid multiple ZK 
connections from many instances - Travis timeout
-                       numKafkaServers = 1;
+                       config.setKafkaServersNumber(1);
                        zkTimeout = String.valueOf(Integer.parseInt(zkTimeout) 
* 15);
                }
+               this.config = config;
 
-               this.additionalServerProperties = additionalServerProperties;
-               this.secureMode = secureMode;
                File tempDir = new File(System.getProperty("java.io.tmpdir"));
-
                tmpZkDir = new File(tempDir, "kafkaITcase-zk-dir-" + 
(UUID.randomUUID().toString()));
                assertTrue("cannot create zookeeper temp dir", 
tmpZkDir.mkdirs());
 
                tmpKafkaParent = new File(tempDir, "kafkaITcase-kafka-dir-" + 
(UUID.randomUUID().toString()));
                assertTrue("cannot create kafka temp dir", 
tmpKafkaParent.mkdirs());
 
-               tmpKafkaDirs = new ArrayList<>(numKafkaServers);
-               for (int i = 0; i < numKafkaServers; i++) {
+               tmpKafkaDirs = new ArrayList<>(config.getKafkaServersNumber());
+               for (int i = 0; i < config.getKafkaServersNumber(); i++) {
                        File tmpDir = new File(tmpKafkaParent, "server-" + i);
                        assertTrue("cannot create kafka temp dir", 
tmpDir.mkdir());
                        tmpKafkaDirs.add(tmpDir);
@@ -236,13 +233,13 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        LOG.info("zookeeperConnectionString: {}", 
zookeeperConnectionString);
 
                        LOG.info("Starting KafkaServer");
-                       brokers = new ArrayList<>(numKafkaServers);
+                       brokers = new 
ArrayList<>(config.getKafkaServersNumber());
 
-                       for (int i = 0; i < numKafkaServers; i++) {
+                       for (int i = 0; i < config.getKafkaServersNumber(); 
i++) {
                                brokers.add(getKafkaServer(i, 
tmpKafkaDirs.get(i)));
 
                                SocketServer socketServer = 
brokers.get(i).socketServer();
-                               if (secureMode) {
+                               if (this.config.isSecureMode()) {
                                        brokerConnectionString += 
hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, 
brokers.get(i).socketServer().boundPort(SecurityProtocol.SASL_PLAINTEXT)) + ",";
                                } else {
                                        brokerConnectionString += 
hostAndPortToUrlString(KafkaTestEnvironment.KAFKA_HOST, 
brokers.get(i).socketServer().boundPort(SecurityProtocol.PLAINTEXT)) + ",";
@@ -335,7 +332,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                final long deadline = System.nanoTime() + 
Integer.parseInt(zkTimeout) * 1_000_000L;
                do {
                        try {
-                               if (secureMode) {
+                               if (config.isSecureMode()) {
                                        //increase wait time since in Travis ZK 
timeout occurs frequently
                                        int wait = Integer.parseInt(zkTimeout) 
/ 100;
                                        LOG.info("waiting for {} msecs before 
the topic {} can be checked", wait, topic);
@@ -400,8 +397,8 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                // for CI stability, increase zookeeper session timeout
                kafkaProperties.put("zookeeper.session.timeout.ms", zkTimeout);
                kafkaProperties.put("zookeeper.connection.timeout.ms", 
zkTimeout);
-               if (additionalServerProperties != null) {
-                       kafkaProperties.putAll(additionalServerProperties);
+               if (config.getKafkaServerProperties() != null) {
+                       
kafkaProperties.putAll(config.getKafkaServerProperties());
                }
 
                final int numTries = 5;
@@ -411,7 +408,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
                        kafkaProperties.put("port", 
Integer.toString(kafkaPort));
 
                        //to support secure kafka cluster
-                       if (secureMode) {
+                       if (config.isSecureMode()) {
                                LOG.info("Adding Kafka secure configurations");
                                kafkaProperties.put("listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
                                kafkaProperties.put("advertised.listeners", 
"SASL_PLAINTEXT://" + KAFKA_HOST + ":" + kafkaPort);
@@ -442,7 +439,7 @@ public class KafkaTestEnvironmentImpl extends 
KafkaTestEnvironment {
 
        public Properties getSecureProperties() {
                Properties prop = new Properties();
-               if (secureMode) {
+               if (config.isSecureMode()) {
                        prop.put("security.inter.broker.protocol", 
"SASL_PLAINTEXT");
                        prop.put("security.protocol", "SASL_PLAINTEXT");
                        prop.put("sasl.kerberos.service.name", "kafka");

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
index d5c9276..3163f52 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java
@@ -98,7 +98,7 @@ public class KafkaShortRetentionTestBase implements 
Serializable {
                specificProperties.setProperty("log.retention.minutes", "0");
                specificProperties.setProperty("log.retention.ms", "250");
                
specificProperties.setProperty("log.retention.check.interval.ms", "100");
-               kafkaServer.prepare(1, specificProperties, false);
+               kafkaServer.prepare(kafkaServer.createConfig().setKafkaServerProperties(specificProperties));
 
                standardProps = kafkaServer.getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
index c484a4b..8eb0351 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestBase.java
@@ -135,7 +135,7 @@ public abstract class KafkaTestBase extends TestLogger {
 
                LOG.info("Starting KafkaTestBase.prepare() for Kafka " + 
kafkaServer.getVersion());
 
-               kafkaServer.prepare(NUMBER_OF_KAFKA_SERVERS, secureMode);
+               kafkaServer.prepare(kafkaServer.createConfig().setKafkaServersNumber(NUMBER_OF_KAFKA_SERVERS).setSecureMode(secureMode));
 
                standardProps = kafkaServer.getStandardProperties();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e11a5919/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
index 50eff23..ea292a9 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java
@@ -38,15 +38,56 @@ import java.util.Properties;
  * Abstract class providing a Kafka test environment.
  */
 public abstract class KafkaTestEnvironment {
+       /**
+        * Configuration class for {@link KafkaTestEnvironment}.
+        */
+       public static class Config {
+               private int kafkaServersNumber = 1;
+               private Properties kafkaServerProperties = null;
+               private boolean secureMode = false;
+
+               /**
+                * Please use {@link KafkaTestEnvironment#createConfig()} method.
+                */
+               private Config() {
+               }
+
+               public int getKafkaServersNumber() {
+                       return kafkaServersNumber;
+               }
+
+               public Config setKafkaServersNumber(int kafkaServersNumber) {
+                       this.kafkaServersNumber = kafkaServersNumber;
+                       return this;
+               }
+
+               public Properties getKafkaServerProperties() {
+                       return kafkaServerProperties;
+               }
+
+               public Config setKafkaServerProperties(Properties kafkaServerProperties) {
+                       this.kafkaServerProperties = kafkaServerProperties;
+                       return this;
+               }
+
+               public boolean isSecureMode() {
+                       return secureMode;
+               }
+
+               public Config setSecureMode(boolean secureMode) {
+                       this.secureMode = secureMode;
+                       return this;
+               }
+       }
 
        protected static final String KAFKA_HOST = "localhost";
 
-       public abstract void prepare(int numKafkaServers, Properties kafkaServerProperties, boolean secureMode);
-
-       public void prepare(int numberOfKafkaServers, boolean secureMode) {
-               this.prepare(numberOfKafkaServers, null, secureMode);
+       public static Config createConfig() {
+               return new Config();
        }
 
+       public abstract void prepare(Config config);
+
        public abstract void shutdown();
 
        public abstract void deleteTestTopic(String topic);

Reply via email to