[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408706#comment-16408706
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5729


> Kafka010ProducerITCase instability
> --
>
> Key: FLINK-7343
> URL: https://issues.apache.org/jira/browse/FLINK-7343
> Project: Flink
> Issue Type: Bug
> Components: Kafka Connector
> Reporter: Piotr Nowojski
> Assignee: Piotr Nowojski
> Priority: Blocker
> Labels: test-stability
> Fix For: 1.5.0
>
>
> As reported by [~till.rohrmann] in 
> https://issues.apache.org/jira/browse/FLINK-6996 there seems to be a test 
> instability with 
> `Kafka010ProducerITCase>KafkaProducerTestBase.testOneToOneAtLeastOnceRegularSink`
> https://travis-ci.org/tillrohrmann/flink/jobs/258538641
> It is probably related to log.flush intervals in Kafka, which delay flushing 
> the data to files and can potentially cause data loss when Kafka brokers are 
> killed in the tests.
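
If the log.flush hypothesis above holds, one way to rule it out in a test setup would be to make the broker flush after every message. The following is only a minimal sketch, assuming the broker properties are assembled in Java: the class and method names are hypothetical, while `log.flush.interval.messages` and `log.flush.interval.ms` are standard Kafka broker settings. It is not necessarily what the eventual fix did.

```java
import java.util.Properties;

/** Hypothetical helper, not part of Flink's Kafka test utilities. */
public class EagerFlushBrokerConfig {

    /** Returns broker overrides that force a flush after every message. */
    public static Properties eagerFlushOverrides() {
        Properties props = new Properties();
        // Flush the log to disk after each message instead of batching.
        props.setProperty("log.flush.interval.messages", "1");
        // Upper bound (in ms) on how long a message may stay unflushed.
        props.setProperty("log.flush.interval.ms", "1");
        return props;
    }

    public static void main(String[] args) {
        // Print the overrides so they can be inspected or copied.
        eagerFlushOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

Flushing this eagerly is slow, so it would only make sense for tests that kill brokers on purpose.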



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16408010#comment-16408010
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/5729
  
Thanks!




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407853#comment-16407853
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5729
  
merging.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-21 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407737#comment-16407737
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5729
  
Sweet, nice to see this fixed.

Code looks good, +1 to merge!




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406282#comment-16406282
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5729

[FLINK-7343][kafka-tests] Fix test at-least-once test instability

This PR fixes instabilities in both Kafka 0.10 and Kafka 0.9.

Previously, lastSnapshotedElement could be set to some value during a 
checkpoint that ran AFTER shutdown had been executed, while the KafkaProducer 
snapshot of this value would fail. The test then incorrectly expected this 
value to be present in the test Kafka topic. The fix is to remember 
lastSnapshotedElementBeforeShutdown: the last snapshot that we expect to 
succeed without failure.
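
The idea can be sketched roughly as follows. This is an illustration only, not the code from the pull request: apart from the two field names mentioned above, every class and method name is hypothetical.

```java
/** Hypothetical sketch; only the two field names come from the description above. */
public class SnapshotTracker {
    private int lastSnapshotedElement = -1;
    private int lastSnapshotedElementBeforeShutdown = -1;
    private boolean shutdownStarted = false;

    /** Called on every checkpoint with the element that is being snapshotted. */
    public synchronized void onSnapshot(int currentElement) {
        lastSnapshotedElement = currentElement;
        if (!shutdownStarted) {
            // Only snapshots taken before shutdown are expected to succeed,
            // so only those may be used in the test's final assertion.
            lastSnapshotedElementBeforeShutdown = currentElement;
        }
    }

    /** Called once the test starts shutting down the broker or connection. */
    public synchronized void onShutdown() {
        shutdownStarted = true;
    }

    public synchronized int getLastSnapshotedElement() {
        return lastSnapshotedElement;
    }

    public synchronized int getLastSnapshotedElementBeforeShutdown() {
        return lastSnapshotedElementBeforeShutdown;
    }

    public static void main(String[] args) {
        SnapshotTracker tracker = new SnapshotTracker();
        tracker.onSnapshot(5);   // checkpoint before shutdown
        tracker.onShutdown();
        tracker.onSnapshot(9);   // checkpoint racing with shutdown; may fail
        System.out.println(tracker.getLastSnapshotedElementBeforeShutdown());
    }
}
```

A test built this way would assert on getLastSnapshotedElementBeforeShutdown() only, since a snapshot that races with shutdown may never reach the topic.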

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f7343

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5729.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5729








[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118105#comment-16118105
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski closed the pull request at:

https://github.com/apache/flink/pull/4470





--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118104#comment-16118104
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4470
  
Thanks :)




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-08 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16118019#comment-16118019
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4470
  
Please close the commit and the Jira issue.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1611#comment-1611
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on the issue:

https://github.com/apache/flink/pull/4470
  
Merging.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116655#comment-16116655
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4470
  
Yes, I also didn't like adding this new flag, but didn't have enough 
motivation to change it. I have done some refactoring, extracting the values 
that are set dynamically in the `prepare` method into a `Config` class. 

However, it helps only a little bit. Those tests will need a more 
comprehensive refactoring in the future. I particularly don't like that this 
`prepare` method exists: everything should be configured in the constructor, 
and all of those should be final fields.
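
To illustrate the direction described above, a constructor-configured class with only final fields could look roughly like this. It is a sketch under assumptions: the class name and all three fields are hypothetical and do not reflect the actual `Config` class in the pull request.

```java
/** Hypothetical immutable test configuration; the field names are illustrative only. */
public final class ProducerTestConfig {
    private final int parallelism;
    private final boolean flushOnCheckpoint;
    private final boolean logFailuresOnly;

    /** Everything is fixed at construction time; there is no later prepare step. */
    public ProducerTestConfig(int parallelism, boolean flushOnCheckpoint, boolean logFailuresOnly) {
        if (parallelism < 1) {
            throw new IllegalArgumentException("parallelism must be at least 1");
        }
        this.parallelism = parallelism;
        this.flushOnCheckpoint = flushOnCheckpoint;
        this.logFailuresOnly = logFailuresOnly;
    }

    public int getParallelism() {
        return parallelism;
    }

    public boolean isFlushOnCheckpoint() {
        return flushOnCheckpoint;
    }

    public boolean isLogFailuresOnly() {
        return logFailuresOnly;
    }

    public static void main(String[] args) {
        ProducerTestConfig config = new ProducerTestConfig(1, true, false);
        System.out.println(config.getParallelism() + " " + config.isFlushOnCheckpoint());
    }
}
```

With final fields, a test cannot observe a half-configured object, which is the problem a separate `prepare` method invites.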




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116568#comment-16116568
 ] 

Chesnay Schepler commented on FLINK-7343:
-

Xmx increase in 1.4 in 4406d4868320c72ce7c744748cbd7528ff4bc642



[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116559#comment-16116559
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4456




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116547#comment-16116547
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131648638
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort
+ * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked
+ * flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+   private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+   // mapping between source and target channels, used for finding correct target channel to use for given source.
+   private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+   private final Consumer onClose;
+   private final ClientSocketChannelFactory channelFactory;
+   private final String remoteHost;
+   private final int remotePort;
+
+   private final AtomicBoolean blocked;
+
+   public NetworkFailureHandler(
+   AtomicBoolean blocked,
+   Consumer onClose,
+   ClientSocketChannelFactory channelFactory,
+   String remoteHost,
+   int remotePort) {
+   this.blocked = blocked;
+   this.onClose = onClose;
+   this.channelFactory = channelFactory;
+   this.remoteHost = remoteHost;
+   this.remotePort = remotePort;
+   }
+
+   /**
+* Closes the specified channel after all queued write requests are flushed.
+*/
+   static void closeOnFlush(Channel channel) {
+   if (channel.isConnected()) {
+   channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+   }
+   }
+
+   public void closeConnections() {
+   for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+   // target channel is closed on source's channel channelClosed event
+   entry.getKey().close();
+   }
+   }
+
+   @Override
+   public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+   // Suspend incoming traffic until connected to the remote host.
+   final Channel sourceChannel = event.getChannel();
+   sourceChannel.setReadable(false);
+
+   if (blocked.get()) {
+   sourceChannel.close();
+   return;
+   }
+
+   // Start the connection attempt.
+   

[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116539#comment-16116539
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4456
  
Thanks :)




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116399#comment-16116399
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4456
  
merging.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116359#comment-16116359
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131608304
  

[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116358#comment-16116358
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131609166
  

[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116357#comment-16116357
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/4470#discussion_r131608115
  
--- Diff: 
flink-test-utils-parent/flink-test-utils/src/main/java/org/apache/flink/networking/NetworkFailureHandler.java
 ---
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.networking;
+
+import org.jboss.netty.bootstrap.ClientBootstrap;
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import java.net.InetSocketAddress;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Consumer;
+
+/**
+ * Handler that is forwarding inbound traffic from the source channel to the target channel on remoteHost:remotePort
+ * and the responses in the opposite direction. All of the network traffic can be blocked at any time using blocked
+ * flag.
+ */
+class NetworkFailureHandler extends SimpleChannelUpstreamHandler {
+   private static final String TARGET_CHANNEL_HANDLER_NAME = "target_channel_handler";
+
+   // mapping between source and target channels, used for finding correct target channel to use for given source.
+   private final Map<Channel, Channel> sourceToTargetChannels = new ConcurrentHashMap<>();
+   private final Consumer onClose;
+   private final ClientSocketChannelFactory channelFactory;
+   private final String remoteHost;
+   private final int remotePort;
+
+   private final AtomicBoolean blocked;
+
+   public NetworkFailureHandler(
+           AtomicBoolean blocked,
+           Consumer onClose,
+           ClientSocketChannelFactory channelFactory,
+           String remoteHost,
+           int remotePort) {
+       this.blocked = blocked;
+       this.onClose = onClose;
+       this.channelFactory = channelFactory;
+       this.remoteHost = remoteHost;
+       this.remotePort = remotePort;
+   }
+
+   /**
+    * Closes the specified channel after all queued write requests are flushed.
+    */
+   static void closeOnFlush(Channel channel) {
+       if (channel.isConnected()) {
+           channel.write(ChannelBuffers.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
+       }
+   }
+
+   public void closeConnections() {
+       for (Map.Entry<Channel, Channel> entry : sourceToTargetChannels.entrySet()) {
+           // target channel is closed on source's channel channelClosed event
+           entry.getKey().close();
+       }
+   }
+
+   @Override
+   public void channelOpen(ChannelHandlerContext context, ChannelStateEvent event) throws Exception {
+       // Suspend incoming traffic until connected to the remote host.
+       final Channel sourceChannel = event.getChannel();
+       sourceChannel.setReadable(false);
+
+       if (blocked.get()) {
+           sourceChannel.close();
+           return;
+       }
+
+       // Start the connection attempt.
+   

[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-07 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16116176#comment-16116176
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/4456
  
+1, LGTM.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112654#comment-16112654
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/4470

[FLINK-7343] Simulate network failures in kafka at-least-once test

We shouldn't fail KafkaServers directly, because they might not be able to 
flush the data (`log.flush.interval.***` properties). Since we don't want to 
test how well Kafka implements at-least-once/exactly-once semantics, it is a 
better idea (and hopefully more reliable) to just simulate a network failure 
between Flink and Kafka in our at-least-once tests. To achieve that, I have 
implemented the `NetworkFailuresProxy` class.
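
To make the idea concrete, here is a minimal sketch of a blockable TCP proxy. It is an illustration only, not the `NetworkFailuresProxy` from this pull request: the class name `BlockableProxy` and the methods `blockTraffic`/`unblockTraffic` are hypothetical, and it uses plain `java.net` sockets, whereas the `NetworkFailureHandler` in the diff is built on Netty.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical blockable TCP proxy; an illustration only, not Flink's NetworkFailuresProxy. */
class BlockableProxy implements AutoCloseable {
    private final ServerSocket server;
    private final String remoteHost;
    private final int remotePort;
    private final AtomicBoolean blocked = new AtomicBoolean(false);

    BlockableProxy(String remoteHost, int remotePort) throws IOException {
        this.server = new ServerSocket(0); // bind to any free local port
        this.remoteHost = remoteHost;
        this.remotePort = remotePort;
        Thread acceptor = new Thread(this::acceptLoop, "proxy-acceptor");
        acceptor.setDaemon(true);
        acceptor.start();
    }

    int getLocalPort() { return server.getLocalPort(); }
    void blockTraffic() { blocked.set(true); }
    void unblockTraffic() { blocked.set(false); }

    private void acceptLoop() {
        while (!server.isClosed()) {
            try {
                Socket source = server.accept();
                if (blocked.get()) {
                    source.close(); // refuse new connections while blocked
                    continue;
                }
                Socket target;
                try {
                    target = new Socket(remoteHost, remotePort);
                } catch (IOException e) {
                    source.close(); // remote side is unreachable
                    continue;
                }
                pump(source, target); // requests
                pump(target, source); // responses
            } catch (IOException e) {
                // accept() fails once the server socket is closed; the loop condition then ends the loop
            }
        }
    }

    /** Copies bytes one way; closes both sockets on EOF, on error, or when data arrives while blocked. */
    private void pump(Socket from, Socket to) {
        Thread worker = new Thread(() -> {
            byte[] buffer = new byte[4096];
            try (Socket in = from; Socket out = to) {
                InputStream input = in.getInputStream();
                OutputStream output = out.getOutputStream();
                int read;
                while ((read = input.read(buffer)) != -1 && !blocked.get()) {
                    output.write(buffer, 0, read);
                    output.flush();
                }
            } catch (IOException ignored) {
                // one side of the connection was closed
            }
        });
        worker.setDaemon(true);
        worker.start();
    }

    @Override
    public void close() throws IOException { server.close(); }
}
```

Under these assumptions, a test would point its client at `localhost:proxy.getLocalPort()` instead of the broker's real port and call `blockTraffic()` to simulate the outage, leaving the broker itself running so it can still flush its data.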

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink network-failures

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4470.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4470


commit 0e28327619893cfbf793fa842be3d965f649516c
Author: Piotr Nowojski 
Date:   2017-08-01T14:05:49Z

[FLINK-7343][kafka] Increase Xmx for tests

Sometimes 1000m was not enough memory to run at-least-once tests with 
broker failures on Travis

commit 8d820c3d0e77624a945e074f4a1bc476b5fd0f75
Author: Piotr Nowojski 
Date:   2017-08-01T16:11:27Z

[FLINK-7343] Add network proxy utility to simulate network failures

commit 967e1dfc87846b4011652bbaefab696900abc8dd
Author: Piotr Nowojski 
Date:   2017-08-03T07:25:04Z

fixup! [FLINK-7343][kafka] Increase Xmx for tests

commit 27b20f2ec3770231d95c3c7918c9313ce58b5e18
Author: Piotr Nowojski 
Date:   2017-08-03T09:27:12Z

[FLINK-7343] Use NetworkFailureProxy in kafka tests

We shouldn't fail KafkaServers directly, because they might not be able
to flush the data. Since we don't want to test how well Kafka implements
at-least-once/exactly-once semantics, we just simulate network failure
between Flink and Kafka in our at-least-once tests.

commit 692b5944f16b98aafe716ca1d18a04fa8a033798
Author: Piotr Nowojski 
Date:   2017-08-03T09:35:26Z

[hotfix][Kafka] Clean up getKafkaServer method






[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-03 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16112332#comment-16112332
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user pnowojski commented on the issue:

https://github.com/apache/flink/pull/4456
  
@NicoK sounds reasonable; however, this will drop the `-UseGCOverheadLimit` 
flag. That should be fine as long as we increase the `Xmx` at the same time. 
Btw, having `UseGCOverheadLimit` seems a little bit fishy in the first place...

Pushing a fixup that inherits the values from the parent pom.




[jira] [Commented] (FLINK-7343) Kafka010ProducerITCase instability

2017-08-02 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16110577#comment-16110577
 ] 

ASF GitHub Bot commented on FLINK-7343:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4456
  
Yes, it seems it was changed from `-Xmx800m` to `-Xmx1536m` by FLINK-6128 
and again to `-Xmx2048m` by FLINK-7004 without changing it here. The real 
question is whether we need the specialised options at all or whether we 
can remove them.
I tested the latter (removing the `...` lines but 
keeping the `1`): the root pom's values are then 
inherited, which may be a better solution anyway.

