[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-14 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/589#issuecomment-92834090
  
I would like to merge this PR rather soon because I need some of its changes for adding another KafkaITCase.

Is it okay to postpone the `TestStreamEnvironment` refactor to a separate PR?




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-14 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/589#issuecomment-92846151
  
Yes, of course, we can postpone the refactor.




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-14 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/589




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-14 Thread rmetzger
Github user rmetzger commented on the pull request:

https://github.com/apache/flink/pull/589#issuecomment-92902901
  
I'm merging the PR now.




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28257271
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
         }
     }
 
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
 
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

You can add classes from another module's test/ directory as a Maven dependency with this mechanism:

https://maven.apache.org/plugins/maven-jar-plugin/examples/create-test-jar.html
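
For reference, a minimal sketch of that mechanism (the module coordinates below are illustrative, assuming TestStreamEnvironment lives in a module such as flink-streaming-core). The module that owns the test classes attaches a test-jar:

    <plugin>
      <groupId>org.apache.maven.plugins</groupId>
      <artifactId>maven-jar-plugin</artifactId>
      <executions>
        <execution>
          <goals>
            <goal>test-jar</goal>
          </goals>
        </execution>
      </executions>
    </plugin>

and the consuming module (here flink-streaming-connectors) declares a dependency of type test-jar:

    <dependency>
      <groupId>org.apache.flink</groupId>
      <artifactId>flink-streaming-core</artifactId>
      <version>${project.version}</version>
      <type>test-jar</type>
      <scope>test</scope>
    </dependency>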





[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227428
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTopicUtilsTest.java ---
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.curator.test.TestingServer;
+import org.apache.flink.runtime.net.NetUtils;
+import org.apache.flink.streaming.connectors.kafka.api.simple.KafkaTopicUtils;
+import org.apache.flink.streaming.connectors.kafka.util.KafkaLocalSystemTime;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import kafka.api.PartitionMetadata;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+
+public class KafkaTopicUtilsTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTopicUtilsTest.class);
+    private static final int NUMBER_OF_BROKERS = 2;
+    private static final String TOPIC = "myTopic";
+
+    @Rule
+    public TemporaryFolder tempFolder = new TemporaryFolder();
+
+    @Test
+    public void test() {
+        int zkPort;
+        String kafkaHost;
+        String zookeeperConnectionString;
+
+        File tmpZkDir;
+        List<File> tmpKafkaDirs;
+        Map<String, KafkaServer> kafkaServers = null;
+        TestingServer zookeeper = null;
+
+        try {
+            tmpZkDir = tempFolder.newFolder();
+
+            tmpKafkaDirs = new ArrayList<File>(NUMBER_OF_BROKERS);
+            for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+                tmpKafkaDirs.add(tempFolder.newFolder());
+            }
+
+            zkPort = NetUtils.getAvailablePort();
+            kafkaHost = InetAddress.getLocalHost().getHostName();
+            zookeeperConnectionString = "localhost:" + zkPort;
+
+            // init zookeeper
+            zookeeper = new TestingServer(zkPort, tmpZkDir);
+
+            // init kafka kafkaServers
+            kafkaServers = new HashMap<String, KafkaServer>();
+
+            for (int i = 0; i < NUMBER_OF_BROKERS; i++) {
+                KafkaServer kafkaServer = getKafkaServer(kafkaHost, zookeeperConnectionString, i, tmpKafkaDirs.get(i));
+                kafkaServers.put(kafkaServer.config().advertisedHostName() + ":" + kafkaServer.config().advertisedPort(), kafkaServer);
+            }
+
+            // create Kafka topic
+            final KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
+            kafkaTopicUtils.createTopic(TOPIC, 1, 2);
+
+            // check whether topic exists
+            assertTrue(kafkaTopicUtils.topicExists(TOPIC));
+
+            // check number of partitions
+            assertEquals(1, kafkaTopicUtils.getNumberOfPartitions(TOPIC));
+
+            // get partition metadata without error
+            PartitionMetadata partitionMetadata = kafkaTopicUtils.waitAndGetPartitionMetadata(TOPIC, 0);
+            assertEquals(0, partitionMetadata.errorCode());
+
+            // get broker list

[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227273
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/iterator/KafkaSinglePartitionIterator.java ---
@@ -111,18 +128,38 @@ public void initialize() throws InterruptedException {
         } while (metadata == null);
 
         if (metadata.leader() == null) {
-            throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts.get(0)
-                    + ":" + port);
+            throw new RuntimeException("Can't find Leader for Topic and Partition. (at " + hosts + ")");
         }
 
-        leadBroker = metadata.leader().host();
+        leadBroker = metadata.leader();
         clientName = "Client_" + topic + "_" + partition;
 
-        consumer = new SimpleConsumer(leadBroker, port, connectTimeoutMs, bufferSize, clientName);
+        consumer = new SimpleConsumer(leadBroker.host(), leadBroker.port(), connectTimeoutMs, bufferSize, clientName);
+
+        try {
+            readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+        } catch (NotLeaderForPartitionException e) {
+            do {
+
+                metadata = findLeader(hosts, topic, partition);
 
-        readOffset = initialOffset.getOffset(consumer, topic, partition, clientName);
+                try {
+                    Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
+                } catch (InterruptedException ie) {
+                    throw new InterruptedException("Establishing connection to Kafka failed");
+                }
+            } while (metadata == null);
--- End diff --

This loop might also run forever.
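
A sketch of one way to bound it (findLeader, hosts, topic, partition and DEFAULT_WAIT_ON_EMPTY_FETCH are from the quoted diff; MAX_LEADER_RETRIES is a made-up constant):

    // Hypothetical bounded variant of the retry loop above.
    int attempts = 0;
    do {
        metadata = findLeader(hosts, topic, partition);
        try {
            Thread.sleep(DEFAULT_WAIT_ON_EMPTY_FETCH);
        } catch (InterruptedException ie) {
            throw new InterruptedException("Establishing connection to Kafka failed");
        }
    } while (metadata == null && ++attempts < MAX_LEADER_RETRIES);

    if (metadata == null) {
        // fail fast instead of spinning forever
        throw new RuntimeException("Could not find leader for " + topic + "/" + partition
                + " after " + MAX_LEADER_RETRIES + " attempts");
    }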




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread mbalassi
Github user mbalassi commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227590
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
         }
     }
 
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
 
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

I would use `TestStreamEnvironment` instead.




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227565
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/KafkaSink.java ---
@@ -114,17 +132,24 @@ public KafkaSink(String zookeeperAddress, String topicId,
     public void open(Configuration configuration) {
 
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperAddress);
-        String brokerAddress = kafkaTopicUtils.getLeaderBrokerAddressForTopic(topicId);
+        String listOfBrokers = kafkaTopicUtils.getBrokerList(topicId);
+
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Broker list: {}", listOfBrokers);
+        }
 
-        props = new Properties();
+        if (props == null) {
--- End diff --

How about always setting our default properties (broker list, max retries, etc.) and overwriting them with the properties passed by the user?
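
A sketch of that pattern (listOfBrokers is from the quoted diff; `userDefinedProperties` and the retry default are made up for illustration):

    Properties props = new Properties();
    // our defaults first
    props.put("metadata.broker.list", listOfBrokers);
    props.put("message.send.max.retries", "3"); // made-up default
    // ...then let user-supplied properties win
    if (userDefinedProperties != null) {
        props.putAll(userDefinedProperties);
    }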




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227177
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
@@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
                 LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
             }
         } else {
+            LOG.info("Connecting zookeeper");
+
+            initZkClient();
             AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+            closeZkClient();
+        }
+    }
+
+    public String getBrokerList(String topicName) {
+        return getBrokerAddressList(getBrokerAddresses(topicName));
+    }
+
+    public String getBrokerList(String topicName, int partitionId) {
+        return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
+    }
+
+    public Set<String> getBrokerAddresses(String topicName) {
+        int numOfPartitions = getNumberOfPartitions(topicName);
+
+        HashSet<String> brokers = new HashSet<String>();
+        for (int i = 0; i < numOfPartitions; i++) {
+            brokers.addAll(getBrokerAddresses(topicName, i));
         }
+        return brokers;
+    }
+
+    public Set<String> getBrokerAddresses(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
+        Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
+
+        HashSet<String> addresses = new HashSet<String>();
+        for (Broker broker : inSyncReplicas) {
+            addresses.add(broker.connectionString());
+        }
+        return addresses;
+    }
+
+    private static String getBrokerAddressList(Set<String> brokerAddresses) {
+        StringBuilder brokerAddressList = new StringBuilder();
+        for (String broker : brokerAddresses) {
+            brokerAddressList.append(broker);
+            brokerAddressList.append(',');
+        }
+        brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
+
+        return brokerAddressList.toString();
     }
 
     public int getNumberOfPartitions(String topicName) {
-        Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+        Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
         return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     }
 
-    public String getLeaderBrokerAddressForTopic(String topicName) {
-        TopicMetadata topicInfo = getTopicInfo(topicName);
+    public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata;
+        while (true) {
+            try {
+                partitionMetadata = getPartitionMetadata(topicName, partitionId);
+                return partitionMetadata;
+            } catch (LeaderNotAvailableException e) {
+                // try fetching metadata again
--- End diff --

I would suggest logging the exception with `LOG.debug`.
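
For example, a one-line sketch (assuming SLF4J 1.6+, where a trailing Throwable argument is logged along with the message):

    } catch (LeaderNotAvailableException e) {
        // leader not available yet; log and try fetching metadata again
        LOG.debug("Leader not yet available for {}/{}", topicName, partitionId, e);
    }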




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread rmetzger
Github user rmetzger commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28227162
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/main/java/org/apache/flink/streaming/connectors/kafka/api/simple/KafkaTopicUtils.java ---
@@ -65,36 +75,145 @@ public void createTopic(String topicName, int numOfPartitions, int replicationFa
                 LOG.warn("Kafka topic \"{}\" already exists. Returning without action.", topicName);
             }
         } else {
+            LOG.info("Connecting zookeeper");
+
+            initZkClient();
             AdminUtils.createTopic(zkClient, topicName, numOfPartitions, replicationFactor, topicConfig);
+            closeZkClient();
+        }
+    }
+
+    public String getBrokerList(String topicName) {
+        return getBrokerAddressList(getBrokerAddresses(topicName));
+    }
+
+    public String getBrokerList(String topicName, int partitionId) {
+        return getBrokerAddressList(getBrokerAddresses(topicName, partitionId));
+    }
+
+    public Set<String> getBrokerAddresses(String topicName) {
+        int numOfPartitions = getNumberOfPartitions(topicName);
+
+        HashSet<String> brokers = new HashSet<String>();
+        for (int i = 0; i < numOfPartitions; i++) {
+            brokers.addAll(getBrokerAddresses(topicName, i));
         }
+        return brokers;
+    }
+
+    public Set<String> getBrokerAddresses(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata = waitAndGetPartitionMetadata(topicName, partitionId);
+        Collection<Broker> inSyncReplicas = JavaConversions.asJavaCollection(partitionMetadata.isr());
+
+        HashSet<String> addresses = new HashSet<String>();
+        for (Broker broker : inSyncReplicas) {
+            addresses.add(broker.connectionString());
+        }
+        return addresses;
+    }
+
+    private static String getBrokerAddressList(Set<String> brokerAddresses) {
+        StringBuilder brokerAddressList = new StringBuilder();
+        for (String broker : brokerAddresses) {
+            brokerAddressList.append(broker);
+            brokerAddressList.append(',');
+        }
+        brokerAddressList.deleteCharAt(brokerAddressList.length() - 1);
+
+        return brokerAddressList.toString();
     }
 
     public int getNumberOfPartitions(String topicName) {
-        Seq<PartitionMetadata> partitionMetadataSeq = getTopicInfo(topicName).partitionsMetadata();
+        Seq<PartitionMetadata> partitionMetadataSeq = getTopicMetadata(topicName).partitionsMetadata();
         return JavaConversions.asJavaCollection(partitionMetadataSeq).size();
     }
 
-    public String getLeaderBrokerAddressForTopic(String topicName) {
-        TopicMetadata topicInfo = getTopicInfo(topicName);
+    public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
+        PartitionMetadata partitionMetadata;
+        while (true) {
+            try {
+                partitionMetadata = getPartitionMetadata(topicName, partitionId);
+                return partitionMetadata;
+            } catch (LeaderNotAvailableException e) {
+                // try fetching metadata again
+            }
--- End diff --

We might end up in an infinite loop here.
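
A sketch of a deadline-based alternative (getPartitionMetadata and LeaderNotAvailableException are from the quoted diff; METADATA_WAIT_TIMEOUT_MS is a made-up constant):

    // Hypothetical variant that gives up after a deadline instead of retrying forever.
    private static final long METADATA_WAIT_TIMEOUT_MS = 30000L; // made-up timeout

    public PartitionMetadata waitAndGetPartitionMetadata(String topicName, int partitionId) {
        long deadline = System.currentTimeMillis() + METADATA_WAIT_TIMEOUT_MS;
        while (System.currentTimeMillis() < deadline) {
            try {
                return getPartitionMetadata(topicName, partitionId);
            } catch (LeaderNotAvailableException e) {
                // leader not yet elected; retry until the deadline
            }
        }
        throw new RuntimeException("No leader available for " + topicName + "/" + partitionId
                + " within " + METADATA_WAIT_TIMEOUT_MS + " ms");
    }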




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread gaborhermann
Github user gaborhermann commented on the pull request:

https://github.com/apache/flink/pull/589#issuecomment-92413840
  
Thanks for the comments; I made the proposed changes (except Marton's).




[GitHub] flink pull request: [FLINK-1753] Added Kafka connectors test and c...

2015-04-13 Thread gaborhermann
Github user gaborhermann commented on a diff in the pull request:

https://github.com/apache/flink/pull/589#discussion_r28251514
  
--- Diff: flink-staging/flink-streaming/flink-streaming-connectors/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---
@@ -524,13 +537,149 @@ public void cancel() {
         }
     }
 
+    private static boolean leaderHasShutDown = false;
+
+    @Test
+    public void brokerFailureTest() throws Exception {
+        String topic = "brokerFailureTestTopic";
+
+        createTestTopic(topic, 2, 2);
 
-    private void createTestTopic(String topic, int numberOfPartitions) {
         KafkaTopicUtils kafkaTopicUtils = new KafkaTopicUtils(zookeeperConnectionString);
-        kafkaTopicUtils.createTopic(topic, numberOfPartitions, 1);
+        final String leaderToShutDown =
+                kafkaTopicUtils.waitAndGetPartitionMetadata(topic, 0).leader().get().connectionString();
+
+        final Thread brokerShutdown = new Thread(new Runnable() {
+            @Override
+            public void run() {
+                shutdownKafkaBroker = false;
+                while (!shutdownKafkaBroker) {
+                    try {
+                        Thread.sleep(10);
+                    } catch (InterruptedException e) {
+                        LOG.warn("Interruption", e);
+                    }
+                }
+
+                for (KafkaServer kafkaServer : brokers) {
+                    if (leaderToShutDown.equals(
+                            kafkaServer.config().advertisedHostName()
+                                    + ":"
+                                    + kafkaServer.config().advertisedPort()
+                    )) {
+                        LOG.info("Killing Kafka Server {}", leaderToShutDown);
+                        kafkaServer.shutdown();
+                        leaderHasShutDown = true;
+                        break;
+                    }
+                }
+            }
+        });
+        brokerShutdown.start();
+
+        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+
--- End diff --

I could not reach `TestStreamEnvironment` from flink-streaming-connectors. Should `TestStreamEnvironment` be moved from the test directory to the main source tree?

