[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-08 Thread pnowojski
Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4470


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---


[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-07 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131648638
  
--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that forwards inbound traffic from the source channel to the target channel on remoteHost:remotePort
+ * and the responses in the opposite direction. All of the network traffic can be blocked at any time using the
+ * blocked flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+    private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+    // mapping between source and target channels, used for finding the correct target channel for a given source.
+    private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+    private final Consumer onClose;
+    private final ClientSocketChannelFactory channelFactory;
+    private final String remoteHost;
+    private final int remotePort;
+
+    private final AtomicBoolean blocked;
+
+    public NetworkFailureHandler(
+            AtomicBoolean blocked,
+            Consumer onClose,
+            ClientSocketChannelFactory channelFactory,
+            String remoteHost,
+            int remotePort) {
+        this.blocked = blocked;
+        this.onClose = onClose;
+        this.channelFactory = channelFactory;
+        this.remoteHost = remoteHost;
+        this.remotePort = remotePort;
+    }
+
+    /**
+     * Closes the specified channel after all queued write requests are flushed.
+     */
+    static void closeOnFlush(Channel channel) {
+        if (channel.isConnected()) {
+            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    public void closeConnections() {
+        for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+            // the target channel is closed in the source channel's channelClosed event
+            entry.getKey().close();
+        }
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+        // Suspend incoming traffic until connected to the remote host.
+        final Channel sourceChannel = event.getChannel();
+        sourceChannel.setReadable(false);
+
+        if (blocked.get()) {
+            sourceChannel.close();
+            return;
+        }
+
+        // Start the connection attempt.
+        ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
+        targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), blocked));
+        ChannelFuture connectFuture = targetConn

[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-07 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131608115
  

[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-07 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131609166
  

[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-07 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131608304
  

[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...

2017-08-03 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4470

[FLINK-7343] Simulate network failures in kafka at-least-once test

We shouldn't fail KafkaServers directly, because they might not be able to 
flush the data before going down (see the `log.flush.interval.***` 
properties). Since we don't want to test how well Kafka implements 
at-least-once/exactly-once semantics, it is a better idea (and hopefully a 
more reliable one) to just simulate a network failure between Flink and Kafka 
in our at-least-once tests. To achieve that I have implemented the 
`NetworkFailuresProxy` class.
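
For readers who want the idea without the Netty details, here is a rough sketch of a blockable TCP proxy built on plain `java.net` sockets. It is not the code from this pull request: the class name `BlockableProxySketch` and the methods `blockTraffic`, `unblockTraffic` and `getLocalPort` are hypothetical and only illustrate the approach described above, in which clients connect to the proxy instead of the broker and a shared flag cuts the traffic.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical sketch: forwards TCP traffic to remoteHost:remotePort until blocked. */
public class BlockableProxySketch implements AutoCloseable {
    private final ServerSocket server;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked = new AtomicBoolean(false);
    private final Set<Socket> openSockets = ConcurrentHashMap.newKeySet();

    public BlockableProxySketch(String remoteHost, int remotePort) throws IOException {
        // Port 0 lets the OS pick a free local port.
        this.server = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        Thread acceptor = new Thread(this::acceptLoop, "proxy-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    public int getLocalPort() {
        return server.getLocalPort();
    }

    /** Simulates a network failure: drops open connections and refuses new ones. */
    public void blockTraffic() {
        blocked.set(true);
        for (Socket socket : openSockets) {
            closeQuietly(socket);
        }
    }

    public void unblockTraffic() {
        blocked.set(false);
    }

    private void acceptLoop() {
        while (!server.isClosed()) {
            Socket source;
            try {
                source = server.accept();
            } catch (IOException e) {
                return; // the server socket was closed
            }
            try {
                if (blocked.get()) {
                    closeQuietly(source);
                    continue;
                }
                Socket target = new Socket(remoteHost, remotePort);
                openSockets.add(source);
                openSockets.add(target);
                if (blocked.get()) { // blocked while connecting: drop the new pair too
                    closeQuietly(source);
                    closeQuietly(target);
                    continue;
                }
                pump(source, target); // requests
                pump(target, source); // responses
            } catch (IOException e) {
                closeQuietly(source); // the remote host was not reachable
            }
        }
    }

    /** Copies bytes in one direction on a daemon thread until EOF, error or block. */
    private void pump(Socket from, Socket to) {
        Thread thread = new Thread(() -> {
            byte[] buffer = new byte[8192];
            try {
                InputStream in = from.getInputStream();
                OutputStream out = to.getOutputStream();
                int n;
                while ((n = in.read(buffer)) != -1 && !blocked.get()) {
                    out.write(buffer, 0, n);
                    out.flush();
                }
            } catch (IOException ignored) {
                // a closed socket simply ends the forwarding loop
            } finally {
                closeQuietly(from);
                closeQuietly(to);
            }
        });
        thread.setDaemon(true);
        thread.start();
    }

    private void closeQuietly(Socket socket) {
        openSockets.remove(socket);
        try {
            socket.close();
        } catch (IOException ignored) {
            // nothing useful to do in a test utility
        }
    }

    @Override
    public void close() throws IOException {
        blockTraffic();
        server.close();
    }
}
```

A test would point its client at `getLocalPort()` rather than at the broker's port, call `blockTraffic()` at the moment the failure should happen, and call `unblockTraffic()` to let reconnects through. Unlike this sketch, which closes both sockets as soon as either direction ends, a real proxy would likely want to handle half-closed connections more carefully.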

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink network-failures

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4470


commit 0e28327619893cfbf793fa842be3d965f649516c
Author: Piotr Nowojski 
Date:   2017-08-01T14:05:49Z

[FLINK-7343][kafka] Increase Xmx for tests

Sometimes 1000m was not enough memory to run at-least-once tests with 
broker failures on Travis

commit 8d820c3d0e77624a945e074f4a1bc476b5fd0f75
Author: Piotr Nowojski 
Date:   2017-08-01T16:11:27Z

[FLINK-7343] Add network proxy utility to simulate network failures

commit 967e1dfc87846b4011652bbaefab696900abc8dd
Author: Piotr Nowojski 
Date:   2017-08-03T07:25:04Z

fixup! [FLINK-7343][kafka] Increase Xmx for tests

commit 27b20f2ec3770231d95c3c7918c9313ce58b5e18
Author: Piotr Nowojski 
Date:   2017-08-03T09:27:12Z

[FLINK-7343] Use NetworkFailureProxy in kafka tests

We shouldn't fail KafkaServers directly, because they might not be able
to flush the data. Since we don't want to test how well Kafka implements
at-least-once/exactly-once semantic, we just simulate network failure
between Flink and Kafka in our at-least-once tests.

commit 692b5944f16b98aafe716ca1d18a04fa8a033798
Author: Piotr Nowojski 
Date:   2017-08-03T09:35:26Z

[hotfix][Kafka] Clean up getKafkaServer method



