[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-02 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567824#comment-16567824
 ] 

Zhenqiu Huang commented on FLINK-7243:
--

[~najman] [~nssalian]

Created a pull request: [https://github.com/apache/flink/pull/6483]. Please 
review it when you have time.

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 
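For orientation, a usage sketch of such an input format; the {{ParquetRowInputFormat}} name and its (Path, MessageType) constructor are assumptions drawn from the linked pull request, not a finalized API:

{code:java}
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.core.fs.Path;
import org.apache.flink.types.Row;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ParquetReadSketch {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // Projection schema: only the columns we want to read from the file.
        MessageType readSchema = MessageTypeParser.parseMessageType(
            "message Record { required int64 id; required binary name (UTF8); }");

        // Hypothetical API shape for the input format proposed in this issue.
        ParquetRowInputFormat inputFormat =
            new ParquetRowInputFormat(new Path("/tmp/data.parquet"), readSchema);

        DataSet<Row> rows = env.createInput(inputFormat);
        rows.print();
    }
}
{code}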



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567823#comment-16567823
 ] 

ASF GitHub Bot commented on FLINK-10020:


tzulitai commented on a change in pull request #6482: [FLINK-10020] [kinesis] 
Support recoverable exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482#discussion_r207447268
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -433,6 +440,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 			} catch (ExpiredNextTokenException expiredToken) {
 				LOG.warn("List Shards has an expired token. Reusing the previous state.");
 				break;
 +			} catch (SdkClientException ex) {
 +				if (isRecoverableSdkClientException(ex)) {
 +					long backoffMillis = fullJitterBackoff(
 +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
 +					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
 +						streamName, backoffMillis);
 +					Thread.sleep(backoffMillis);
 
 Review comment:
   I'm wondering what kind of `SdkClientException`s there are. Do we really 
need to have a backoff here before retrying?
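   For context, a full-jitter backoff (the strategy the `fullJitterBackoff` helper above appears to implement) grows the delay exponentially with the attempt count, caps it, and then sleeps a uniformly random fraction of that cap. A minimal sketch, assuming these semantics:

{code:java}
import java.util.concurrent.ThreadLocalRandom;

// Minimal full-jitter backoff sketch (assumed semantics of the fullJitterBackoff
// helper used above): exponential growth with the attempt count, capped at
// maxMillis, then a uniformly random delay below that cap.
public static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
    long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
    return (long) (ThreadLocalRandom.current().nextDouble() * exponentialBackoff);
}
{code}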


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kinesis Consumer listShards should support more recoverable exceptions
> --
>
> Key: FLINK-10020
> URL: https://issues.apache.org/jira/browse/FLINK-10020
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Currently transient errors in listShards make the consumer fail and cause the 
> entire job to reset. That is unnecessary for certain exceptions (like status 
> 503 errors). It should be possible to control the exceptions that qualify for 
> retry, similar to getRecords/isRecoverableSdkClientException.
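A hypothetical sketch of the kind of recoverability check the issue asks for, patterned on the getRecords path; the concrete set of retryable errors is an assumption:

{code:java}
import com.amazonaws.AmazonServiceException;
import com.amazonaws.SdkClientException;

// Hypothetical recoverability check: retry service-side 5xx and throttling
// errors, fail fast on client-side errors such as bad credentials or
// malformed requests.
static boolean isRecoverableSdkClientException(SdkClientException ex) {
    if (ex instanceof AmazonServiceException) {
        AmazonServiceException ase = (AmazonServiceException) ex;
        return ase.getErrorType() == AmazonServiceException.ErrorType.Service // e.g. 503
            || ase.isRetryable();                                             // e.g. throttling
    }
    // Other SdkClientExceptions (e.g. network timeouts) are treated as fatal here.
    return false;
}
{code}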



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu commented on issue #6483: [Flink-7243][flink-formats] Add parquet input format

2018-08-02 Thread GitBox
HuangZhenQiu commented on issue #6483: [Flink-7243][flink-formats] Add parquet 
input format
URL: https://github.com/apache/flink/pull/6483#issuecomment-410152524
 
 
   @suez1224 Would you please have a look at this PR?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] tzulitai commented on a change in pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

2018-08-02 Thread GitBox
tzulitai commented on a change in pull request #6482: [FLINK-10020] [kinesis] 
Support recoverable exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482#discussion_r207447268
 
 

 ##
 File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java
 ##
 @@ -433,6 +440,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha
 			} catch (ExpiredNextTokenException expiredToken) {
 				LOG.warn("List Shards has an expired token. Reusing the previous state.");
 				break;
 +			} catch (SdkClientException ex) {
 +				if (isRecoverableSdkClientException(ex)) {
 +					long backoffMillis = fullJitterBackoff(
 +						listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++);
 +					LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.",
 +						streamName, backoffMillis);
 +					Thread.sleep(backoffMillis);
 
 Review comment:
   I'm wondering what kind of `SdkClientException`s there are. Do we really 
need to have a backoff here before retrying?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9861) Add end-to-end test for reworked BucketingSink

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567814#comment-16567814
 ] 

ASF GitHub Bot commented on FLINK-9861:
---

tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add 
StreamingFileSink E2E test 
URL: https://github.com/apache/flink/pull/6478#discussion_r207438658
 
 

 ##
 File path: flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test program for the {@link StreamingFileSink}.
+ *
+ * Uses a source that steadily emits a deterministic set of records over 60 seconds,
+ * after which it idles and waits for job cancellation. Every record has a unique index that is
+ * written to the file.
+ *
+ * The sink rolls on each checkpoint, with each part file containing a sequence of integers.
+ * Adding all committed part files together, and numerically sorting the contents, should
+ * result in a complete sequence from 0 (inclusive) to 6 (exclusive).
+ */
+public enum StreamingFileSinkProgram {
+   ;
+
+   public static void main(final String[] args) throws Exception {
+   final ParameterTool params = ParameterTool.fromArgs(args);
+   final String outputPath = params.getRequired("outputPath");
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(4);
+   env.enableCheckpointing(5000L);
+   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
+
+   final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+   .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+   PrintStream out = new PrintStream(stream);
+   out.println(element.f1);
+   })
+   .withBucketer(new KeyBucketer())
+   .withRollingPolicy(new OnCheckpointRollingPolicy<>())
+   .build();
+
+   // generate data, shuffle, sink
+   env.addSource(new Generator(10, 10, 60))
+   .keyBy(0)
+   .addSink(sink);
+
+   env.execute("StreamingFileSinkProgram");
+   }
+
+
+   /**
+* Use first field for buckets.
+*/
+   public static final class KeyBucketer implements Bucketer<Tuple2<Integer, Integer>, String> {
+
+   private static final long serialVersionUID = 987325769970523326L;
+
+   @Override
+   public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
+   return String.valueOf(element.f0);
+   }
+
+   @Override
+   public SimpleVersionedSerializer<String> getSerializer() {
+   return SimpleVersionedStringSerializer.INSTANCE;
+   }
+   }

[GitHub] tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test

2018-08-02 Thread GitBox
tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add 
StreamingFileSink E2E test 
URL: https://github.com/apache/flink/pull/6478#discussion_r207438658
 
 

 ##
 File path: flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
 ##
 @@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.Encoder;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
+import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer;
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+
+import java.io.PrintStream;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Test program for the {@link StreamingFileSink}.
+ *
+ * Uses a source that steadily emits a deterministic set of records over 60 seconds,
+ * after which it idles and waits for job cancellation. Every record has a unique index that is
+ * written to the file.
+ *
+ * The sink rolls on each checkpoint, with each part file containing a sequence of integers.
+ * Adding all committed part files together, and numerically sorting the contents, should
+ * result in a complete sequence from 0 (inclusive) to 6 (exclusive).
+ */
+public enum StreamingFileSinkProgram {
+   ;
+
+   public static void main(final String[] args) throws Exception {
+   final ParameterTool params = ParameterTool.fromArgs(args);
+   final String outputPath = params.getRequired("outputPath");
+
+   final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(4);
+   env.enableCheckpointing(5000L);
+   env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS)));
+
+   final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink
+   .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+   PrintStream out = new PrintStream(stream);
+   out.println(element.f1);
+   })
+   .withBucketer(new KeyBucketer())
+   .withRollingPolicy(new OnCheckpointRollingPolicy<>())
+   .build();
+
+   // generate data, shuffle, sink
+   env.addSource(new Generator(10, 10, 60))
+   .keyBy(0)
+   .addSink(sink);
+
+   env.execute("StreamingFileSinkProgram");
+   }
+
+
+   /**
+* Use first field for buckets.
+*/
+   public static final class KeyBucketer implements Bucketer<Tuple2<Integer, Integer>, String> {
+
+   private static final long serialVersionUID = 987325769970523326L;
+
+   @Override
+   public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) {
+   return String.valueOf(element.f0);
+   }
+
+   @Override
+   public SimpleVersionedSerializer<String> getSerializer() {
+   return SimpleVersionedStringSerializer.INSTANCE;
+   }
+   }
+
+   /**
+* Data-generating source function.
+*/
+   public static final class Generator implements SourceFunction<Tuple2<Integer, Integer>>, ListCheckpointed<Tuple2<Integer, Long>> {
+
+   private static final long serialVersionUID = 

[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567813#comment-16567813
 ] 

Xingcan Cui commented on FLINK-9977:


Hi [~twalthr] and [~fhueske], I've uploaded some screenshots of the new 
built-in function page.

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10039) FlinkKafkaProducer - Serializer Error

2018-08-02 Thread Akshay Nagpal (JIRA)
Akshay Nagpal created FLINK-10039:
-

 Summary: FlinkKafkaProducer - Serializer Error
 Key: FLINK-10039
 URL: https://issues.apache.org/jira/browse/FLINK-10039
 Project: Flink
  Issue Type: Bug
  Components: Streaming Connectors
Affects Versions: 1.4.2
Reporter: Akshay Nagpal


I am working on a use case where I input the data using Kafka's console 
producer, read the same data in my program using FlinkKafkaConsumer and write 
it back to another Kafka topic using FlinkKafkaProducer. 

I am using version 1.4.2 of the following dependencies:

flink-java

flink-streaming-java_2.11

flink-connector-kafka-0.10_2.11

 

The code is as follows:

KafkaConsoleProducer:
{code:java}
./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property "parse.key=true" --property "key.separator=:" --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer org.apache.kafka.common.serialization.StringSerializer
{code}
KafkaFlinkConsumer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", 
"org.apache.kafka.common.serialization.StringDeserializer");


FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>("test1", 
new SimpleStringSchema(),
properties);

DataStream<String> stream = env.addSource(myConsumer);
{code}
KafkaFlinkProducer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
properties1.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");


FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<>("my-topic", 
new SimpleStringSchema(), 
properties);

stream.addSink(myProducer);
{code}
When I specify key and value serializer as StringSerializer in 
FlinkKafkaProducer, it gives me the following error in the logs:

 
{code:java}
org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
{code}
Though it's giving me this error, it's still producing the data in the topic.

When I use ByteArraySerializer with the producer instead, there is no error in 
the logs, and the output is produced as expected.

Moreover, DataStream's print method is not printing the data to the console.
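For what it's worth, the error above matches a setup where the Flink producer already applies {{SimpleStringSchema}} (String to byte[]) before records reach the Kafka client, so a Kafka-level StringSerializer then receives byte[] and fails. A sketch of a producer configuration that avoids the mismatch, treating this reading of the connector internals as an assumption:

{code:java}
// Sketch: let the connector install its own byte[]-compatible serializers by
// omitting key.serializer/value.serializer; Flink-side serialization is done
// by SimpleStringSchema.
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", "xxx:9092");
// no key.serializer / value.serializer entries here

FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<>(
        "my-topic",
        new SimpleStringSchema(),
        producerProps);

stream.addSink(myProducer);
{code}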

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: Scala.jpg

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: Java.jpg

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: Java.jpg, SQL.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: SQL.jpg

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: SQL.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: (was: SQL.jpg)

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9875) Add concurrent creation of execution job vertex

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567803#comment-16567803
 ] 

ASF GitHub Bot commented on FLINK-9875:
---

TisonKun commented on issue #6353: [FLINK-9875][runtime] Add concurrent 
creation of execution job vertex
URL: https://github.com/apache/flink/pull/6353#issuecomment-410149936
 
 
   Further discussion about parallelizing the creation of InputSplit goes to 
[FLINK-10038](https://issues.apache.org/jira/browse/FLINK-10038)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add concurrent creation of execution job vertex
> ---
>
> Key: FLINK-9875
> URL: https://issues.apache.org/jira/browse/FLINK-9875
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime
>Affects Versions: 1.5.1
>Reporter: 陈梓立
>Assignee: 陈梓立
>Priority: Major
>  Labels: pull-request-available
>
> In some cases, such as input-format vertices, the creation of an execution job vertex is
> time consuming. This PR adds concurrent creation of execution job vertices to 
> accelerate it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: (was: Java.jpg)

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: SQL.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: (was: Scala.jpg)

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: SQL.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6353: [FLINK-9875][runtime] Add concurrent creation of execution job vertex

2018-08-02 Thread GitBox
TisonKun commented on issue #6353: [FLINK-9875][runtime] Add concurrent 
creation of execution job vertex
URL: https://github.com/apache/flink/pull/6353#issuecomment-410149936
 
 
   Further discussion about parallelizing the creation of InputSplit goes to 
[FLINK-10038](https://issues.apache.org/jira/browse/FLINK-10038)


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions

2018-08-02 Thread Xingcan Cui (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Xingcan Cui updated FLINK-9977:
---
Attachment: SQL.jpg
Scala.jpg
Java.jpg

> Refine the docs for Table/SQL built-in functions
> 
>
> Key: FLINK-9977
> URL: https://issues.apache.org/jira/browse/FLINK-9977
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
> Attachments: Java.jpg, SQL.jpg, Scala.jpg
>
>
> There exist some syntax errors or inconsistencies in documents and Scala docs 
> of the Table/SQL built-in functions. This issue aims to make some 
> improvements to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] HuangZhenQiu opened a new pull request #6483: Add parquet input format

2018-08-02 Thread GitBox
HuangZhenQiu opened a new pull request #6483: Add parquet input format
URL: https://github.com/apache/flink/pull/6483
 
 
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-] [component] Title of the 
pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

陈梓立 updated FLINK-10038:

Labels: improvement inputformat parallel perfomance  (was: )

> Parallel the creation of InputSplit if necessary
> 
>
> Key: FLINK-10038
> URL: https://issues.apache.org/jira/browse/FLINK-10038
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: 陈梓立
>Priority: Major
>  Labels: improvement, inputformat, parallel, perfomance
>
> As a continuation of the discussion in the PR about parallelizing the creation of 
> ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353],
> [~StephanEwen] suggested that we could parallelize the creation of 
> InputSplits, from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

陈梓立 updated FLINK-10038:

Component/s: (was: Build System)

> Parallel the creation of InputSplit if necessary
> 
>
> Key: FLINK-10038
> URL: https://issues.apache.org/jira/browse/FLINK-10038
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: 陈梓立
>Priority: Major
>
> As a continuation of the discussion in the PR about parallelizing the creation of 
> ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353],
> [~StephanEwen] suggested that we could parallelize the creation of 
> InputSplits, from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA


 [ 
https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

陈梓立 updated FLINK-10038:

Component/s: Build System

> Parallel the creation of InputSplit if necessary
> 
>
> Key: FLINK-10038
> URL: https://issues.apache.org/jira/browse/FLINK-10038
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: 陈梓立
>Priority: Major
>
> As a continuation of the discussion in the PR about parallelizing the creation of 
> ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353],
> [~StephanEwen] suggested that we could parallelize the creation of 
> InputSplits, from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA


[ 
https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567780#comment-16567780
 ] 

陈梓立 commented on FLINK-10038:
-

After taking a look at InputSplit and InputFormat, I find that the interface 
for the creation of input splits is InputSplitSource#createInputSplits, whose 
implementations vary from FileInputFormat to JDBCInputFormat and so on.

Since each input source decides how to create its input splits, the 
parallelization logic differs between implementations, so we would implement it 
case by case where possible and necessary. A hypothetical sketch of this 
approach follows below.

What are your opinions? Are there other interfaces we need for the creation of 
input splits? What do you think is the most elegant and effective way to 
parallelize this and gain benefits from it?

Looking forward to your comments.
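{code:java}
import org.apache.flink.core.io.InputSplit;
import org.apache.flink.core.io.InputSplitSource;

import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical sketch: run independent createInputSplits calls concurrently
// with a bounded pool. splitSources, minNumSplits and poolSize are
// illustrative placeholders, not existing runtime fields.
static Map<InputSplitSource<InputSplit>, InputSplit[]> createSplitsConcurrently(
        List<InputSplitSource<InputSplit>> splitSources, int minNumSplits, int poolSize)
        throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(poolSize);
    try {
        Map<InputSplitSource<InputSplit>, Future<InputSplit[]>> futures = new LinkedHashMap<>();
        for (InputSplitSource<InputSplit> source : splitSources) {
            // Each source computes its splits independently, so the calls can overlap.
            futures.put(source, pool.submit(() -> source.createInputSplits(minNumSplits)));
        }
        Map<InputSplitSource<InputSplit>, InputSplit[]> result = new LinkedHashMap<>();
        for (Map.Entry<InputSplitSource<InputSplit>, Future<InputSplit[]>> e : futures.entrySet()) {
            result.put(e.getKey(), e.getValue().get()); // rethrows any creation failure
        }
        return result;
    } finally {
        pool.shutdown();
    }
}
{code}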

> Parallel the creation of InputSplit if necessary
> 
>
> Key: FLINK-10038
> URL: https://issues.apache.org/jira/browse/FLINK-10038
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: 陈梓立
>Priority: Major
>
> As a continuation of the discussion in the PR about parallelizing the creation of 
> ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353],
> [~StephanEwen] suggested that we could parallelize the creation of 
> InputSplits, from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10038) Parallel the creation of InputSplit if necessary

2018-08-02 Thread JIRA
陈梓立 created FLINK-10038:
---

 Summary: Parallel the creation of InputSplit if necessary
 Key: FLINK-10038
 URL: https://issues.apache.org/jira/browse/FLINK-10038
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: 陈梓立


As a continuation of the discussion in the PR about parallelizing the creation of 
ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353]:

[~StephanEwen] suggested that we could parallelize the creation of InputSplits, 
from which we would gain performance improvements.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567778#comment-16567778
 ] 

ASF GitHub Bot commented on FLINK-9899:
---

glaksh100 edited a comment on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433
 
 
   @zentol @tzulitai Sorry for the delay. Rebased the branch with latest master 
and added documentation for the metrics. Let me know if that looks alright!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java],
> add more stats, as sketched after the list below. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz
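A sketch of how such stats could be surfaced; the registration site, the {{reporter}} getters, and the group names are assumptions that merely mirror the metric list above:

{code:java}
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;

// Hypothetical per-shard gauge registration: a ShardMetricsReporter-style
// holder is mutated by the consumer's fetch loop, and each gauge reads from it.
MetricGroup shardGroup = getRuntimeContext().getMetricGroup()
        .addGroup("Kinesis")
        .addGroup(shardId);

shardGroup.gauge("sleepTimeMillis", (Gauge<Long>) reporter::getSleepTimeMillis);
shardGroup.gauge("bytesPerFetch", (Gauge<Long>) reporter::getBytesPerFetch);
shardGroup.gauge("loopFrequencyHz", (Gauge<Double>) reporter::getLoopFrequencyHz);
{code}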



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] glaksh100 edited a comment on issue #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer

2018-08-02 Thread GitBox
glaksh100 edited a comment on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433
 
 
   @zentol @tzulitai Sorry for the delay. Rebased the branch with latest master 
and added documentation for the metrics. Let me know if that looks alright!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-shard metrics to ShardConsumer

2018-08-02 Thread GitBox
glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433
 
 
   @tzulitai Sorry for the delay. Rebased the branch with latest master and 
added documentation for the metrics. Let me know if that looks alright!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1656#comment-1656
 ] 

ASF GitHub Bot commented on FLINK-9899:
---

glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connector] Add 
comprehensive per-shard metrics to ShardConsumer
URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433
 
 
   @tzulitai Sorry for the delay. Rebased the branch with latest master and 
added documentation for the metrics. Let me know if that looks alright!


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add more metrics to the Kinesis source connector
> 
>
> Key: FLINK-9899
> URL: https://issues.apache.org/jira/browse/FLINK-9899
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Affects Versions: 1.4.2, 1.5.1
>Reporter: Lakshmi Rao
>Assignee: Lakshmi Rao
>Priority: Major
>  Labels: pull-request-available
>
> Currently there are sparse metrics available for the Kinesis Connector. Using 
> the 
> [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java],
> add more stats. For example:
> - sleepTimeMillis 
> - maxNumberOfRecordsPerFetch
> - numberOfAggregatedRecordsPerFetch
> - numberOfDeaggregatedRecordsPerFetch
> - bytesPerFetch
> - averageRecordSizeBytes
> - runLoopTimeNanos
> - loopFrequencyHz



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567769#comment-16567769
 ] 

ASF GitHub Bot commented on FLINK-10037:


TisonKun commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time 
Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410143882
 
 
   cc @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document details event time behavior in a single location
> -
>
> Key: FLINK-10037
> URL: https://issues.apache.org/jira/browse/FLINK-10037
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.2
>Reporter: Elias Levy
>Assignee: Elias Levy
>Priority: Minor
>  Labels: pull-request-available
>
> A description of event time and watermarks, how they are generated, assigned, and 
> handled, is spread across many pages in the documentation. It would be useful 
> to have it all in a single place, including missing information such as 
> how Flink assigns timestamps to new records generated by operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page

2018-08-02 Thread GitBox
TisonKun commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time 
Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410143882
 
 
   cc @zentol 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10037:
---
Labels: pull-request-available  (was: )

> Document details event time behavior in a single location
> -
>
> Key: FLINK-10037
> URL: https://issues.apache.org/jira/browse/FLINK-10037
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.2
>Reporter: Elias Levy
>Assignee: Elias Levy
>Priority: Minor
>  Labels: pull-request-available
>
> A description of event time and watermarks, how they are generated, assigned, and 
> handled, is spread across many pages in the documentation. It would be useful 
> to have it all in a single place, including missing information such as 
> how Flink assigns timestamps to new records generated by operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567726#comment-16567726
 ] 

ASF GitHub Bot commented on FLINK-10037:


eliaslevy commented on issue #6481: [FLINK-10037] [Documentation] Add Event 
Time Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410135944
 
 
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document details event time behavior in a single location
> -
>
> Key: FLINK-10037
> URL: https://issues.apache.org/jira/browse/FLINK-10037
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.5.2
>Reporter: Elias Levy
>Assignee: Elias Levy
>Priority: Minor
>  Labels: pull-request-available
>
> A description of event time and watermarks, how they are generated, assigned, and 
> handled, is spread across many pages in the documentation. It would be useful 
> to have it all in a single place, including missing information such as 
> how Flink assigns timestamps to new records generated by operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] eliaslevy commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page

2018-08-02 Thread GitBox
eliaslevy commented on issue #6481: [FLINK-10037] [Documentation] Add Event 
Time Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410135944
 
 
   Done.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Comment Edited] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567721#comment-16567721
 ] 

buptljy edited comment on FLINK-10036 at 8/3/18 3:39 AM:
-

This is very similar to this 
[JIRA](https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a 
PR for that issue in the coming days. You can discuss it with me if you have any questions.


was (Author: wind_ljy):
This is very similar to this 
[JIRA](https://issues.apache.org/jira/browse/FLINK-9964) , and I will submit a 
PR these days. You can discuss with me if you have any questions.

> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In a CSV file, if a field contains a comma, quotes, or a new line, the field 
> should be surrounded with quotes (see section 2.6 of the standard). 
> Specifically, if a field contains quotes, the quotes should be escaped by 
> doubling them (see section 2.7 of the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).
> h2. How am I going to fix it
> I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
> module, and add some test cases to ensure that my fix is correct.
> h2. What's affected
> This fix will change the output of CsvTableSink, and will affect the test 
> cases whose results are written to a CSV file.
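A minimal sketch of the RFC 4180 quoting rule the fix needs to implement; this is an illustrative helper, not the actual patch to CsvOutputFormat:

{code:java}
// Quote a field when it contains a delimiter, a quote, or a line break, and
// escape embedded quotes by doubling them (RFC 4180, sections 2.6 and 2.7).
static String encodeCsvField(String field) {
    boolean needsQuoting = field.contains(",") || field.contains("\"")
            || field.contains("\n") || field.contains("\r");
    if (!needsQuoting) {
        return field;
    }
    return "\"" + field.replace("\"", "\"\"") + "\"";
}

// encodeCsvField("Hello,World")           -> "Hello,World" (wrapped in quotes)
// encodeCsvField("\"Quoted\" \"String\"") -> """Quoted"" ""String"""
{code}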



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10037) Document details event time behavior in a single location

2018-08-02 Thread Elias Levy (JIRA)
Elias Levy created FLINK-10037:
--

 Summary: Document details event time behavior in a single location
 Key: FLINK-10037
 URL: https://issues.apache.org/jira/browse/FLINK-10037
 Project: Flink
  Issue Type: Bug
  Components: Documentation
Affects Versions: 1.5.2
Reporter: Elias Levy
Assignee: Elias Levy


A description of event time and watermarks, how they are generated, assigned, and 
handled, is spread across many pages in the documentation. It would be useful 
to have it all in a single place, including missing information such as how 
Flink assigns timestamps to new records generated by operators.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread buptljy (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16567721#comment-16567721
 ] 

buptljy commented on FLINK-10036:
-

This is very similar to this 
[JIRA](https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a 
PR in the coming days. You can discuss it with me if you have any questions.

> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In a CSV file, if a field contains a comma, quotes, or a new line, the field 
> should be surrounded with quotes (see section 2.6 of the standard). 
> Specifically, if a field contains quotes, the quotes should be escaped by 
> doubling them (see section 2.7 of the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).
> h2. How am I going to fix it
> I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
> module, and add some test cases to ensure that my fix is correct.
> h2. What's affected
> This fix will change the output of CsvTableSink, and will affect the test 
> cases whose results are written to a CSV file.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread Caizhi Weng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-10036:

Description: 
h2. What's the problem

Flink's CSV output format is not consistent with the standard 
([https://tools.ietf.org/html/rfc4180]).

In a CSV file, if a field contains a comma, quotes, or a new line, the field 
should be surrounded with quotes (see section 2.6 of the standard). 
Specifically, if a field contains quotes, the quotes should be escaped by 
doubling them (see section 2.7 of the standard).

For example, to express these two fields in a CSV file:
{noformat}
Hello,World
"Quoted" "String"
{noformat}
The CSV file should look like this:
{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}
But if we run the following Flink code to output these fields
{code:java}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
ds.select('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.writeToSink(sink)

env.execute()
{code}
We get the following CSV:
{noformat}
Hello,World,"Quoted" "String"
{noformat}
which is not correct (there are actually 3 fields instead of 2 in this CSV 
file, and the last field is not valid).
h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
module, and add some test cases to ensure that my fix is correct.
h2. What's affected

This fix will change the output of CsvTableSink, and will affect the test cases 
whose results are written to a CSV file.

  was:
h2. What's the problem

Flink's CSV output format is not consistent with the standard 
([https://tools.ietf.org/html/rfc4180]).

In CSV format file, if a field contains comma, quotes or new line, this field 
should be surrounded with quotes (see section 2.6 in the standard). 
Specifically, if a field contains quotes, the quotes should be escaped by 
double quotes (see section 2.7 in the standard).

For example, to express these two fields in a CSV file:
{noformat}
Hello,World
"Quoted" "String"
{noformat}
The CSV file should look like this:
{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}
But if we run the following Flink code to output these fields
{code}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
ds.select('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.writeToSink(sink)

env.execute()
{code}
We get the following CSV:
{noformat}
Hello,World,"Quoted" "String"
{noformat}
which is not correct (there are actually 3 fields instead of 2 in this CSV 
file, and the last field is not valid).
h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
module, and add some test cases to ensure that my fix is correct.
h2. What's affected

This fix will change the output of CsvTableSink, and will affect some test 
cases currently in the project.


> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In a CSV file, if a field contains a comma, quotes, or a new line, the field 
> should be surrounded with quotes (see section 2.6 of the standard). 
> Specifically, if a field contains quotes, the quotes should be escaped by 
> doubling them (see section 2.7 of the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code:java}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).

[GitHub] TisonKun commented on issue #6481: Add Event Time Details documentation page

2018-08-02 Thread GitBox
TisonKun commented on issue #6481: Add Event Time Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410131208
 
 
   FYI, the Travis CI failure is unrelated to this PR.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[GitHub] TisonKun commented on issue #6481: Add Event Time Details documentation page

2018-08-02 Thread GitBox
TisonKun commented on issue #6481: Add Event Time Details documentation page
URL: https://github.com/apache/flink/pull/6481#issuecomment-410131107
 
 
   Hi @eliaslevy, thanks for your PR!
   This PR looks like far more than trivial work. Please take a look at the 
PULL_REQUEST_TEMPLATE in your comment above. Raising a JIRA issue and assigning 
it to this PR is one step of the formal process to get it merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567700#comment-16567700
 ] 

ASF GitHub Bot commented on FLINK-9969:
---

TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207428623
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 ##
 @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager 
memoryManager, List
TypeSerializerFactory serializerFactory, 
TypeComparator comparator,
int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
-   boolean objectReuseEnabled)
-   throws IOException
-   {
+   boolean objectReuseEnabled) throws IOException {
+   this (
+   memoryManager,
+   memory,
+   ioManager,
+   input,
+   parentTask,
+   serializerFactory,
+   comparator,
+   numSortBuffers,
+   maxNumFileHandles,
+   startSpillingFraction,
+   noSpillingMemory,
+   handleLargeRecords,
+   objectReuseEnabled,
+   new DefaultInMemorySorterFactory<>(serializerFactory, 
comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+   }
+
+   protected UnilateralSortMerger(
 
 Review comment:
   Why? We might configure which `InMemorySorterFactory` to use if needed; 
semantically there is no need for it to be `@VisibleForTesting`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unreasonable memory requirements to complete examples/batch/WordCount
> -
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> the following command fails with out-of-memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> Only after increasing the memory above 17.2 GB does the example complete. At 
> the same time, after disabling flip6, the following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger

2018-08-02 Thread GitBox
TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207428623
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 ##
 @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager 
memoryManager, List
TypeSerializerFactory serializerFactory, 
TypeComparator comparator,
int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
-   boolean objectReuseEnabled)
-   throws IOException
-   {
+   boolean objectReuseEnabled) throws IOException {
+   this (
+   memoryManager,
+   memory,
+   ioManager,
+   input,
+   parentTask,
+   serializerFactory,
+   comparator,
+   numSortBuffers,
+   maxNumFileHandles,
+   startSpillingFraction,
+   noSpillingMemory,
+   handleLargeRecords,
+   objectReuseEnabled,
+   new DefaultInMemorySorterFactory<>(serializerFactory, 
comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+   }
+
+   protected UnilateralSortMerger(
 
 Review comment:
   Why? We might configure which `InMemorySorterFactory` to use if needed; 
semantically there is no need for it to be `@VisibleForTesting`.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567692#comment-16567692
 ] 

ASF GitHub Bot commented on FLINK-9969:
---

TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207425876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 ##
 @@ -47,9 +37,20 @@
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 Review comment:
   This change can be suppressed if merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unreasonable memory requirements to complete examples/batch/WordCount
> -
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> the following command fails with out-of-memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> Only after increasing the memory above 17.2 GB does the example complete. At 
> the same time, after disabling flip6, the following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger

2018-08-02 Thread GitBox
TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207425876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 ##
 @@ -47,9 +37,20 @@
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 Review comment:
   This change can be suppressed if merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567691#comment-16567691
 ] 

ASF GitHub Bot commented on FLINK-9969:
---

TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207425876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 ##
 @@ -47,9 +37,20 @@
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 Review comment:
   The change can be suppressed if merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unreasonable memory requirements to complete examples/batch/WordCount
> -
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> the following command fails with out-of-memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> Only after increasing the memory above 17.2 GB does the example complete. At 
> the same time, after disabling flip6, the following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger

2018-08-02 Thread GitBox
TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] 
Dispose InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207425876
 
 

 ##
 File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java
 ##
 @@ -47,9 +37,20 @@
 import org.apache.flink.runtime.util.ReusingKeyGroupedIterator;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.MutableObjectIterator;
+import org.apache.flink.util.TestLogger;
+
 import org.junit.After;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Hashtable;
+import java.util.Iterator;
+import java.util.NoSuchElementException;
 
 
 Review comment:
   The change can be suppressed if merged.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10033) Let Task release reference to Invokable on shutdown

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567687#comment-16567687
 ] 

ASF GitHub Bot commented on FLINK-10033:


TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases 
reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480#issuecomment-410124471
 
 
   Nice catch! Memory leaks are among the nastiest issues, so this is 
helpful. +1 for merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Let Task release reference to Invokable on shutdown
> ---
>
> Key: FLINK-10033
> URL: https://issues.apache.org/jira/browse/FLINK-10033
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> References to Task objects may, under some conditions, linger beyond the 
> lifetime of the task. For example, in the case of local network channels, the 
> receiving task may hold a reference to the object of the task that produced 
> the data.
> To guard against memory leaks, the Task needs to release all references to 
> its AbstractInvokable when it shuts down or cancels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable

2018-08-02 Thread GitBox
TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases 
reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480#issuecomment-410124471
 
 
   Nice catch! Memory leaks are among the nastiest issues, so this is 
helpful. +1 for merge


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Updated] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread Caizhi Weng (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-10036:

Description: 
h2. What's the problem

Flink's CSV output format is not consistent with the standard 
([https://tools.ietf.org/html/rfc4180]).

In a CSV file, if a field contains a comma, quotes, or a new line, the field 
should be surrounded with quotes (see section 2.6 of the standard). 
Additionally, if a field contains quotes, each quote should be escaped by 
doubling it (see section 2.7 of the standard).

For example, to express these two fields in a CSV file:
{noformat}
Hello,World
"Quoted" "String"
{noformat}
The CSV file should look like this:
{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}
But if we run the following Flink code to output these fields
{code}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
ds.select('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.writeToSink(sink)

env.execute()
{code}
We get the following CSV:
{noformat}
Hello,World,"Quoted" "String"
{noformat}
which is not correct (there are actually 3 fields instead of 2 in this CSV 
file, and the last field is not valid).
h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
module, and add some test cases to ensure that my fix is correct.
h2. What's affected

This fix will change the output of CsvTableSink, and will affect some test 
cases currently in the project.

  was:
h2. What's the problem

Flink's CSV output format is not consistent with the standard 
(https://tools.ietf.org/html/rfc4180).

In a CSV file, if a field contains a comma, quotes, or a new line, the field 
should be surrounded with quotes (see section 2.6 of the standard). 
Additionally, if a field contains quotes, each quote should be escaped by 
doubling it (see section 2.7 of the standard).

For example, to express these two fields in a CSV file:

{noformat}
Hello,World
"Quoted" "String"
{noformat}

The CSV file should look like this:

{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}


But if we run the following Flink code to output these fields

{code:scala}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
ds.select('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.writeToSink(sink)

env.execute()
{code}

We get the following CSV:

{noformat}
Hello,World,"Quoted" "String"
{noformat}

which is not correct (there are actually 3 fields instead of 2 in this CSV 
file, and the last field is not valid).

h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
module, and add some test cases to ensure that my fix is correct.

But this fix will change the output of CsvTableSink, and will affect about 50 
test cases currently in the project.


> Flink's CSV output format is not consistent with the standard.
> --
>
> Key: FLINK-10036
> URL: https://issues.apache.org/jira/browse/FLINK-10036
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Caizhi Weng
>Priority: Minor
>
> h2. What's the problem
> Flink's CSV output format is not consistent with the standard 
> ([https://tools.ietf.org/html/rfc4180]).
> In a CSV file, if a field contains a comma, quotes, or a new line, the field 
> should be surrounded with quotes (see section 2.6 of the standard). 
> Additionally, if a field contains quotes, each quote should be escaped by 
> doubling it (see section 2.7 of the standard).
> For example, to express these two fields in a CSV file:
> {noformat}
> Hello,World
> "Quoted" "String"
> {noformat}
> The CSV file should look like this:
> {noformat}
> "Hello,World","""Quoted"" ""String"""
> {noformat}
> But if we run the following Flink code to output these fields
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val tEnv = TableEnvironment.getTableEnvironment(env, config)
> val data = List(
>   ("Hello,World", "\"Quoted\" \"String\"")
> )
> val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
> ds.select('a, 'b)
> val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
> ds.writeToSink(sink)
> env.execute()
> {code}
> We get the following CSV:
> {noformat}
> Hello,World,"Quoted" "String"
> {noformat}
> which is not correct (there are actually 3 fields instead of 2 in this CSV 
> file, and the last field is not valid).

[jira] [Created] (FLINK-10036) Flink's CSV output format is not consistent with the standard.

2018-08-02 Thread Caizhi Weng (JIRA)
Caizhi Weng created FLINK-10036:
---

 Summary: Flink's CSV output format is not consistent with the 
standard.
 Key: FLINK-10036
 URL: https://issues.apache.org/jira/browse/FLINK-10036
 Project: Flink
  Issue Type: Bug
  Components: Table API & SQL
Reporter: Caizhi Weng


h2. What's the problem

Flink's CSV output format is not consistent with the standard 
(https://tools.ietf.org/html/rfc4180).

In a CSV file, if a field contains a comma, quotes, or a new line, the field 
should be surrounded with quotes (see section 2.6 of the standard). 
Additionally, if a field contains quotes, each quote should be escaped by 
doubling it (see section 2.7 of the standard).

For example, to express these two fields in a CSV file:

{noformat}
Hello,World
"Quoted" "String"
{noformat}

The CSV file should look like this:

{noformat}
"Hello,World","""Quoted"" ""String"""
{noformat}


But if we run the following Flink code to output these fields

{code:scala}
val env = ExecutionEnvironment.getExecutionEnvironment
val tEnv = TableEnvironment.getTableEnvironment(env, config)

val data = List(
  ("Hello,World", "\"Quoted\" \"String\"")
)
val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b)
ds.select('a, 'b)

val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE)
ds.writeToSink(sink)

env.execute()
{code}

We get the following CSV:

{noformat}
Hello,World,"Quoted" "String"
{noformat}

which is not correct (there are actually 3 fields instead of 2 in this CSV 
file, and the last field is not valid).

h2. How am I going to fix it

I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java 
module, and add some test cases to ensure that my fix is correct.

But this fix will change the output of CsvTableSink, and will affect about 50 
test cases currently in the project.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-5315) Support distinct aggregations in table api

2018-08-02 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567666#comment-16567666
 ] 

Rong Rong edited comment on FLINK-5315 at 8/3/18 1:25 AM:
--

[~hequn8128], [~fhueske] I am actually close to finishing the implementation and 
found that it might be better to use
{code:scala}
myudagg('a, 'b).distinct
{code}
instead of 
{code:scala}
myudagg.distinct('a, 'b)
{code}

Since a similar modifier is added at the end for table-level distinct: 
{code:scala}
table.select('a, 'b).distinct
{code} 

It might be great to make the modifier location consistent. What do you guys 
think? 


was (Author: walterddr):
[~hequn8128], [~fhueske] I am actually close to finishing the implementation and 
found that it might be better to use
{code:scala}
myudagg('a, 'b).distinct
{code}
instead of 
{code:scala}
myudagg.distinct('a, 'b)
{code}

Since a similar modifier is added at the end for table-level distinct: 
`table.select('a, 'b).distinct`. It might be great to make the modifier 
location consistent. What do you guys think? 

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api

2018-08-02 Thread Rong Rong (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567666#comment-16567666
 ] 

Rong Rong commented on FLINK-5315:
--

[~hequn8128], [~fhueske] I am actually close to finishing the implementation and 
found that it might be better to use
{code:scala}
myudagg('a, 'b).distinct
{code}
instead of 
{code:scala}
myudagg.distinct('a, 'b)
{code}

Since a similar modifier is added at the end for table-level distinct: 
`table.select('a, 'b).distinct`. It might be great to make the modifier 
location consistent. What do you guys think? 

> Support distinct aggregations in table api
> --
>
> Key: FLINK-5315
> URL: https://issues.apache.org/jira/browse/FLINK-5315
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Kurt Young
>Assignee: Rong Rong
>Priority: Major
>
> Support distinct aggregations in Table API in the following format:
> For Expressions:
> {code:scala}
> 'a.count.distinct // Expressions distinct modifier
> {code}
> For User-defined Function:
> {code:scala}
> singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier
> multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.

2018-08-02 Thread GitBox
tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support 
recoverable exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482
 
 
   This change fixes the retry behavior of listShards to match what getRecords 
already supports. Importantly, this will prevent the subtask from failing on 
transient listShards errors that we can identify based on well-known 
exceptions. These errors are recoverable and should not lead to unnecessary 
recovery cycles that cause downtime.
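
   For reviewers, a rough sketch of the intended retry shape. Everything here 
(names, constants, the recoverability check) is a stand-in for illustration, 
not the connector's actual API:
```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

// Illustrative sketch: retry transient failures with full-jitter backoff.
public class ListShardsRetrySketch {

    static final long BASE_BACKOFF_MILLIS = 1_000L;
    static final long MAX_BACKOFF_MILLIS = 30_000L;
    static final int MAX_ATTEMPTS = 10;

    // Full jitter: sleep a random duration in [0, min(max, base * 2^attempt)].
    static long fullJitterBackoff(long base, long max, int attempt) {
        long cap = Math.min(max, base << Math.min(attempt, 20));
        return ThreadLocalRandom.current().nextLong(cap + 1);
    }

    // Stand-in for inspecting the SDK exception (e.g. HTTP 503 / throttling).
    static boolean isRecoverable(Exception e) {
        return e.getMessage() != null && e.getMessage().contains("503");
    }

    static <T> T callWithRetry(Callable<T> call) throws Exception {
        for (int attempt = 0; ; attempt++) {
            try {
                return call.call();
            } catch (Exception e) {
                if (attempt + 1 >= MAX_ATTEMPTS || !isRecoverable(e)) {
                    throw e; // non-recoverable or out of attempts: fail the subtask
                }
                Thread.sleep(fullJitterBackoff(BASE_BACKOFF_MILLIS, MAX_BACKOFF_MILLIS, attempt));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        // Fails twice with a "recoverable" error, then succeeds on the third call.
        String shards = callWithRetry(() -> {
            if (calls[0]++ < 2) {
                throw new RuntimeException("503 Service Unavailable");
            }
            return "shard-000, shard-001";
        });
        System.out.println(shards);
    }
}
```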
   
   R: @glaksh100 @jgrier @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Commented] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567647#comment-16567647
 ] 

ASF GitHub Bot commented on FLINK-10020:


tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support 
recoverable exceptions in listShards.
URL: https://github.com/apache/flink/pull/6482
 
 
   This change fixes the retry behavior of listShards to match what getRecords 
already supports. Importantly, this will prevent the subtask from failing on 
transient listShards errors that we can identify based on well-known 
exceptions. These errors are recoverable and should not lead to unnecessary 
recovery cycles that cause downtime.
   
   R: @glaksh100 @jgrier @tzulitai 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Kinesis Consumer listShards should support more recoverable exceptions
> --
>
> Key: FLINK-10020
> URL: https://issues.apache.org/jira/browse/FLINK-10020
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Currently transient errors in listShards make the consumer fail and cause the 
> entire job to reset. That is unnecessary for certain exceptions (like status 
> 503 errors). It should be possible to control the exceptions that qualify for 
> retry, similar to getRecords/isRecoverableSdkClientException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions

2018-08-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10020:
---
Labels: pull-request-available  (was: )

> Kinesis Consumer listShards should support more recoverable exceptions
> --
>
> Key: FLINK-10020
> URL: https://issues.apache.org/jira/browse/FLINK-10020
> Project: Flink
>  Issue Type: Improvement
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
>  Labels: pull-request-available
>
> Currently transient errors in listShards make the consumer fail and cause the 
> entire job to reset. That is unnecessary for certain exceptions (like status 
> 503 errors). It should be possible to control the exceptions that qualify for 
> retry, similar to getRecords/isRecoverableSdkClientException.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] eliaslevy opened a new pull request #6481: Add Event Time Details documentation page

2018-08-02 Thread GitBox
eliaslevy opened a new pull request #6481: Add Event Time Details documentation 
page
URL: https://github.com/apache/flink/pull/6481
 
 
   *Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*
   
   *Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*
   
   ## Contribution Checklist
   
 - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
 
 - Name the pull request in the form "[FLINK-XXXX] [component] Title of the 
pull request", where *FLINK-XXXX* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
 Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.
   
 - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
 
 - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).
   
 - Each pull request should address only one issue, not mix up code from 
multiple issues.
 
 - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)
   
 - Once all items of the checklist are addressed, remove the above text and 
this checklist, leaving only the filled out template below.
   
   
   **(The sections below can be removed for hotfixes of typos)**
   
   ## What is the purpose of the change
   
   *(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*
   
   
   ## Brief change log
   
   *(for example:)*
 - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
 - *Deployments RPC transmits only the blob storage reference*
 - *TaskManagers retrieve the TaskInfo from the blob cache*
   
   
   ## Verifying this change
   
   *(Please pick either of the following options)*
   
   This change is a trivial rework / code cleanup without any test coverage.
   
   *(or)*
   
   This change is already covered by existing tests, such as *(please describe 
tests)*.
   
   *(or)*
   
   This change added tests and can be verified as follows:
   
   *(example:)*
 - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
 - *Extended integration test for recovery after master (JobManager) 
failure*
 - *Added test that validates that TaskInfo is transferred only once across 
recoveries*
 - *Manually verified the change by running a 4 node cluster with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): (yes / no)
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
 - The serializers: (yes / no / don't know)
 - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
 - The S3 file system connector: (yes / no / don't know)
   
   ## Documentation
   
 - Does this pull request introduce a new feature? (yes / no)
 - If yes, how is the feature documented? (not applicable / docs / JavaDocs 
/ not documented)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:

[jira] [Commented] (FLINK-7243) Add ParquetInputFormat

2018-08-02 Thread Zhenqiu Huang (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567546#comment-16567546
 ] 

Zhenqiu Huang commented on FLINK-7243:
--

[~najman] [~nssalian]

I created a diff for the task. After polishing tonight, I will create a pull 
request. 

 
https://github.com/apache/flink/compare/master...HuangZhenQiu:add-parquet-input-format

> Add ParquetInputFormat
> --
>
> Key: FLINK-7243
> URL: https://issues.apache.org/jira/browse/FLINK-7243
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Reporter: godfrey he
>Assignee: Zhenqiu Huang
>Priority: Major
>
> Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-7642) Upgrade maven surefire plugin to 2.21.0

2018-08-02 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433258#comment-16433258
 ] 

Ted Yu edited comment on FLINK-7642 at 8/2/18 8:28 PM:
---

SUREFIRE-1439 is in 2.21.0, which is needed for compiling with Java 10.


was (Author: yuzhih...@gmail.com):
SUREFIRE-1439 is in 2.21.0 which is needed for compiling with Java 10

> Upgrade maven surefire plugin to 2.21.0
> ---
>
> Key: FLINK-7642
> URL: https://issues.apache.org/jira/browse/FLINK-7642
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>
> The Surefire 2.19 release introduced more useful test filters, which let us 
> run a subset of the tests.
> This issue is for upgrading the Maven Surefire plugin to 2.21.0, which 
> contains SUREFIRE-1422.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9924) Upgrade zookeeper to 3.4.13

2018-08-02 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9924:
--
Description: 
zookeeper 3.4.13 is being released.


ZOOKEEPER-2959 fixes data loss when observer is used
ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / 
cloud) environments

  was:
zookeeper 3.4.13 is being released.

ZOOKEEPER-2959 fixes data loss when observer is used
ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / 
cloud) environments


> Upgrade zookeeper to 3.4.13
> ---
>
> Key: FLINK-9924
> URL: https://issues.apache.org/jira/browse/FLINK-9924
> Project: Flink
>  Issue Type: Task
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>
> zookeeper 3.4.13 is being released.
> ZOOKEEPER-2959 fixes data loss when observer is used
> ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container 
> / cloud) environments



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10035) ConcurrentModificationException with flink-metrics-slf4j

2018-08-02 Thread Chesnay Schepler (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567424#comment-16567424
 ] 

Chesnay Schepler commented on FLINK-10035:
--

This isn't really an issue, and it applies to several reporters. It only happens 
if metrics are (un-)registered while a report is going on.

For long-living jobs this isn't an issue, as all it does is delay a report, and 
for short-living jobs scheduled reporters are unreliable anyway.
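
For context, the failure mode reduces to iterating a plain HashMap while another 
thread mutates it. A minimal sketch of the race, with hypothetical names standing 
in for the reporter's actual fields:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Minimal model of the race: a reporter thread iterates the metrics map while
// a task thread (un-)registers a metric. The iteration may fail fast with a
// ConcurrentModificationException, which the registry logs; the next scheduled
// report then simply runs over the updated map.
public class MetricsReportRaceSketch {
	private static final Map<String, Integer> metrics = new HashMap<>();

	public static void main(String[] args) throws Exception {
		for (int i = 0; i < 100_000; i++) {
			metrics.put("metric-" + i, i);
		}
		Thread reporter = new Thread(() -> {
			long sum = 0;
			for (Map.Entry<String, Integer> e : metrics.entrySet()) {
				sum += e.getValue(); // stands in for formatting one report line
			}
			System.out.println("reported sum " + sum);
		});
		reporter.start();
		metrics.remove("metric-0"); // concurrent (un-)registration
		reporter.join();
	}
}
{code}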

> ConcurrentModificationException with flink-metrics-slf4j
> 
>
> Key: FLINK-10035
> URL: https://issues.apache.org/jira/browse/FLINK-10035
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.2
>Reporter: Nico Kruber
>Priority: Major
>
> {code}
> 2018-08-02 15:45:08,052 WARN  
> org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
> reporting metrics
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
>   at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
>   at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
>   at 
> org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:95)
>   at 
> org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>   at java.lang.Thread.run(Thread.java:748)
> {code}
> https://api.travis-ci.org/v3/job/411307171/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10035) ConcurrentModificationException with flink-metrics-slf4j

2018-08-02 Thread Nico Kruber (JIRA)
Nico Kruber created FLINK-10035:
---

 Summary: ConcurrentModificationException with flink-metrics-slf4j
 Key: FLINK-10035
 URL: https://issues.apache.org/jira/browse/FLINK-10035
 Project: Flink
  Issue Type: Bug
  Components: Metrics
Affects Versions: 1.5.2
Reporter: Nico Kruber


{code}
2018-08-02 15:45:08,052 WARN  
org.apache.flink.runtime.metrics.MetricRegistryImpl   - Error while 
reporting metrics
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437)
at java.util.HashMap$EntryIterator.next(HashMap.java:1471)
at java.util.HashMap$EntryIterator.next(HashMap.java:1469)
at 
org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:95)
at 
org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at 
java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
{code}
https://api.travis-ci.org/v3/job/411307171/log.txt



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9950) Handle migration to Gitbox

2018-08-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-9950.
---
Resolution: Fixed

> Handle migration to Gitbox
> --
>
> Key: FLINK-9950
> URL: https://issues.apache.org/jira/browse/FLINK-9950
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Parent issue for all things we have to change now that Gitbox is enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10034) Update website pages

2018-08-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-10034.

Resolution: Fixed

cc2041d3bf80c2fb91cedb6ec0b29ea10a9b4e77

> Update website pages
> 
>
> Key: FLINK-10034
> URL: https://issues.apache.org/jira/browse/FLINK-10034
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Several pages of the project website still refer to the old repositories.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10034) Update website pages

2018-08-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10034:
-
Description: Several pages of the project website still refer to the old 
repositories.  (was: The community page still refers to the old repositories.)

> Update website pages
> 
>
> Key: FLINK-10034
> URL: https://issues.apache.org/jira/browse/FLINK-10034
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Several pages of the project website still refer to the old repositories.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10034) Update website pages

2018-08-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-10034:
-
Summary: Update website pages  (was: Update community page)

> Update website pages
> 
>
> Key: FLINK-10034
> URL: https://issues.apache.org/jira/browse/FLINK-10034
> Project: Flink
>  Issue Type: Sub-task
>  Components: Project Website
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> The community page still refers to the old repositories.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-10034) Update community page

2018-08-02 Thread Chesnay Schepler (JIRA)
Chesnay Schepler created FLINK-10034:


 Summary: Update community page
 Key: FLINK-10034
 URL: https://issues.apache.org/jira/browse/FLINK-10034
 Project: Flink
  Issue Type: Sub-task
  Components: Project Website
Reporter: Chesnay Schepler
Assignee: Chesnay Schepler


The community page still refers to the old repositories.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Reopened] (FLINK-9950) Handle migration to Gitbox

2018-08-02 Thread Chesnay Schepler (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler reopened FLINK-9950:
-

> Handle migration to Gitbox
> --
>
> Key: FLINK-9950
> URL: https://issues.apache.org/jira/browse/FLINK-9950
> Project: Flink
>  Issue Type: Improvement
>  Components: Build System, Documentation
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
>
> Parent issue for all things we have to change now that Gitbox is enabled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10033) Let Task release reference to Invokable on shutdown

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567385#comment-16567385
 ] 

ASF GitHub Bot commented on FLINK-10033:


StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task 
releases reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480
 
 
   ## What is the purpose of the change
   
   References to Task objects may, under some conditions, linger beyond the 
lifetime of the task. For example, in the case of local network channels, the 
receiving task may hold a reference to the object of the task that produced the 
data.
   
   To guard against memory leaks, the Task releases the reference to its 
AbstractInvokable
   when it shuts down or cancels.
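
   A simplified, self-contained model of the idea (hypothetical names; not the 
actual Task code):
```java
// Hypothetical, stripped-down model: the task clears its heap reference to the
// invokable once execution ends, so any lingering reference to the Task object
// (e.g. held by a local input channel) no longer pins the invokable for GC.
public class TaskSketch {

    interface Invokable {
        void invoke() throws Exception;
    }

    private volatile Invokable invokable;

    TaskSketch(Invokable invokable) {
        this.invokable = invokable;
    }

    void run() {
        try {
            invokable.invoke();
        } catch (Exception e) {
            // failure / cancellation handling elided in this sketch
        } finally {
            invokable = null; // release regardless of how the task ended
        }
    }

    public static void main(String[] args) {
        TaskSketch task = new TaskSketch(() -> System.out.println("running"));
        task.run(); // after this, the invokable is eligible for GC
    }
}
```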
   
   ## Brief change log
   
 - The `Task` nulls out its heap reference to the `AbstractInvokable`
 - `TaskTest` verifies that for finite task execution and cancellation
   
   ## Verifying this change
   
   Self-contained in a unit test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Let Task release reference to Invokable on shutdown
> ---
>
> Key: FLINK-10033
> URL: https://issues.apache.org/jira/browse/FLINK-10033
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> References to Task objects may, under some conditions, linger beyond the 
> lifetime of the task. For example, in the case of local network channels, the 
> receiving task may hold a reference to the object of the task that produced 
> the data.
> To guard against memory leaks, the Task needs to release all references to 
> its AbstractInvokable when it shuts down or cancels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-10033) Let Task release reference to Invokable on shutdown

2018-08-02 Thread ASF GitHub Bot (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-10033:
---
Labels: pull-request-available  (was: )

> Let Task release reference to Invokable on shutdown
> ---
>
> Key: FLINK-10033
> URL: https://issues.apache.org/jira/browse/FLINK-10033
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.5.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> References to Task objects may, under some conditions, linger beyond the 
> lifetime of the task. For example, in the case of local network channels, the 
> receiving task may hold a reference to the object of the task that produced 
> the data.
> To guard against memory leaks, the Task needs to release all references to 
> its AbstractInvokable when it shuts down or cancels.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable

2018-08-02 Thread GitBox
StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task 
releases reference to AbstractInvokable
URL: https://github.com/apache/flink/pull/6480
 
 
   ## What is the purpose of the change
   
   References to Task objects may, under some conditions, linger beyond the 
lifetime of the task. For example, in the case of local network channels, the 
receiving task may hold a reference to the object of the task that produced the 
data.
   
   To guard against memory leaks, the Task releases the reference to its 
AbstractInvokable
   when it shuts down or cancels.
   
   ## Brief change log
   
 - The `Task` nulls out its heap reference to the `AbstractInvokable`
 - `TaskTest` verifies that for finite task execution and cancellation
   
   ## Verifying this change
   
   Self-contained in a unit test.
   
   ## Does this pull request potentially affect one of the following parts:
   
 - Dependencies (does it add or upgrade a dependency): **no**
 - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
 - The serializers: **no**
 - The runtime per-record code paths (performance sensitive): **no**
 - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
 - The S3 file system connector: **no**
   
   ## Documentation
   
 - Does this pull request introduce a new feature? **no**
 - If yes, how is the feature documented? **not applicable**
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services


[jira] [Created] (FLINK-10033) Let Task release reference to Invokable on shutdown

2018-08-02 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-10033:


 Summary: Let Task release reference to Invokable on shutdown
 Key: FLINK-10033
 URL: https://issues.apache.org/jira/browse/FLINK-10033
 Project: Flink
  Issue Type: Bug
  Components: TaskManager
Affects Versions: 1.5.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.5.3, 1.6.0


References to Task objects may, under some conditions, linger beyond the 
lifetime of the task. For example, in the case of local network channels, the 
receiving task may hold a reference to the object of the task that produced the 
data.

To guard against memory leaks, the Task needs to release all references to 
its AbstractInvokable when it shuts down or cancels.




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567291#comment-16567291
 ] 

ASF GitHub Bot commented on FLINK-9969:
---

zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose 
InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207319425
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java
 ##
 @@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.operators.sort;
+
+import org.apache.flink.api.common.typeutils.TypeComparator;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
+import org.apache.flink.core.memory.MemorySegment;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/**
+ * Default factory for {@link InMemorySorter}.
+ */
+public class DefaultInMemorySorterFactory<T> implements 
InMemorySorterFactory<T> {
+
+   @Nonnull
+   private final TypeSerializerFactory<T> typeSerializerFactory;
+
+   @Nonnull
+   private final TypeComparator<T> typeComparator;
+
+   private final boolean useFixedLengthRecordSorter;
+
+   @Nullable
+   private TypeSerializer<T> initialSerializer;
+
+   DefaultInMemorySorterFactory(
+   @Nonnull TypeSerializerFactory<T> typeSerializerFactory,
+   @Nonnull TypeComparator<T> typeComparator,
+   int thresholdForInPlaceSorting) {
+   this.typeSerializerFactory = typeSerializerFactory;
+   this.typeComparator = typeComparator;
+
+   this.initialSerializer = typeSerializerFactory.getSerializer();
+
+   this.useFixedLengthRecordSorter = 
typeComparator.supportsSerializationWithKeyNormalization() &&
+   initialSerializer.getLength() > 0 && 
initialSerializer.getLength() <= thresholdForInPlaceSorting;
+   }
+
+   @Override
+   public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
+
+   final TypeSerializer<T> typeSerializer;
+
+   if (initialSerializer == null) {
+   typeSerializer = typeSerializerFactory.getSerializer();
+   } else {
+   typeSerializer = initialSerializer;
 
 Review comment:
   couldn't we simplify this by always discarding the serializer created in the 
constructor? We could then remove the `initialSerializer` field and always 
create a new serializer here.
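
For illustration, a sketch of that simplification, reusing the fields and 
imports of the factory above; the sorter construction at the end is an 
assumption (Flink's NormalizedKeySorter/FixedLengthRecordSorter), since that 
part of the diff is not shown here:

```java
// The constructor probes a throwaway serializer once and keeps only the
// derived boolean, so the initialSerializer field can be dropped entirely.
DefaultInMemorySorterFactory(
		@Nonnull TypeSerializerFactory<T> typeSerializerFactory,
		@Nonnull TypeComparator<T> typeComparator,
		int thresholdForInPlaceSorting) {
	this.typeSerializerFactory = typeSerializerFactory;
	this.typeComparator = typeComparator;

	TypeSerializer<T> probe = typeSerializerFactory.getSerializer();
	this.useFixedLengthRecordSorter = typeComparator.supportsSerializationWithKeyNormalization() &&
		probe.getLength() > 0 && probe.getLength() <= thresholdForInPlaceSorting;
}

@Override
public InMemorySorter<T> create(List<MemorySegment> sortSegments) {
	// Always create a fresh serializer; there is no cached instance to branch on.
	TypeSerializer<T> typeSerializer = typeSerializerFactory.getSerializer();

	if (useFixedLengthRecordSorter) {
		return new FixedLengthRecordSorter<>(typeSerializer, typeComparator.duplicate(), sortSegments);
	} else {
		return new NormalizedKeySorter<>(typeSerializer, typeComparator.duplicate(), sortSegments);
	}
}
```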


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unreasonable memory requirements to complete examples/batch/WordCount
> -
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> the following command fails with out-of-memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> The example only completes after increasing the memory above 17.2GB. At the 
> same time, after disabling flip6, the following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}

[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567292#comment-16567292
 ] 

ASF GitHub Bot commented on FLINK-9969:
---

zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose 
InMemorySorters created by the UnilateralSortMerger
URL: https://github.com/apache/flink/pull/6479#discussion_r207319672
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java
 ##
 @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager 
memoryManager, List<MemorySegment> memory,
TypeSerializerFactory<E> serializerFactory, 
TypeComparator<E> comparator,
int numSortBuffers, int maxNumFileHandles,
float startSpillingFraction, boolean noSpillingMemory, 
boolean handleLargeRecords,
-   boolean objectReuseEnabled)
-   throws IOException
-   {
+   boolean objectReuseEnabled) throws IOException {
+   this (
+   memoryManager,
+   memory,
+   ioManager,
+   input,
+   parentTask,
+   serializerFactory,
+   comparator,
+   numSortBuffers,
+   maxNumFileHandles,
+   startSpillingFraction,
+   noSpillingMemory,
+   handleLargeRecords,
+   objectReuseEnabled,
+   new DefaultInMemorySorterFactory<>(serializerFactory, 
comparator, THRESHOLD_FOR_IN_PLACE_SORTING));
+   }
+
+   protected UnilateralSortMerger(
 
 Review comment:
   `@VisibleForTesting`?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Unreasonable memory requirements to complete examples/batch/WordCount
> -
>
> Key: FLINK-9969
> URL: https://issues.apache.org/jira/browse/FLINK-9969
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager
>Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0
>Reporter: Piotr Nowojski
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
> Attachments: yarn_logs
>
>
> setup on AWS EMR:
>  * 5 worker nodes (m4.4xlarge nodes) 
>  * 1 master node (m4.large)
> the following command fails with out-of-memory errors:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 
> examples/batch/WordCount.jar{noformat}
> The example only completes after increasing the memory above 17.2GB. At the 
> same time, after disabling flip6, the following command succeeds:
> {noformat}
> export HADOOP_CLASSPATH=`hadoop classpath`
> ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 
> examples/batch/WordCount.jar{noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9947) Document unified table sources/sinks/formats

2018-08-02 Thread Timo Walther (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-9947.
-
   Resolution: Fixed
Fix Version/s: 1.6.0

Fixed in 1.7.0: dacc16b4fa6db6abfdbf73b99f26a5fd36b12acd
Fixed in 1.6.0: c858d31e7e6b404f810741ac076e66e14cb06868

> Document unified table sources/sinks/formats
> 
>
> Key: FLINK-9947
> URL: https://issues.apache.org/jira/browse/FLINK-9947
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Table API  SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> The recent unification of table sources/sinks/formats needs documentation. I 
> propose a new page that explains the built-in sources, sinks, and formats as 
> well as a page for customization of public interfaces.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9947) Document unified table sources/sinks/formats

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567119#comment-16567119
 ] 

ASF GitHub Bot commented on FLINK-9947:
---

asfgit closed pull request #6456: [FLINK-9947] [docs] Document unified table 
sources/sinks/formats
URL: https://github.com/apache/flink/pull/6456
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md
new file mode 100644
index 000..fa51783a590
--- /dev/null
+++ b/docs/dev/table/connect.md
@@ -0,0 +1,1049 @@
+---
+title: "Connect to External Systems"
+nav-parent_id: tableapi
+nav-pos: 19
+---
+
+
+Flink's Table API & SQL programs can be connected to other external systems 
for reading and writing both batch and streaming tables. A table source 
provides access to data which is stored in external systems (such as a 
database, key-value store, message queue, or file system). A table sink emits a 
table to an external storage system. Depending on the type of source and sink, 
they support different formats such as CSV, Parquet, or ORC.
+
+This page describes how to declare built-in table sources and/or table sinks 
and register them in Flink. After a source or sink has been registered, it can 
be accessed by Table API & SQL statements.
+
+Attention If you want to implement 
your own *custom* table source or sink, have a look at the [user-defined 
sources & sinks page](sourceSinks.html).
+
+* This will be replaced by the TOC
+{:toc}
+
+Dependencies
+
+
The following tables list all available connectors and formats. Their mutual 
compatibility is tagged in the corresponding sections for [table 
connectors](connect.html#table-connectors) and [table 
formats](connect.html#table-formats). The following table provides dependency 
information for both projects using a build automation tool (such as Maven or 
SBT) and SQL Client with SQL JAR bundles.
+
+{% if site.is_stable %}
+
+### Connectors
+
+| Name  | Version   | Maven dependency | SQL 
Client JAR |
+| :--- | :--- | :--- | :--- |
+| Filesystem|   | Built-in | Built-in  
 |
+| Apache Kafka  | 0.8   | `flink-connector-kafka-0.8`  | Not 
available  |
+| Apache Kafka  | 0.9   | `flink-connector-kafka-0.9`  | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka  | 0.10  | `flink-connector-kafka-0.10` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+| Apache Kafka  | 0.11  | `flink-connector-kafka-0.11` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar)
 |
+
+### Formats
+
+| Name  | Maven dependency | SQL Client JAR |
+| :--- | :--- | :--- |
+| CSV   | Built-in | Built-in   |
+| JSON  | `flink-json` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar)
 |
+| Apache Avro   | `flink-avro` | 
[Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar)
 |
+
+{% else %}
+
+This table is only available for stable releases.
+
+{% endif %}
+
+{% top %}
+
+Overview
+
+
+Beginning from Flink 1.6, the declaration of a connection to an external 
system is separated from the actual implementation.
+
+Connections can be specified either
+
+- **programmatically** using a `Descriptor` under 
`org.apache.flink.table.descriptors` for Table & SQL API
+- or **declaratively** via [YAML configuration files](http://yaml.org/) for 
the SQL Client.
+
+This allows not only for better unification of APIs and SQL Client but also 
for better extensibility in case of [custom implementations](sourceSinks.html) 
without changing the actual declaration.
+
+Every declaration is similar to a SQL `CREATE TABLE` statement. One can define 
the name of the table, the schema of the table, a connector, and a data format 
upfront for connecting to an external system.
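
To make the programmatic side concrete, a sketch against the Flink 1.6 Table 
API, assuming the Kafka, Json, and Schema descriptors from 
flink-connector-kafka-0.11 and flink-json are on the classpath; the topic, 
broker address, and field names are made up:

```java
import org.apache.flink.table.api.Types;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.descriptors.Json;
import org.apache.flink.table.descriptors.Kafka;
import org.apache.flink.table.descriptors.Schema;

class ConnectExample {

	// Registers a Kafka-backed JSON table source under the name "Clicks".
	static void register(StreamTableEnvironment tableEnv) {
		tableEnv
			.connect(
				new Kafka()
					.version("0.11")
					.topic("clicks")
					.property("bootstrap.servers", "localhost:9092")
					.startFromLatest())
			.withFormat(new Json().deriveSchema())
			.withSchema(
				new Schema()
					.field("user", Types.STRING())
					.field("ts", Types.SQL_TIMESTAMP()))
			.inAppendMode()
			.registerTableSource("Clicks");
	}
}
```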

[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567113#comment-16567113
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207306976
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
CompletableFuture<Boolean> stopReconciliationCoordinatorFuture 
= stopActor(reconciliationCoordinator, stopTimeout);
reconciliationCoordinator = null;
 
 Review comment:
   I think we need to check whether the supporting actors are not null because 
we initialize them only after the `clearStateFuture` has been completed. If 
this takes a bit and someone revokes our leadership in the meantime, 
`clearState` will be called before the support actors have been created.
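
A null-tolerant wrapper along these lines would cover that race; this is a 
sketch only, assuming `stopActor` returns a `CompletableFuture<Boolean>` as 
in the surrounding class, and reusing its imports (ActorRef, FiniteDuration):

```java
// Hypothetical guard: an actor may still be null if leadership was revoked
// before prepareLeadershipAsync() got around to creating it.
private CompletableFuture<Boolean> stopActorIfCreated(
		@Nullable ActorRef actorRef,
		FiniteDuration timeout) {
	return actorRef == null
		? CompletableFuture.completedFuture(true)
		: stopActor(actorRef, timeout);
}
```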


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9363) Bump up the Jackson version

2018-08-02 Thread Ted Yu (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ted Yu updated FLINK-9363:
--
Description: 
CVE's for Jackson:


CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5

  was:
CVE's for Jackson:

CVE-2017-17485
CVE-2018-5968
CVE-2018-7489

We can upgrade to 2.9.5


> Bump up the Jackson version
> ---
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
>  Issue Type: Improvement
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Major
>  Labels: security
>
> CVE's for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567088#comment-16567088
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207302752
 
 

 ##
 File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 ##
 @@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {

resourceManager.taskRouter.expectMsgClass(Disconnected.class);
}};
}
+
+   @Test
+   public void testClearStateRevokeLeadership() throws Exception {
+   new Context() {{
+   MesosWorkerStore.Worker worker1 = 
MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+   
when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+   
when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+   startResourceManager();
+   rmServices.rmLeaderElectionService.notLeader();
+   rmServices.grantLeadership();
+
+   //resourceManager.stateCleared.await(5, 
TimeUnit.SECONDS);
+   assertThat(resourceManager.workersInLaunch.size(), 
equalTo(0));
+   verify(rmServices.schedulerDriver).stop(true);
 
 Review comment:
   Is the test already complete?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9861) Add end-to-end test for reworked BucketingSink

2018-08-02 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-9861:
-

Assignee: Kostas Kloudas

> Add end-to-end test for reworked BucketingSink
> --
>
> Key: FLINK-9861
> URL: https://issues.apache.org/jira/browse/FLINK-9861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should add an end-to-end test for the reworked BucketingSink to verify that 
> the sink works with different {{FileSystems}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567084#comment-16567084
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207294014
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
CompletableFuture<Boolean> stopReconciliationCoordinatorFuture 
= stopActor(reconciliationCoordinator, stopTimeout);
reconciliationCoordinator = null;
 
-   CompletableFuture<Void> stopFuture = CompletableFuture.allOf(
+   return CompletableFuture.allOf(
stopTaskMonitorFuture,
stopConnectionMonitorFuture,
stopLaunchCoordinatorFuture,
stopReconciliationCoordinatorFuture);
+   }
+
+   @Override
+   public CompletableFuture<Void> postStop() {
+   final CompletableFuture<Void> supportActorsStopFuture = 
stopSupportingActorsAsync();
 
final CompletableFuture<Void> terminationFuture = 
super.postStop();
 
 Review comment:
   I think we should call `super.postStop` only after `supportActorsStopFuture` 
has been completed. Otherwise we might risk that the parent class shuts some 
resources down which are used by the support actors.
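
A sketch of the chained variant, with the names from the snippet above:

```java
@Override
public CompletableFuture<Void> postStop() {
	// Run sequentially rather than in parallel: the parent's postStop may
	// tear down resources (e.g. the RPC service) the support actors still use.
	return stopSupportingActorsAsync()
		.thenCompose(ignored -> super.postStop());
}
```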


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9861) Add end-to-end test for reworked BucketingSink

2018-08-02 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-9861:
-

Assignee: Chesnay Schepler  (was: Kostas Kloudas)

> Add end-to-end test for reworked BucketingSink
> --
>
> Key: FLINK-9861
> URL: https://issues.apache.org/jira/browse/FLINK-9861
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Affects Versions: 1.6.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>
> We should add an end-to-end test for the reworked BucketingSink to verify that 
> the sink works with different {{FileSystems}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567082#comment-16567082
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207302992
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -894,17 +900,21 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
 
// clear the state if we've been the leader 
before
if (getFencingToken() != null) {
-   clearState();
+   clearStateInternal();
}
 
setFencingToken(newResourceManagerId);
 
slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceActionsImpl());
 
-   getRpcService().execute(
-   () ->
+   prepareLeadershipAsync()
+   .exceptionally(t -> {
+   onFatalError(t);
+   return null;
+   })
+   .thenRunAsync(() ->
// confirming the leader 
session ID might be blocking,
-   
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+   
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), 
getRpcService().getExecutor());
 
 Review comment:
   Maybe we could refactor the `grantLeadership` the following way:
   ```
   /**
 * Callback method when current resourceManager is granted leadership.
 *
 * @param newLeaderSessionID unique leadershipID
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
final CompletableFuture<Boolean> acceptLeadershipFuture = 
CompletableFuture.supplyAsync(
() -> tryAcceptLeadership(newLeaderSessionID),

getUnfencedMainThreadExecutor()).thenCompose(Function.identity());
   
final CompletableFuture<Void> confirmationFuture = 
acceptLeadershipFuture.thenAcceptAsync(
(Boolean acceptLeadership) -> {
if (acceptLeadership) {
// confirming the leader session ID 
might be blocking,

leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
}
},
getRpcService().getExecutor());
   
confirmationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {

onFatalError(ExceptionUtils.stripCompletionException(throwable));
}
});
}
   
private CompletableFuture<Boolean> tryAcceptLeadership(UUID 
newLeaderSessionID) {
if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
final ResourceManagerId newResourceManagerId = 
ResourceManagerId.fromUuid(newLeaderSessionID);
   
log.info("ResourceManager {} was granted leadership 
with fencing token {}", getAddress(), newResourceManagerId);
   
// clear the state if we've been the leader before
if (getFencingToken() != null) {
clearStateInternal();
}
   
setFencingToken(newResourceManagerId);
   
slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceActionsImpl());
   
return prepareLeadershipAsync().thenApply(ignored -> 
true);
} else {
return CompletableFuture.completedFuture(false);
}
}
   ```


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: 

[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567085#comment-16567085
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207294205
 
 

 ##
 File path: 
flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
 ##
 @@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {

resourceManager.taskRouter.expectMsgClass(Disconnected.class);
}};
}
+
+   @Test
+   public void testClearStateRevokeLeadership() throws Exception {
+   new Context() {{
+   MesosWorkerStore.Worker worker1 = 
MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+   
when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+   
when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+   startResourceManager();
+   rmServices.rmLeaderElectionService.notLeader();
+   rmServices.grantLeadership();
+
+   //resourceManager.stateCleared.await(5, 
TimeUnit.SECONDS);
 
 Review comment:
   Can this line be removed?


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567081#comment-16567081
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207293251
 
 

 ##
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 ##
 @@ -894,17 +900,21 @@ public void grantLeadership(final UUID 
newLeaderSessionID) {
 
// clear the state if we've been the leader 
before
if (getFencingToken() != null) {
-   clearState();
+   clearStateInternal();
}
 
setFencingToken(newResourceManagerId);
 
slotManager.start(getFencingToken(), 
getMainThreadExecutor(), new ResourceActionsImpl());
 
-   getRpcService().execute(
-   () ->
+   prepareLeadershipAsync()
+   .exceptionally(t -> {
+   onFatalError(t);
+   return null;
+   })
 
 Review comment:
   Let's move the exception handling to the very end. That way we can also 
catch the case where `confirmLeaderSessionID` fails. In all cases, we should 
call `onFatalError`.
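
As a sketch, the suggested ordering could look like this 
(`ExceptionUtils.stripCompletionException` as used in the refactoring 
proposal elsewhere in this thread):

```java
prepareLeadershipAsync()
	.thenRunAsync(
		// confirming the leader session ID might be blocking
		() -> leaderElectionService.confirmLeaderSessionID(newLeaderSessionID),
		getRpcService().getExecutor())
	.exceptionally(t -> {
		// reached if either preparing leadership or the confirmation fails
		onFatalError(ExceptionUtils.stripCompletionException(t));
		return null;
	});
```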


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567083#comment-16567083
 ] 

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] 
WIP 
URL: https://github.com/apache/flink/pull/6464#discussion_r207295725
 
 

 ##
 File path: 
flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
 ##
 @@ -278,22 +267,76 @@ protected void initialize() throws 
ResourceManagerException {
catch (IOException e) {
throw new ResourceManagerException("Unable to configure 
the artifact server with TaskManager artifacts.", e);
}
+   }
 
-   // begin scheduling
-   connectionMonitor.tell(new ConnectionMonitor.Start(), 
selfActor);
-   schedulerDriver.start();
+   @Override
+   protected CompletableFuture<Void> prepareLeadershipAsync() {
+   Preconditions.checkState(initializedMesosConfig != null);
+
+   return clearStateFuture
+   .thenRunAsync(() -> {
+   schedulerDriver = 
initializedMesosConfig.createDriver(
+   new 
MesosResourceManagerSchedulerCallback(),
+   false);
+
+   // create supporting actors
+   connectionMonitor = createConnectionMonitor();
+   launchCoordinator = 
createLaunchCoordinator(schedulerDriver, selfActor);
+   reconciliationCoordinator = 
createReconciliationCoordinator(schedulerDriver);
+   taskMonitor = 
createTaskMonitor(schedulerDriver);
+   }, getMainThreadExecutor())
+   .thenCombineAsync(getWorkersAsync(), (ignored, 
tasksFromPreviousAttempts) -> {
+   // recover state
+   recoverWorkers(tasksFromPreviousAttempts);
+
+   // begin scheduling
+   connectionMonitor.tell(new 
ConnectionMonitor.Start(), selfActor);
+   schedulerDriver.start();
+
+   LOG.info("Mesos resource manager started.");
+   return null;
+   }, getMainThreadExecutor());
+   }
 
-   LOG.info("Mesos resource manager initialized.");
+   @Override
+   protected void clearState() {
+   schedulerDriver.stop(true);
+
+   clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() 
-> {
+   workersInNew.clear();
+   workersInLaunch.clear();
+   workersBeingReturned.clear();
 
 Review comment:
   Can't we clear these fields right away when `clearState` is called?
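
As a sketch, clearing eagerly would make `clearState` look roughly like:

```java
@Override
protected void clearState() {
	schedulerDriver.stop(true);

	// Clear the bookkeeping synchronously; only the actor shutdown
	// actually needs to happen asynchronously.
	workersInNew.clear();
	workersInLaunch.clear();
	workersBeingReturned.clear();

	clearStateFuture = stopSupportingActorsAsync();
}
```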


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
>  Issue Type: Bug
>  Components: Mesos, Scheduler
>Affects Versions: 1.5.0, 1.5.1, 1.6.0
>Reporter: Renjie Liu
>Assignee: Gary Yao
>Priority: Blocker
>  Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in mesos session cluster mode, the connection monitor keeps 
> reporting that it is unable to connect to mesos after a restart. In fact, 
> the scheduler driver has already connected to the mesos master, but the 
> connected message is lost: leadership has not been granted yet and the 
> fencing id is not set, so the rpc service ignores the message. We should 
> therefore connect to the mesos master only after leadership is granted.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10029) Refactor the code for better separation of concerns.

2018-08-02 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-10029.
--
Resolution: Fixed

Merged on master with 1b0baa162bd87efd69040eb787de8d6624f14c85
and on release-1.6 with d43e0f8f4d873b3e8392209853b56dd7a2c0db67

> Refactor the code for better separation of concerns.
> 
>
> Key: FLINK-10029
> URL: https://issues.apache.org/jira/browse/FLINK-10029
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-10027) Add logging to the StreamingFileSink

2018-08-02 Thread Kostas Kloudas (JIRA)


 [ 
https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-10027.
--
Resolution: Fixed

Merged on master with 852502b7d51f91c6f9c3479424516d0b9ae255e5
and on release-1.6 with a72d2c6d8bb07761434dadb20569e9df850cffaa

> Add logging to the StreamingFileSink
> 
>
> Key: FLINK-10027
> URL: https://issues.apache.org/jira/browse/FLINK-10027
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-10027) Add logging to the StreamingFileSink

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567073#comment-16567073
 ] 

ASF GitHub Bot commented on FLINK-10027:


asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b..6187e6853dd 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,11 +18,14 @@
 
 package org.apache.flink.streaming.api.functions.sink.filesystem;
 
-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,13 +37,14 @@
 /**
  * A bucket is the directory organization of the output of the {@link 
StreamingFileSink}.
  *
- * For each incoming  element in the {@code BucketingSink}, the 
user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * For each incoming element in the {@code StreamingFileSink}, the 
user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element 
should be written to.
  */
-@PublicEvolving
+@Internal
 public class Bucket<IN, BucketID> {
 
+   private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
private static final String PART_PREFIX = "part";
 
private final BucketID bucketId;
@@ -53,57 +57,27 @@
 
private final RecoverableWriter fsWriter;
 
-   private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPerCheckpoint = new HashMap<>();
-
-   private long partCounter;
-
-   private PartFileWriter<IN, BucketID> currentPart;
+   private final RollingPolicy<IN, BucketID> rollingPolicy;
 
-   private List<RecoverableWriter.CommitRecoverable> pending;
-
-   /**
-* Constructor to restore a bucket from checkpointed state.
-*/
-   public Bucket(
-   RecoverableWriter fsWriter,
-   int subtaskIndex,
-   long initialPartCounter,
-   PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory,
-   BucketState<BucketID> bucketState) throws IOException {
+   private final Map<Long, List<RecoverableWriter.CommitRecoverable>> pendingPartsPerCheckpoint = new HashMap<>();
 
-   this(fsWriter, subtaskIndex, bucketState.getBucketId(), 
bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
-   // the constructor must have already initialized the filesystem 
writer
-   Preconditions.checkState(fsWriter != null);
-
-   // we try to resume the previous in-progress file, if the 
filesystem
-   // supports such operation. If not, we just commit the file and 
start fresh.
+   private long partCounter;
 
-   final RecoverableWriter.ResumeRecoverable resumable = 
bucketState.getInProgress();
-   if (resumable != null) {
-   currentPart = partFileFactory.resumeFrom(
-   bucketId, fsWriter, resumable, 
bucketState.getCreationTime());
-   }
+   private PartFileWriter<IN, BucketID> inProgressPart;
 
-   // we commit pending files for previous checkpoints to the last 
successful one
-   // (from which we are recovering from)
-   for (List<RecoverableWriter.CommitRecoverable> commitables: bucketState.getPendingPerCheckpoint().values()) {
-   for (RecoverableWriter.CommitRecoverable commitable: 
commitables) {
-   
fsWriter.recoverForCommit(commitable).commitAfterRecovery();
-   }
-   }
-   }
+   private List<RecoverableWriter.CommitRecoverable> pendingPartsForCurrentCheckpoint;
 
/**
 * Constructor to create a new empty bucket.
 */
-   public Bucket(
-   RecoverableWriter fsWriter,
-   int subtaskIndex,
-   BucketID bucketId,
-   Path bucketPath,
-   long initialPartCounter,
-   PartFileWriter.PartFileFactory<IN, BucketID> partFileFactory) {
+   private Bucket(
+   
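
The `pendingPartsPerCheckpoint` bookkeeping in the diff above is the
bucket-local half of a two-phase commit: parts rolled since the last
checkpoint are parked under the checkpoint's id when the snapshot is taken,
committed once that checkpoint completes, and re-committed via
`recoverForCommit(...).commitAfterRecovery()` after a restore. A condensed,
runnable sketch of that shape, using simplified stand-in types rather than
the actual Flink classes:

    import java.util.ArrayList;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Consumer;

    /** C stands in for RecoverableWriter.CommitRecoverable. */
    class PendingPartsSketch<C> {

        private final Map<Long, List<C>> pendingPartsPerCheckpoint = new HashMap<>();
        private List<C> pendingPartsForCurrentCheckpoint = new ArrayList<>();

        /** A part file was rolled: it becomes pending. */
        void addPending(C committable) {
            pendingPartsForCurrentCheckpoint.add(committable);
        }

        /** Snapshot: everything rolled since the last checkpoint is
         *  parked under this checkpoint's id. */
        void onCheckpoint(long checkpointId) {
            if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
                pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
                pendingPartsForCurrentCheckpoint = new ArrayList<>();
            }
        }

        /** Notification: commit and drop everything up to the completed id. */
        void onCheckpointComplete(long checkpointId, Consumer<C> commit) {
            Iterator<Map.Entry<Long, List<C>>> it =
                    pendingPartsPerCheckpoint.entrySet().iterator();
            while (it.hasNext()) {
                Map.Entry<Long, List<C>> entry = it.next();
                if (entry.getKey() <= checkpointId) {
                    entry.getValue().forEach(commit);
                    it.remove();
                }
            }
        }
    }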


[jira] [Commented] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567042#comment-16567042
 ] 

ASF GitHub Bot commented on FLINK-10010:


walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate 
unused BaseAlignedWindowAssigner related components
URL: https://github.com/apache/flink/pull/6471#issuecomment-409989144
 
 
   Thanks for the review @yanghua. For some reason the Java checkstyle did 
not catch these problems; they are fixed now.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Deprecate unused BaseAlignedWindowAssigner related components
> -
>
> Key: FLINK-10010
> URL: https://issues.apache.org/jira/browse/FLINK-10010
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Reporter: Rong Rong
>Assignee: Rong Rong
>Priority: Major
>  Labels: pull-request-available
>
> {{BaseAlignedWindowAssigner}} should be marked as deprecated and 
> {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo.
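
A deprecation of this kind is conventionally expressed with both the
annotation and a Javadoc tag pointing at a replacement. A minimal sketch of
the convention (the class body and the suggested replacement are
illustrative, not the exact wording that landed in Flink):

    /**
     * @deprecated This assigner was not meant for general use and is
     *             scheduled for removal; consider a regular sliding
     *             assigner such as {@code SlidingProcessingTimeWindows}.
     */
    @Deprecated
    public class BaseAlignedWindowAssigner {
        // ...
    }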



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)



[jira] [Commented] (FLINK-10027) Add logging to the StreamingFileSink

2018-08-02 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567040#comment-16567040
 ] 

ASF GitHub Bot commented on FLINK-10027:


kl0u commented on issue #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477#issuecomment-409988617
 
 
   Thanks for the review @zentol. 
   I will merge as soon as Travis is green.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add logging to the StreamingFileSink
> 
>
> Key: FLINK-10027
> URL: https://issues.apache.org/jira/browse/FLINK-10027
> Project: Flink
>  Issue Type: Sub-task
>  Components: filesystem-connector
>Affects Versions: 1.6.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
>Priority: Major
>  Labels: pull-request-available
> Fix For: 1.6.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

