[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408706#comment-16408706 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/5729

> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0
>
>
> As reported by [~till.rohrmann] in
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test
> instability with
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to the log.flush intervals in Kafka, which delay flushing
> the data to files and can potentially cause data loss when Kafka brokers are
> killed in the tests.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
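One way to check the log.flush hypothesis is to make the test brokers flush after every message. The following is a minimal sketch, assuming the broker configuration is assembled in a `java.util.Properties` object before the embedded broker starts. `log.flush.interval.messages` and `log.flush.interval.ms` are standard Kafka broker settings; the `TestBrokerConfig` class and its `withEagerFlush` method are hypothetical and not part of the Flink test code.

```java
import java.util.Properties;

/** Hypothetical helper that tweaks the configuration of an embedded test broker. */
final class TestBrokerConfig {
    private TestBrokerConfig() {
    }

    /**
     * Returns a copy of the given broker properties that makes the broker flush its log to
     * disk after every message, instead of leaving recent writes in the OS page cache.
     * Only the explicitly set entries of {@code base} are copied, not its defaults.
     */
    static Properties withEagerFlush(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        // Flush after each message appended to a partition log.
        props.setProperty("log.flush.interval.messages", "1");
        // Upper bound (in ms) on how long a message may stay unflushed.
        props.setProperty("log.flush.interval.ms", "1");
        return props;
    }
}
```

Flushing per message slows the broker down considerably, so this would only be a diagnostic setting for tests, not a production configuration.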
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408010#comment-16408010 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/5729

Thanks!
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407853#comment-16407853 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/5729

merging.
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407737#comment-16407737 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user StephanEwen commented on the issue:

    https://github.com/apache/flink/pull/5729

Sweet, nice to see this fixed. Code looks good, +1 to merge!
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406282#comment-16406282 ]

ASF GitHub Bot commented on FLINK-7343:
---

GitHub user pnowojski opened a pull request:

    https://github.com/apache/flink/pull/5729

[FLINK-7343][kafka-tests] Fix test at-least-once test instability

This PR fixes instabilities in the tests for both Kafka 0.10 and Kafka 0.9. Previously, `lastSnapshotedElement` could be set to some value by a checkpoint taken AFTER the shutdown had been executed, even though the KafkaProducer's snapshot of that value would fail. The test then incorrectly expected this value to be present in the test Kafka topic. The fix is to remember `lastSnapshotedElementBeforeShutdown`: the value from the last snapshot that we expect to succeed without failure.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** / don't know)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation

- Does this pull request introduce a new feature? (yes / **no**)
- If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pnowojski/flink f7343

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5729
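The race described in this PR can be illustrated with a simplified, framework-free model. This is a hypothetical sketch: only the two field names come from the PR description, while `AtLeastOnceBookkeeping`, `onSnapshot`, `onShutdown`, `buggyExpectedUpTo` and `expectedUpTo` are illustrative names, not the actual `KafkaProducerTestBase` code. The model assumes that a snapshot is never interleaved with the shutdown, which the `synchronized` methods enforce here.

```java
/**
 * Hypothetical, framework-free model of the at-least-once test bookkeeping. Elements are
 * emitted in increasing order, every snapshot records the last emitted element, and at some
 * point the test executes a shutdown, after which snapshots may fail.
 */
final class AtLeastOnceBookkeeping {
    private boolean shutdownExecuted;

    // Old behavior: updated by every snapshot, including ones taken after the shutdown.
    private long lastSnapshotedElement = -1;

    // Fixed behavior: updated only by snapshots taken before the shutdown.
    private long lastSnapshotedElementBeforeShutdown = -1;

    /** Called when a checkpoint snapshots the test's state. */
    synchronized void onSnapshot(long lastEmittedElement) {
        lastSnapshotedElement = lastEmittedElement;
        if (!shutdownExecuted) {
            lastSnapshotedElementBeforeShutdown = lastEmittedElement;
        }
    }

    /** Called when the test executes the shutdown. */
    synchronized void onShutdown() {
        shutdownExecuted = true;
    }

    /** Old expectation: this value must be in the topic, even if its snapshot failed. */
    synchronized long buggyExpectedUpTo() {
        return lastSnapshotedElement;
    }

    /** New expectation: only the value from the last snapshot that preceded the shutdown. */
    synchronized long expectedUpTo() {
        return lastSnapshotedElementBeforeShutdown;
    }
}
```

For example, with a snapshot at element 10, then the shutdown, then a snapshot at element 25 whose producer flush fails, the old bookkeeping expects element 25 in the topic, while the fixed bookkeeping only requires element 10.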
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118105#comment-16118105 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski closed the pull request at:

    https://github.com/apache/flink/pull/4470
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118104#comment-16118104 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4470

Thanks :)
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118019#comment-16118019 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4470

Please close the commit and the Jira issue.
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1611#comment-1611 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on the issue:

    https://github.com/apache/flink/pull/4470

Merging.
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116655#comment-16116655 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4470

Yes, I also didn't like adding this new flag, but didn't have enough motivation to change it. I have done some refactoring, extracting the settings that are set dynamically in the `prepare` method into a `Config` class. However, it helps only a little. Those tests will need a more comprehensive refactor in the future. In particular, I don't like that this `prepare` method exists at all: everything should be configured in the constructor, and all of those fields should be final.
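The direction suggested in this comment (configuration passed to the constructor and held in final fields, rather than assigned later by a `prepare` method) can be sketched as follows. All class, field and method names here are hypothetical; they do not reproduce the actual Flink test classes or the `Config` class from the PR.

```java
import java.util.Objects;

/**
 * Hypothetical immutable settings for a Kafka test environment. Everything is supplied once,
 * through the constructor, and kept in final fields.
 */
final class TestEnvironmentConfig {
    private final int brokerCount;
    private final boolean secureMode;

    TestEnvironmentConfig(int brokerCount, boolean secureMode) {
        if (brokerCount < 1) {
            throw new IllegalArgumentException("brokerCount must be at least 1, got " + brokerCount);
        }
        this.brokerCount = brokerCount;
        this.secureMode = secureMode;
    }

    int getBrokerCount() {
        return brokerCount;
    }

    boolean isSecureMode() {
        return secureMode;
    }
}

/**
 * Hypothetical test environment: instead of a separate prepare(...) call that assigns mutable
 * fields after construction, it receives its complete configuration in the constructor.
 */
final class ExampleTestEnvironment {
    private final TestEnvironmentConfig config;

    ExampleTestEnvironment(TestEnvironmentConfig config) {
        this.config = Objects.requireNonNull(config, "config");
    }

    TestEnvironmentConfig getConfig() {
        return config;
    }
}
```

Because nothing is assigned after construction, a test cannot observe a half-configured environment, and each new flag becomes a constructor parameter rather than another mutable field.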
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116568#comment-16116568 ]

Chesnay Schepler commented on FLINK-7343:
-

Xmx increase in 1.4 in 4406d4868320c72ce7c744748cbd7528ff4bc642
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116559#comment-16116559 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4456
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116547#comment-16116547 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131648638

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort
+ * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked
+ * flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+	private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+	// mapping between source and target channels, used for finding correct target channel to use for given source.
+	private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+	private final Consumer<NetworkFailureHandler> onClose;
+	private final ClientSocketChannelFactory channelFactory;
+	private final String remoteHost;
+	private final int remotePort;
+
+	private final AtomicBoolean blocked;
+
+	public NetworkFailureHandler(
+			AtomicBoolean blocked,
+			Consumer<NetworkFailureHandler> onClose,
+			ClientSocketChannelFactory channelFactory,
+			String remoteHost,
+			int remotePort) {
+		this.blocked = blocked;
+		this.onClose = onClose;
+		this.channelFactory = channelFactory;
+		this.remoteHost = remoteHost;
+		this.remotePort = remotePort;
+	}
+
+	/**
+	 * Closes the specified channel after all queued write requests are flushed.
+	 */
+	static void closeOnFlush(Channel channel) {
+		if (channel.isConnected()) {
+			channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+		}
+	}
+
+	public void closeConnections() {
+		for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+			// target channel is closed on source's channel channelClosed even
+			entry.getKey().close();
+		}
+	}
+
+	@Override
+	public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+		// Suspend incoming traffic until connected to the remote host.
+		final Channel sourceChannel = event.getChannel();
+		sourceChannel.setReadable(false);
+
+		if (blocked.get()) {
+			sourceChannel.close();
+			return;
+		}
+
+		// Start the connection attempt.
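The handler under review forwards traffic between a source and a target channel and can cut it through a shared `blocked` flag. The same idea can be sketched without Netty, using plain `java.net` sockets and one blocking thread per direction. This is a swapped-in technique for illustration, not the Flink implementation, and `BlockingTcpProxy`, `blockTraffic` and `unblockTraffic` are hypothetical names.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical TCP proxy forwarding traffic to remoteHost:remotePort and back, which can cut
 * all traffic at any time. Uses one blocking thread per direction instead of Netty.
 */
final class BlockingTcpProxy implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked = new AtomicBoolean(false);
    // Every open socket on both sides, so that all connections can be closed on demand.
    private final Set<Socket> openSockets = ConcurrentHashMap.newKeySet();

    BlockingTcpProxy(String remoteHost, int remotePort) throws IOException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress()); // any free port
        startDaemon(this::acceptLoop);
    }

    int getLocalPort() { return serverSocket.getLocalPort(); }

    void unblockTraffic() { blocked.set(false); }

    /** Simulates a network failure: refuses new connections and closes existing ones. */
    void blockTraffic() {
        blocked.set(true);
        openSockets.forEach(this::closeAndForget);
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        openSockets.forEach(this::closeAndForget);
    }

    private void acceptLoop() {
        while (true) {
            Socket source;
            try {
                source = serverSocket.accept();
            } catch (IOException e) {
                return; // the server socket was closed
            }
            if (blocked.get()) {
                closeAndForget(source); // refuse connections while blocked
                continue;
            }
            try {
                Socket target = new Socket(remoteHost, remotePort);
                openSockets.add(source);
                openSockets.add(target);
                // Pumps started after blockTraffic() see the flag and close the pair at once.
                startDaemon(() -> pump(source, target));
                startDaemon(() -> pump(target, source));
            } catch (IOException e) {
                closeAndForget(source); // the remote host is unreachable
            }
        }
    }

    /** Copies bytes in one direction; closes both sockets when either side ends. */
    private void pump(Socket from, Socket to) {
        byte[] buffer = new byte[8192];
        try {
            InputStream in = from.getInputStream();
            OutputStream out = to.getOutputStream();
            int n;
            while (!blocked.get() && (n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
                out.flush();
            }
        } catch (IOException e) { // closed by the other pump or by blockTraffic()
        } finally {
            closeAndForget(from);
            closeAndForget(to);
        }
    }

    private void closeAndForget(Socket socket) {
        openSockets.remove(socket);
        try { socket.close(); } catch (IOException ignored) { /* best effort */ }
    }

    private static void startDaemon(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true);
        thread.start();
    }
}
```

In a test, clients would connect to `getLocalPort()` instead of directly to the remote port, and `blockTraffic()` would simulate a network failure without killing the remote process. Half-closed connections are not supported: when either direction ends, both sockets are closed. Thread-per-direction forwarding is simpler than Netty's event-driven model but only suits the handful of connections a test opens.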
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116539#comment-16116539 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

    https://github.com/apache/flink/pull/4456

Thanks :)
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116399#comment-16116399 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user zentol commented on the issue:

    https://github.com/apache/flink/pull/4456

merging.
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116359#comment-16116359 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131608304

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
@@ -0,0 +1,176 @@
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116358#comment-16116358 ]

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4470#discussion_r131609166

--- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java ---
@@ -0,0 +1,176 @@
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116357#comment-16116357 ] ASF GitHub Bot commented on FLINK-7343: --- Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/4470#discussion_r131608115 --- Diff: flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java --- @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.networking; + +import org.jboss.netty.bootstrap.ClientBootstrap; +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.ExceptionEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelUpstreamHandler; +import org.jboss.netty.channel.socket.ClientSocketChannelFactory; + +import java.net.InetSocketAddress; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; + +/** + * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort + * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked + * flag. + */ +class NetworkFailureHandler extends SimpleChannelUpstreamHandler { + private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler"; + + // mapping between source and target channels, used for finding correct target channel to use for given source. 
+    private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+    private final Consumer onClose;
+    private final ClientSocketChannelFactory channelFactory;
+    private final String remoteHost;
+    private final int remotePort;
+
+    private final AtomicBoolean blocked;
+
+    public NetworkFailureHandler(
+            AtomicBoolean blocked,
+            Consumer onClose,
+            ClientSocketChannelFactory channelFactory,
+            String remoteHost,
+            int remotePort) {
+        this.blocked = blocked;
+        this.onClose = onClose;
+        this.channelFactory = channelFactory;
+        this.remoteHost = remoteHost;
+        this.remotePort = remotePort;
+    }
+
+    /**
+     * Closes the specified channel after all queued write requests are flushed.
+     */
+    static void closeOnFlush(Channel channel) {
+        if (channel.isConnected()) {
+            channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+        }
+    }
+
+    public void closeConnections() {
+        for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+            // target channel is closed on the source channel's channelClosed event
+            entry.getKey().close();
+        }
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+        // Suspend incoming traffic until connected to the remote host.
+        final Channel sourceChannel = event.getChannel();
+        sourceChannel.setReadable(false);
+
+        if (blocked.get()) {
+            sourceChannel.close();
+            return;
+        }
+
+        // Start the connection attempt.
+
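The handler's Javadoc describes the mechanism under review: forward bytes in both directions and cut everything while a shared `blocked` flag is set. As a rough illustration only, the same idea can be sketched with plain blocking `java.net` sockets instead of Netty. `BlockingProxySketch`, `blockTraffic()` and `unblockTraffic()` are hypothetical names, not the API of the class in this diff, and the sketch swaps Netty's event model for one thread per direction purely for brevity.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Hypothetical sketch, not Flink's implementation: a TCP proxy on an ephemeral loopback port
 * that forwards bytes to remoteHost:remotePort and back, and cuts all traffic while blocked.
 */
class BlockingProxySketch implements AutoCloseable {
    private final ServerSocket serverSocket;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked = new AtomicBoolean(false);
    // Both ends of every proxied connection, so blockTraffic() can sever them all.
    private final Set<Socket> openSockets = ConcurrentHashMap.newKeySet();

    BlockingProxySketch(String remoteHost, int remotePort) throws IOException {
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        this.serverSocket = new ServerSocket(0, 50, InetAddress.getLoopbackAddress());
        startDaemon(this::acceptLoop);
    }

    int getLocalPort() {
        return serverSocket.getLocalPort();
    }

    /** Severs existing connections and refuses new ones until unblockTraffic() is called. */
    void blockTraffic() {
        blocked.set(true);
        openSockets.forEach(this::closeQuietly);
    }

    void unblockTraffic() {
        blocked.set(false);
    }

    @Override
    public void close() throws IOException {
        serverSocket.close();
        openSockets.forEach(this::closeQuietly);
    }

    private void acceptLoop() {
        while (true) {
            Socket source;
            try {
                source = serverSocket.accept();
            } catch (IOException e) {
                return; // the server socket was closed
            }
            if (blocked.get()) {
                closeQuietly(source); // like channelOpen(): close new connections while blocked
                continue;
            }
            try {
                Socket target = new Socket(remoteHost, remotePort);
                openSockets.add(source);
                openSockets.add(target);
                startDaemon(() -> pump(source, target));
                startDaemon(() -> pump(target, source));
            } catch (IOException e) {
                closeQuietly(source); // remote unreachable: drop the client connection
            }
        }
    }

    /** Copies bytes in one direction; when either side ends or fails, both sockets are closed. */
    private void pump(Socket from, Socket to) {
        byte[] buffer = new byte[4096];
        try {
            InputStream in = from.getInputStream();
            OutputStream out = to.getOutputStream();
            int n;
            while (!blocked.get() && (n = in.read(buffer)) != -1) {
                out.write(buffer, 0, n);
                out.flush();
            }
        } catch (IOException expected) {
            // thrown when blockTraffic() or the peer closes a socket during read/write
        } finally {
            closeQuietly(from);
            closeQuietly(to);
        }
    }

    private void closeQuietly(Socket socket) {
        openSockets.remove(socket);
        try { socket.close(); } catch (IOException ignored) { /* nothing useful to do */ }
    }

    private static void startDaemon(Runnable task) {
        Thread thread = new Thread(task);
        thread.setDaemon(true);
        thread.start();
    }
}
```

A client pointed at `getLocalPort()` instead of the real server sees the server vanish when `blockTraffic()` is called, while the server process itself keeps running, unlike when a broker is killed.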
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116176#comment-16116176 ] ASF GitHub Bot commented on FLINK-7343: --- Github user tzulitai commented on the issue: https://github.com/apache/flink/pull/4456 +1, LGTM. > Kafka010ProducerITCase instability > -- > > Key: FLINK-7343 > URL: https://issues.apache.org/jira/browse/FLINK-7343 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Reporter: Piotr Nowojski >Assignee: Piotr Nowojski > Labels: test-stability > > As reported by [~till.rohrmann] in > https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test > instability with > `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink` > https://travis-ci.org/tillrohrmann/flink/jobs/258538641 > It is probably related to log.flush intervals in Kafka, which delay flushing > the data to files and can cause data loss when Kafka brokers are killed > in the tests. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112654#comment-16112654 ] ASF GitHub Bot commented on FLINK-7343: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/4470
[FLINK-7343] Simulate network failures in kafka at-least-once test
We shouldn't fail KafkaServers directly, because they might not be able to flush the data (`log.flush.interval.***` properties). Since we don't want to test how well Kafka implements at-least-once/exactly-once semantics, it is a better idea (and hopefully more reliable) to just simulate a network failure between Flink and Kafka in our at-least-once tests. To achieve that, I have implemented the `NetworkFailuresProxy` class.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/pnowojski/flink network-failures
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/4470.patch
To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:
This closes #4470
commit 0e28327619893cfbf793fa842be3d965f649516c
Author: Piotr Nowojski
Date: 2017-08-01T14:05:49Z
[FLINK-7343][kafka] Increase Xmx for tests
Sometimes 1000m was not enough memory to run at-least-once tests with broker failures on Travis
commit 8d820c3d0e77624a945e074f4a1bc476b5fd0f75
Author: Piotr Nowojski
Date: 2017-08-01T16:11:27Z
[FLINK-7343] Add network proxy utility to simulate network failures
commit 967e1dfc87846b4011652bbaefab696900abc8dd
Author: Piotr Nowojski
Date: 2017-08-03T07:25:04Z
fixup! [FLINK-7343][kafka] Increase Xmx for tests
commit 27b20f2ec3770231d95c3c7918c9313ce58b5e18
Author: Piotr Nowojski
Date: 2017-08-03T09:27:12Z
[FLINK-7343] Use NetworkFailureProxy in kafka tests
We shouldn't fail KafkaServers directly, because they might not be able to flush the data. Since we don't want to test how well Kafka implements at-least-once/exactly-once semantic, we just simulate network failure between Flink and Kafka in our at-least-once tests.
commit 692b5944f16b98aafe716ca1d18a04fa8a033798
Author: Piotr Nowojski
Date: 2017-08-03T09:35:26Z
[hotfix][Kafka] Clean up getKafkaServer method
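Because an at-least-once producer may resend records after a dropped connection, a test of this kind can only demand that nothing is missing; repeated records are legal. A minimal sketch of such a check, assuming the job was expected to deliver the integers `0..n-1` and that they have already been read back into a list; `AtLeastOnceCheck` and `missingValues` are hypothetical names, not Flink's test utilities.

```java
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;

/**
 * Hypothetical helper, not Flink's test utility: at-least-once check for a run that was
 * expected to deliver the integers 0..expectedCount-1.
 */
final class AtLeastOnceCheck {
    private AtLeastOnceCheck() {}

    /**
     * Returns every value in 0..expectedCount-1 that never appears in {@code received}.
     * Duplicates (e.g. from producer retries after a dropped connection) are tolerated,
     * and values outside the expected range are ignored.
     */
    static List<Integer> missingValues(Iterable<Integer> received, int expectedCount) {
        BitSet seen = new BitSet(expectedCount);
        for (int value : received) {
            if (value >= 0 && value < expectedCount) {
                seen.set(value);
            }
        }
        List<Integer> missing = new ArrayList<>();
        // nextClearBit walks the gaps; bits past the end of the set always read as clear.
        for (int i = seen.nextClearBit(0); i < expectedCount; i = seen.nextClearBit(i + 1)) {
            missing.add(i);
        }
        return missing;
    }
}
```

An empty result means at-least-once held even if some records arrived twice; a non-empty result lists exactly the records that were lost.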
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112332#comment-16112332 ] ASF GitHub Bot commented on FLINK-7343: --- Github user pnowojski commented on the issue: https://github.com/apache/flink/pull/4456 @NicoK sounds reasonable, however this will drop the `-UseGCOverheadLimit` flag. It should be fine as long as we increase the `Xmx` at the same time. Btw, having `UseGCOverheadLimit` seems a little bit fishy in the first place... Pushing a fixup that inherits the values from the parent pom.
[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability
[ https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110577#comment-16110577 ] ASF GitHub Bot commented on FLINK-7343: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4456 Yes, it seems it was changed from `-Xmx800m` to `-Xmx1536m` by FLINK-6128 and again to `-Xmx2048m` by FLINK-7004 without being changed here. The question should rather be whether we need the specialised options at all or whether we can remove them. I tested the latter (removing the `...` lines but keeping the `1`), and then the root pom's values are inherited, which may be a better solution anyway.