[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
Github user pnowojski closed the pull request at: https://github.com/apache/flink/pull/4470 --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131648638

    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
    @@ -0,0 +1,176 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.networking;
    +
    +import org.jboss.netty.bootstrap.ClientBootstrap;
    +import org.jboss.netty.buffer.ChannelBuffer;
    +import org.jboss.netty.buffer.ChannelBuffers;
    +import org.jboss.netty.channel.Channel;
    +import org.jboss.netty.channel.ChannelFuture;
    +import org.jboss.netty.channel.ChannelFutureListener;
    +import org.jboss.netty.channel.ChannelHandlerContext;
    +import org.jboss.netty.channel.ChannelStateEvent;
    +import org.jboss.netty.channel.ExceptionEvent;
    +import org.jboss.netty.channel.MessageEvent;
    +import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    +import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
    +
    +import java.net.InetSocketAddress;
    +import java.util.Map;
    +import java.util.concurrent.ConcurrentHashMap;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +import java.util.function.Consumer;
    +
    +/**
    + * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort
    + * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked
    + * flag.
    + */
    +class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
    +	private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
    +
    +	// mapping between source and target channels, used for finding correct target channel to use for given source.
    +	private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
    +	private final Consumer onClose;
    +	private final ClientSocketChannelFactory channelFactory;
    +	private final String remoteHost;
    +	private final int remotePort;
    +
    +	private final AtomicBoolean blocked;
    +
    +	public NetworkFailureHandler(
    +			AtomicBoolean blocked,
    +			Consumer onClose,
    +			ClientSocketChannelFactory channelFactory,
    +			String remoteHost,
    +			int remotePort) {
    +		this.blocked = blocked;
    +		this.onClose = onClose;
    +		this.channelFactory = channelFactory;
    +		this.remoteHost = remoteHost;
    +		this.remotePort = remotePort;
    +	}
    +
    +	/**
    +	 * Closes the specified channel after all queued write requests are flushed.
    +	 */
    +	static void closeOnFlush(Channel channel) {
    +		if (channel.isConnected()) {
    +			channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    +		}
    +	}
    +
    +	public void closeConnections() {
    +		for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
    +			// target channel is closed on source's channel channelClosed even
    +			entry.getKey().close();
    +		}
    +	}
    +
    +	@Override
    +	public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
    +		// Suspend incoming traffic until connected to the remote host.
    +		final Channel sourceChannel = event.getChannel();
    +		sourceChannel.setReadable(false);
    +
    +		if (blocked.get()) {
    +			sourceChannel.close();
    +			return;
    +		}
    +
    +		// Start the connection attempt.
    +		ClientBootstrap targetConnectionBootstrap = new ClientBootstrap(channelFactory);
    +		targetConnectionBootstrap.getPipeline().addLast(TARGET_CHANNEL_HANDLER_NAME, new TargetChannelHandler(event.getChannel(), blocked));
    +		ChannelFuture connectFuture = targetConn
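The role of the shared `blocked` flag in the handler quoted above can be illustrated without Netty. The following is only a minimal sketch under stated assumptions: `BlockableForwarder` and its `forward` method are hypothetical names that do not appear in the pull request, and plain `java.io` streams stand in for the source and target channels. It shows the same gate the handler applies: data is forwarded only while the flag is unset.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical, Netty-free illustration of the blocked-flag gate; not the PR's code. */
class BlockableForwarder {
    // Shared flag: a test can flip it from another thread to simulate a network failure.
    private final AtomicBoolean blocked;

    BlockableForwarder(AtomicBoolean blocked) {
        this.blocked = blocked;
    }

    /**
     * Copies chunks from source to target until the source is exhausted or the
     * flag is set. Returns the number of bytes that were forwarded.
     */
    long forward(InputStream source, OutputStream target, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        long forwarded = 0;
        try {
            // The flag is re-checked before every chunk, so blocking takes effect mid-stream.
            while (!blocked.get()) {
                int read = source.read(buffer);
                if (read < 0) {
                    break; // end of stream
                }
                target.write(buffer, 0, read);
                forwarded += read;
            }
            target.flush();
        } catch (IOException e) {
            // Rethrown unchecked to keep the sketch's signature simple.
            throw new UncheckedIOException(e);
        }
        return forwarded;
    }

    public static void main(String[] args) {
        AtomicBoolean blocked = new AtomicBoolean(false);
        BlockableForwarder forwarder = new BlockableForwarder(blocked);
        ByteArrayOutputStream target = new ByteArrayOutputStream();

        long n = forwarder.forward(new ByteArrayInputStream(new byte[] {1, 2, 3}), target, 2);
        System.out.println("forwarded while unblocked: " + n);

        blocked.set(true);
        n = forwarder.forward(new ByteArrayInputStream(new byte[] {4, 5}), target, 2);
        System.out.println("forwarded while blocked: " + n);
    }
}
```

Unlike this sketch, the quoted handler also closes the source channel when a connection is opened while the flag is set, which a client would observe as a dropped connection rather than as a stalled stream.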
[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131608115

    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
    @@ -0,0 +1,176 @@
[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131609166

    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
    @@ -0,0 +1,176 @@
[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131608304

    --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
    @@ -0,0 +1,176 @@
[GitHub] flink pull request #4470: [FLINK-7343] Simulate network failures in kafka at...
GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/4470

    [FLINK-7343] Simulate network failures in kafka at-least-once test

    We shouldn't fail KafkaServers directly, because they might not be able to flush the data (`log.flush.interval.***` properties). Since we don't want to test how well Kafka implements at-least-once/exactly-once semantics, it is a better idea (and hopefully more reliable) to simulate a network failure between Flink and Kafka in our at-least-once tests. To achieve that, I have implemented the `NetworkFailuresProxy` class.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink network-failures

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4470

----
commit 0e28327619893cfbf793fa842be3d965f649516c
Author: Piotr Nowojski
Date:   2017-08-01T14:05:49Z

    [FLINK-7343][kafka] Increase Xmx for tests

    Sometimes 1000m was not enough memory to run at-least-once tests with broker failures on Travis

commit 8d820c3d0e77624a945e074f4a1bc476b5fd0f75
Author: Piotr Nowojski
Date:   2017-08-01T16:11:27Z

    [FLINK-7343] Add network proxy utility to simulate network failures

commit 967e1dfc87846b4011652bbaefab696900abc8dd
Author: Piotr Nowojski
Date:   2017-08-03T07:25:04Z

    fixup! [FLINK-7343][kafka] Increase Xmx for tests

commit 27b20f2ec3770231d95c3c7918c9313ce58b5e18
Author: Piotr Nowojski
Date:   2017-08-03T09:27:12Z

    [FLINK-7343] Use NetworkFailureProxy in kafka tests

    We shouldn't fail KafkaServers directly, because they might not be able to flush the data. Since we don't want to test how well Kafka implements at-least-once/exactly-once semantic, we just simulate network failure between Flink and Kafka in our at-least-once tests.

commit 692b5944f16b98aafe716ca1d18a04fa8a033798
Author: Piotr Nowojski
Date:   2017-08-03T09:35:26Z

    [hotfix][Kafka] Clean up getKafkaServer method
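The at-least-once guarantee that these tests target can be stated as a small check: every element that was sent must be read back at least once, while duplicates (for example from a write retried after a dropped connection) are tolerated. The helper below is a hypothetical sketch, not code from this pull request or from the Flink test suite; the names `AtLeastOnceCheck` and `missingElements` are assumptions made for illustration.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical helper, not part of the PR: checks an at-least-once delivery guarantee. */
final class AtLeastOnceCheck {
    private AtLeastOnceCheck() {
    }

    /**
     * Returns the expected elements that never arrived. An empty result means
     * the at-least-once guarantee holds; duplicates in received are ignored.
     */
    static <T> Set<T> missingElements(Collection<T> expected, Collection<T> received) {
        Set<T> missing = new HashSet<>(expected);
        missing.removeAll(new HashSet<>(received));
        return missing;
    }

    public static void main(String[] args) {
        List<Integer> sent = Arrays.asList(1, 2, 3, 4);
        // Element 2 arrives twice, as can happen when a write is retried after a failure.
        List<Integer> read = Arrays.asList(1, 2, 2, 3, 4);
        System.out.println("missing = " + missingElements(sent, read));
    }
}
```

A check of this shape passes when duplicates occur but fails when any element is lost, which is the distinction an at-least-once test needs; an exactly-once test would additionally have to reject duplicates.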