[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567824#comment-16567824 ] Zhenqiu Huang commented on FLINK-7243: -- [~najman] [~nssalian] I created a pull request: [https://github.com/apache/flink/pull/6483]. Please review it when you have time. > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > > Add a {{ParquetInputFormat}} to read data from an Apache Parquet file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions
[ https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567823#comment-16567823 ] ASF GitHub Bot commented on FLINK-10020: tzulitai commented on a change in pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards. URL: https://github.com/apache/flink/pull/6482#discussion_r207447268 ## File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/proxy/KinesisProxy.java ## @@ -433,6 +440,16 @@ private ListShardsResult listShards(String streamName, @Nullable String startSha } catch (ExpiredNextTokenException expiredToken) { LOG.warn("List Shards has an expired token. Reusing the previous state."); break; + } catch (SdkClientException ex) { + if (isRecoverableSdkClientException(ex)) { + long backoffMillis = fullJitterBackoff( + listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++); + LOG.warn("Got SdkClientException when listing shards from stream {}. Backing off for {} millis.", + streamName, backoffMillis); + Thread.sleep(backoffMillis); Review comment: I'm wondering what kind of `SdkClientException`s there are. Do we really need to have a backoff here before retrying? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Kinesis Consumer listShards should support more recoverable exceptions > -- > > Key: FLINK-10020 > URL: https://issues.apache.org/jira/browse/FLINK-10020 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > > Currently transient errors in listShards make the consumer fail and cause the > entire job to reset. 
That is unnecessary for certain exceptions (such as status 503 errors). It should be possible to control which exceptions qualify for retry, similar to getRecords/isRecoverableSdkClientException.
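The diff above sleeps for `fullJitterBackoff(listShardsBaseBackoffMillis, listShardsMaxBackoffMillis, listShardsExpConstant, attemptCount++)` milliseconds before retrying a recoverable `SdkClientException`. The sketch below shows what exponential backoff with "full jitter" typically computes. It assumes the delay is drawn uniformly from 0 up to `min(max, base * power^attempt)` and is not copied from `KinesisProxy`. The `callWithRetries` helper and `RetryableCall` interface are hypothetical. They retry only the failures that a predicate marks as recoverable, which mirrors the role of `isRecoverableSdkClientException`.

```java
import java.util.Random;
import java.util.function.Predicate;

final class FullJitterBackoff {

    private static final Random RANDOM = new Random();

    /** Hypothetical functional interface for a call that may throw (e.g. a listShards request). */
    interface RetryableCall<T> {
        T call() throws Exception;
    }

    /** Returns a random delay in [0, min(maxMillis, baseMillis * power^attempt)) milliseconds. */
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        // Exponential growth of the upper bound, capped at maxMillis.
        long exponentialBackoff = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        // Full jitter: choose uniformly between 0 and the capped bound.
        return (long) (RANDOM.nextDouble() * exponentialBackoff);
    }

    /** Retries recoverable failures with full-jitter backoff; other failures propagate immediately. */
    static <T> T callWithRetries(RetryableCall<T> call, Predicate<Exception> isRecoverable,
                                 int maxAttempts, long baseMillis, long maxMillis, double power)
            throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                // Give up on non-recoverable errors or once the attempt budget is used up.
                if (!isRecoverable.test(e) || attempt + 1 >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(fullJitterBackoff(baseMillis, maxMillis, power, attempt++));
            }
        }
    }

    public static void main(String[] args) {
        // Print one sampled delay per attempt for base=100 ms, max=1000 ms, power=2.
        for (int attempt = 0; attempt < 6; attempt++) {
            System.out.println("attempt " + attempt + ": sleep "
                    + fullJitterBackoff(100, 1000, 2.0, attempt) + " ms");
        }
    }
}
```

This bears on the review question of whether a backoff is needed at all. One common argument is that randomized delays stop many subtasks from retrying a throttled or unavailable endpoint in lockstep, whereas an immediate retry would hit it again right away.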
[GitHub] HuangZhenQiu commented on issue #6483: [Flink-7243][flink-formats] Add parquet input format
HuangZhenQiu commented on issue #6483: [Flink-7243][flink-formats] Add parquet input format URL: https://github.com/apache/flink/pull/6483#issuecomment-410152524 @suez1224 Would you please have a look at this PR?
[jira] [Commented] (FLINK-9861) Add end-to-end test for reworked BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567814#comment-16567814 ] ASF GitHub Bot commented on FLINK-9861: --- tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test URL: https://github.com/apache/flink/pull/6478#discussion_r207438658 ## File path: flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.io.PrintStream; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Test program for the {@link StreamingFileSink}. + * + * Uses a source that steadily emits a deterministic set of records over 60 seconds, + * after which it idles and waits for job cancellation. Every record has a unique index that is + * written to the file. + * + * The sink rolls on each checkpoint, with each part file containing a sequence of integers. + * Adding all committed part files together, and numerically sorting the contents, should + * result in a complete sequence from 0 (inclusive) to 6 (exclusive). 
+ */ +public enum StreamingFileSinkProgram { + ; + + public static void main(final String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + final String outputPath = params.getRequired("outputPath"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(4); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS))); + + final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink + .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> { + PrintStream out = new PrintStream(stream); + out.println(element.f1); + }) + .withBucketer(new KeyBucketer()) + .withRollingPolicy(new OnCheckpointRollingPolicy<>()) + .build(); + + // generate data, shuffle, sink + env.addSource(new Generator(10, 10, 60)) + .keyBy(0) + .addSink(sink); + + env.execute("StreamingFileSinkProgram"); + } + + + /** +* Use first field for buckets. +*/ + public static final class KeyBucketer implements Bucketer<Tuple2<Integer, Integer>, String> { + + private static final long serialVersionUID = 987325769970523326L; + + @Override + public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) { + return String.valueOf(element.f0); + } + + @Override + public SimpleVersionedSerializer<String> getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + }
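According to the Javadoc of the test program, if you concatenate all committed part files and sort their contents numerically, you should get a complete sequence starting at 0. The sketch below is a minimal, stdlib-only version of that check. Two parts of it are assumptions. First, it treats in-progress or pending part files as hidden files whose names start with `.` and skips them. Second, the class `PartFileVerifier` and its methods are hypothetical helpers, not part of the Flink end-to-end test.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

final class PartFileVerifier {

    /** Collects one integer per non-empty line from every visible regular file below root. */
    static List<Integer> readAllValues(Path root) throws IOException {
        List<Path> partFiles;
        try (Stream<Path> paths = Files.walk(root)) {
            partFiles = paths
                    .filter(Files::isRegularFile)
                    // Assumption: in-progress/pending files are hidden (name starts with '.').
                    .filter(p -> !p.getFileName().toString().startsWith("."))
                    .collect(Collectors.toList());
        }
        List<Integer> values = new ArrayList<>();
        for (Path file : partFiles) {
            for (String line : Files.readAllLines(file)) {
                String trimmed = line.trim();
                if (!trimmed.isEmpty()) {
                    values.add(Integer.parseInt(trimmed));
                }
            }
        }
        return values;
    }

    /** True if the sorted values are exactly 0, 1, ..., expectedCount - 1 (no gaps, no duplicates). */
    static boolean isCompleteSequence(List<Integer> values, int expectedCount) {
        if (values.size() != expectedCount) {
            return false;
        }
        List<Integer> sorted = new ArrayList<>(values);
        Collections.sort(sorted);
        for (int i = 0; i < expectedCount; i++) {
            if (sorted.get(i).intValue() != i) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java PartFileVerifier <outputPath> <expectedCount>
        List<Integer> values = readAllValues(Paths.get(args[0]));
        int expected = Integer.parseInt(args[1]);
        System.out.println(isCompleteSequence(values, expected) ? "OK" : "MISMATCH");
    }
}
```

Both failure modes matter here. If records were lost across a restart, the sequence has a gap. If they were committed twice, there are duplicates. Either case makes the check fail.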
[GitHub] tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test
tzulitai commented on a change in pull request #6478: [FLINK-9861][tests] Add StreamingFileSink E2E test URL: https://github.com/apache/flink/pull/6478#discussion_r207438658 ## File path: flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java ## @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.Encoder; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.utils.ParameterTool; +import org.apache.flink.core.fs.Path; +import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.api.functions.sink.filesystem.Bucketer; +import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.bucketers.SimpleVersionedStringSerializer; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy; +import org.apache.flink.streaming.api.functions.source.SourceFunction; + +import java.io.PrintStream; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.TimeUnit; + +/** + * Test program for the {@link StreamingFileSink}. + * + * Uses a source that steadily emits a deterministic set of records over 60 seconds, + * after which it idles and waits for job cancellation. Every record has a unique index that is + * written to the file. + * + * The sink rolls on each checkpoint, with each part file containing a sequence of integers. + * Adding all committed part files together, and numerically sorting the contents, should + * result in a complete sequence from 0 (inclusive) to 6 (exclusive). 
+ */ +public enum StreamingFileSinkProgram { + ; + + public static void main(final String[] args) throws Exception { + final ParameterTool params = ParameterTool.fromArgs(args); + final String outputPath = params.getRequired("outputPath"); + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + env.setParallelism(4); + env.enableCheckpointing(5000L); + env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10L, TimeUnit.SECONDS))); + + final StreamingFileSink<Tuple2<Integer, Integer>> sink = StreamingFileSink + .forRowFormat(new Path(outputPath), (Encoder<Tuple2<Integer, Integer>>) (element, stream) -> { + PrintStream out = new PrintStream(stream); + out.println(element.f1); + }) + .withBucketer(new KeyBucketer()) + .withRollingPolicy(new OnCheckpointRollingPolicy<>()) + .build(); + + // generate data, shuffle, sink + env.addSource(new Generator(10, 10, 60)) + .keyBy(0) + .addSink(sink); + + env.execute("StreamingFileSinkProgram"); + } + + + /** +* Use first field for buckets. +*/ + public static final class KeyBucketer implements Bucketer<Tuple2<Integer, Integer>, String> { + + private static final long serialVersionUID = 987325769970523326L; + + @Override + public String getBucketId(final Tuple2<Integer, Integer> element, final Context context) { + return String.valueOf(element.f0); + } + + @Override + public SimpleVersionedSerializer<String> getSerializer() { + return SimpleVersionedStringSerializer.INSTANCE; + } + } + + /** +* Data-generating source function. +*/ + public static final class Generator implements SourceFunction<Tuple2<Integer, Integer>>, ListCheckpointed<Integer> { + + private static final long serialVersionUID =
[jira] [Commented] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567813#comment-16567813 ] Xingcan Cui commented on FLINK-9977: Hi [~twalthr] and [~fhueske], I've uploaded some screenshots of the new built-in function page. > Refine the docs for Table/SQL built-in functions > > > Key: FLINK-9977 > URL: https://issues.apache.org/jira/browse/FLINK-9977 > Project: Flink > Issue Type: Improvement > Components: Documentation >Reporter: Xingcan Cui >Assignee: Xingcan Cui >Priority: Minor > Attachments: Java.jpg, SQL.jpg, Scala.jpg > > > There are some syntax errors and inconsistencies in the documentation and Scala docs of the Table/SQL built-in functions. This issue aims to fix them.
[jira] [Created] (FLINK-10039) FlinkKafkaProducer - Serializer Error
Akshay Nagpal created FLINK-10039: - Summary: FlinkKafkaProducer - Serializer Error Key: FLINK-10039 URL: https://issues.apache.org/jira/browse/FLINK-10039 Project: Flink Issue Type: Bug Components: Streaming Connectors Affects Versions: 1.4.2 Reporter: Akshay Nagpal

I am working on a use case where I input data using Kafka's console producer, read the same data in my program using FlinkKafkaConsumer, and write it back to another Kafka topic using FlinkKafkaProducer. I am using version 1.4.2 of the following dependencies: flink-java, flink-streaming-java_2.11 and flink-connector-kafka-0.10_2.11. The code is as follows.

KafkaConsoleProducer:
{code:java}
./bin/kafka-console-producer --broker-list xxx:9092 --topic test1 --property "parse.key=true" --property "key.separator=:" --key-serializer org.apache.kafka.common.serialization.StringSerializer --value-serializer org.apache.kafka.common.serialization.StringSerializer
{code}

KafkaFlinkConsumer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
FlinkKafkaConsumer010<String> myConsumer = new FlinkKafkaConsumer010<>("test1", new SimpleStringSchema(), properties);
DataStream<String> stream = env.addSource(myConsumer);
{code}

KafkaFlinkProducer:
{code:java}
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "xxx:9092");
properties.setProperty("zookeeper.connect", "xxx:2181");
properties.setProperty("group.id", "test");
properties.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
properties.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
FlinkKafkaProducer010<String> myProducer = new FlinkKafkaProducer010<>("my-topic", new SimpleStringSchema(), properties);
stream.addSink(myProducer);
{code}

When I specify the key and value serializers as StringSerializer in FlinkKafkaProducer, I get the following error in the logs:
{code:java}
org.apache.kafka.common.errors.SerializationException: Can't convert value of class [B to class org.apache.kafka.common.serialization.StringSerializer specified in value.serializer
{code}
Despite this error, the data is still produced to the topic. When I use ByteArraySerializer with the producer instead, the error does not appear in the logs, and the output is still produced. Moreover, DataStream's print method is not printing the data to the console.
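Here is one plausible reading of the logged error. It assumes that the Flink producer hands Kafka values that a `SerializationSchema` has already turned into `byte[]`. In that case, a configured `value.serializer` of `StringSerializer` receives a `byte[]` (shown as `[B` in the message) where it expects a `String`. The stdlib-only sketch below reproduces that type mismatch using hypothetical stand-ins (`Serializer`, `StringSerializer`, `ByteArraySerializer`, `send`). It does not use the real Kafka or Flink classes.

```java
import java.nio.charset.StandardCharsets;

final class SerializerMismatchDemo {

    /** Hypothetical stand-in for a Kafka-style serializer interface. */
    interface Serializer<T> {
        byte[] serialize(String topic, T data);
    }

    /** Expects String values, like a string serializer would. */
    static final class StringSerializer implements Serializer<String> {
        @Override
        public byte[] serialize(String topic, String data) {
            return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Passes already-serialized bytes through unchanged. */
    static final class ByteArraySerializer implements Serializer<byte[]> {
        @Override
        public byte[] serialize(String topic, byte[] data) {
            return data;
        }
    }

    /**
     * Mimics a producer that receives byte[] values (already produced by a
     * SerializationSchema) and passes them to whichever serializer is configured.
     * The raw-typed call compiles, but fails at runtime if the types do not match.
     */
    @SuppressWarnings({"unchecked", "rawtypes"})
    static String send(Serializer serializer, byte[] value) {
        try {
            serializer.serialize("my-topic", value);
            return "ok";
        } catch (ClassCastException e) {
            return "Can't convert value of class " + value.getClass().getName()
                    + " to class " + serializer.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        byte[] alreadySerialized = "hello".getBytes(StandardCharsets.UTF_8);
        System.out.println(send(new StringSerializer(), alreadySerialized));
        System.out.println(send(new ByteArraySerializer(), alreadySerialized));
    }
}
```

Under the same assumption, configuring `ByteArraySerializer`, as the reporter tried, matches the `byte[]` values. That would explain why the error disappears in that case.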
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: Scala.jpg
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: Java.jpg
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: SQL.jpg
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: (was: SQL.jpg)
[jira] [Commented] (FLINK-9875) Add concurrent creation of execution job vertex
[ https://issues.apache.org/jira/browse/FLINK-9875?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567803#comment-16567803 ] ASF GitHub Bot commented on FLINK-9875: --- TisonKun commented on issue #6353: [FLINK-9875][runtime] Add concurrent creation of execution job vertex URL: https://github.com/apache/flink/pull/6353#issuecomment-410149936 Further discussion about parallelizing the creation of InputSplit continues in [FLINK-10038](https://issues.apache.org/jira/browse/FLINK-10038). > Add concurrent creation of execution job vertex > --- > > Key: FLINK-9875 > URL: https://issues.apache.org/jira/browse/FLINK-9875 > Project: Flink > Issue Type: Improvement > Components: Local Runtime >Affects Versions: 1.5.1 >Reporter: 陈梓立 >Assignee: 陈梓立 >Priority: Major > Labels: pull-request-available > > In some cases, such as input format vertices, creating an execution job vertex is time-consuming. This PR adds concurrent creation of execution job vertices to speed it up.
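FLINK-9875 proposes creating execution job vertices concurrently, because some of them, such as input format vertices, are slow to build. The sketch below shows the general pattern with `CompletableFuture`. It assumes each vertex can be built independently, with any wiring to upstream vertices done afterwards in topological order. `createAll` and the string "vertices" are hypothetical; this is not Flink's `ExecutionGraph` code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;

final class ConcurrentVertexCreation {

    /** Builds one result per spec on the executor and returns the results in input order. */
    static <S, V> List<V> createAll(List<S> specs, Function<S, V> create, ExecutorService executor) {
        List<CompletableFuture<V>> futures = new ArrayList<>(specs.size());
        for (S spec : specs) {
            // Each (possibly slow) construction runs as an independent task.
            futures.add(CompletableFuture.supplyAsync(() -> create.apply(spec), executor));
        }
        List<V> results = new ArrayList<>(futures.size());
        for (CompletableFuture<V> future : futures) {
            // join() waits for completion and rethrows failures as CompletionException.
            results.add(future.join());
        }
        return results;
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newFixedThreadPool(4);
        try {
            List<String> jobVertices = Arrays.asList("source", "map", "sink");
            List<String> executionVertices = createAll(jobVertices, name -> {
                // Simulate expensive construction, e.g. computing input splits.
                try {
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return "ExecutionJobVertex(" + name + ")";
            }, executor);
            System.out.println(executionVertices);
        } finally {
            executor.shutdown();
        }
    }
}
```

The futures are joined in input order, so the result order stays deterministic even though the construction work runs in parallel.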
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: (was: Java.jpg)
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: (was: Scala.jpg)
[jira] [Updated] (FLINK-9977) Refine the docs for Table/SQL built-in functions
[ https://issues.apache.org/jira/browse/FLINK-9977?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Xingcan Cui updated FLINK-9977: --- Attachment: SQL.jpg Scala.jpg Java.jpg
[GitHub] HuangZhenQiu opened a new pull request #6483: Add parquet input format
HuangZhenQiu opened a new pull request #6483: Add parquet input format URL: https://github.com/apache/flink/pull/6483 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PunctuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices).
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary
[ https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] 陈梓立 updated FLINK-10038: Labels: improvement inputformat parallel perfomance (was: ) > Parallel the creation of InputSplit if necessary > > > Key: FLINK-10038 > URL: https://issues.apache.org/jira/browse/FLINK-10038 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: 陈梓立 >Priority: Major > Labels: improvement, inputformat, parallel, perfomance > > This continues the discussion in the PR about parallelizing the creation of ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353]. [~StephanEwen] suggested that we could parallelize the creation of InputSplit to gain performance improvements.
[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary
[ https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] 陈梓立 updated FLINK-10038: Component/s: (was: Build System)
[jira] [Updated] (FLINK-10038) Parallel the creation of InputSplit if necessary
[ https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] 陈梓立 updated FLINK-10038: Component/s: Build System
[jira] [Commented] (FLINK-10038) Parallel the creation of InputSplit if necessary
[ https://issues.apache.org/jira/browse/FLINK-10038?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567780#comment-16567780 ] 陈梓立 commented on FLINK-10038: - After taking a look at InputSplit and InputFormat, I found that the interface for creating input splits is InputSplitSource#createInputSplits, whose implementations range from FileInputFormat to JDBCInputFormat and others. Since how input splits are created depends on the specific input source, the parallelization logic differs between implementations, so we would implement it case by case where possible and necessary. What do you think? Are there other interfaces involved in the creation of input splits? In your view, what is the most elegant and effective way to parallelize this and benefit from it? Looking forward to your comments. > Parallel the creation of InputSplit if necessary > > > Key: FLINK-10038 > URL: https://issues.apache.org/jira/browse/FLINK-10038 > Project: Flink > Issue Type: Improvement >Affects Versions: 1.5.0 >Reporter: 陈梓立 >Priority: Major > > As a follow-up to the discussion in the PR about parallelizing the creation of > ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353], > [~StephanEwen] suggested that we could parallelize the creation of > InputSplits to gain performance improvements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
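One way to picture the "case by case" idea for a file-based source is sketched below, assuming that the splits of each file can be computed independently of the others: split enumeration is fanned out per file on a thread pool and the results are concatenated in input order. `ParallelSplitSketch`, `FileRange`, `splitsForFile`, and the fixed block size are hypothetical illustrations, not Flink's `InputSplitSource` API or `FileInputFormat` logic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

final class ParallelSplitSketch {

    /** Hypothetical split descriptor: a byte range [start, start + length) of one file. */
    static final class FileRange {
        final String path;
        final long start;
        final long length;

        FileRange(String path, long start, long length) {
            this.path = path;
            this.start = start;
            this.length = length;
        }
    }

    /** Cuts one file into block-sized ranges; the last range may be shorter. */
    static List<FileRange> splitsForFile(String path, long fileLength, long blockSize) {
        List<FileRange> splits = new ArrayList<>();
        for (long start = 0; start < fileLength; start += blockSize) {
            splits.add(new FileRange(path, start, Math.min(blockSize, fileLength - start)));
        }
        return splits;
    }

    /**
     * Computes the splits of all files concurrently, one task per file. In a real source the
     * per-file work (e.g. remote metadata lookups) is what would make this worthwhile.
     */
    static List<FileRange> createSplitsInParallel(
            List<String> paths, List<Long> lengths, long blockSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<FileRange>>> futures = new ArrayList<>();
            for (int i = 0; i < paths.size(); i++) {
                String path = paths.get(i);
                long length = lengths.get(i);
                futures.add(pool.submit(() -> splitsForFile(path, length, blockSize)));
            }
            // Collect results in submission order so the split order is deterministic.
            List<FileRange> all = new ArrayList<>();
            for (Future<List<FileRange>> future : futures) {
                all.addAll(future.get());
            }
            return all;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while creating splits", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("Split creation failed", e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

Whether this pays off depends on the source: for a JDBC-style source that derives splits from a parameter range, enumeration is cheap and a thread pool would add overhead rather than remove it.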
[jira] [Created] (FLINK-10038) Parallel the creation of InputSplit if necessary
陈梓立 created FLINK-10038: --- Summary: Parallel the creation of InputSplit if necessary Key: FLINK-10038 URL: https://issues.apache.org/jira/browse/FLINK-10038 Project: Flink Issue Type: Improvement Affects Versions: 1.5.0 Reporter: 陈梓立 As a follow-up to the discussion in the PR about parallelizing the creation of ExecutionJobVertex [here|https://github.com/apache/flink/pull/6353], [~StephanEwen] suggested that we could parallelize the creation of InputSplits to gain performance improvements. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector
[ https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567778#comment-16567778 ] ASF GitHub Bot commented on FLINK-9899: --- glaksh100 edited a comment on issue #6409: [FLINK-9899][Kinesis Connecotr] Add comprehensive per-shard metrics to ShardConsumer URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433 @zentol @tzulitai Sorry for the delay. Rebased the branch with latest master and added documentation for the metrics. Let me know if that looks alright! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add more metrics to the Kinesis source connector > > > Key: FLINK-9899 > URL: https://issues.apache.org/jira/browse/FLINK-9899 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.2, 1.5.1 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: pull-request-available > > Currently there are sparse metrics available for the Kinesis Connector. Using > the > [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java] > add more stats. For example: > - sleepTimeMillis > - maxNumberOfRecordsPerFetch > - numberOfAggregatedRecordsPerFetch > - numberOfDeaggregatedRecordsPerFetch > - bytesPerFetch > - averageRecordSizeBytes > - runLoopTimeNanos > - loopFrequencyHz -- This message was sent by Atlassian JIRA (v7.6.3#76005)
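Two of the proposed stats, `averageRecordSizeBytes` and `loopFrequencyHz`, look derivable from the others in the list. A minimal sketch follows. It assumes `averageRecordSizeBytes` means bytes fetched divided by deaggregated records, and that `loopFrequencyHz` is the reciprocal of one run-loop iteration's duration. Both definitions and the `ShardFetchStats` class are assumptions for illustration, not the connector's `ShardMetricsReporter` implementation.

```java
/**
 * Hypothetical holder for the latest per-fetch values of one shard. The two derived
 * metrics use assumed definitions, not necessarily those of the merged connector.
 */
final class ShardFetchStats {
    private long bytesPerFetch;
    private int numberOfDeaggregatedRecordsPerFetch;
    private long runLoopTimeNanos;

    /** Records the outcome of one fetch-loop iteration. */
    void recordFetch(long bytes, int deaggregatedRecords, long loopTimeNanos) {
        this.bytesPerFetch = bytes;
        this.numberOfDeaggregatedRecordsPerFetch = deaggregatedRecords;
        this.runLoopTimeNanos = loopTimeNanos;
    }

    /** Assumed: bytes fetched divided by deaggregated records (0 if nothing was fetched). */
    long averageRecordSizeBytes() {
        return numberOfDeaggregatedRecordsPerFetch == 0
                ? 0L
                : bytesPerFetch / numberOfDeaggregatedRecordsPerFetch;
    }

    /** Assumed: iterations per second implied by the duration of the last run-loop iteration. */
    double loopFrequencyHz() {
        return runLoopTimeNanos <= 0 ? 0.0 : 1_000_000_000.0 / runLoopTimeNanos;
    }
}
```

For example, a fetch of 5000 bytes in 10 deaggregated records with a 250 ms loop gives an average record size of 500 bytes and a loop frequency of 4 Hz.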
[jira] [Commented] (FLINK-9899) Add more metrics to the Kinesis source connector
[ https://issues.apache.org/jira/browse/FLINK-9899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1656#comment-1656 ] ASF GitHub Bot commented on FLINK-9899: --- glaksh100 commented on issue #6409: [FLINK-9899][Kinesis Connecotr] Add comprehensive per-shard metrics to ShardConsumer URL: https://github.com/apache/flink/pull/6409#issuecomment-410145433 @tzulitai Sorry for the delay. Rebased the branch with latest master and added documentation for the metrics. Let me know if that looks alright! This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add more metrics to the Kinesis source connector > > > Key: FLINK-9899 > URL: https://issues.apache.org/jira/browse/FLINK-9899 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Affects Versions: 1.4.2, 1.5.1 >Reporter: Lakshmi Rao >Assignee: Lakshmi Rao >Priority: Major > Labels: pull-request-available > > Currently there are sparse metrics available for the Kinesis Connector. Using > the > [ShardMetricsReporter|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/metrics/ShardMetricsReporter.java] > add more stats. For example: > - sleepTimeMillis > - maxNumberOfRecordsPerFetch > - numberOfAggregatedRecordsPerFetch > - numberOfDeaggregatedRecordsPerFetch > - bytesPerFetch > - averageRecordSizeBytes > - runLoopTimeNanos > - loopFrequencyHz -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location
[ https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567769#comment-16567769 ] ASF GitHub Bot commented on FLINK-10037: TisonKun commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-410143882 cc @zentol This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document details event time behavior in a single location > - > > Key: FLINK-10037 > URL: https://issues.apache.org/jira/browse/FLINK-10037 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.2 >Reporter: Elias Levy >Assignee: Elias Levy >Priority: Minor > Labels: pull-request-available > > A description of event time and watermarks, how they are generated, assigned, and > handled, is spread across many pages in the documentation. It would be useful > to have it all in a single place that also includes missing information, such as > how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10037) Document details event time behavior in a single location
[ https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10037: --- Labels: pull-request-available (was: ) > Document details event time behavior in a single location > - > > Key: FLINK-10037 > URL: https://issues.apache.org/jira/browse/FLINK-10037 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.2 >Reporter: Elias Levy >Assignee: Elias Levy >Priority: Minor > Labels: pull-request-available > > A description of event time and watermarks, how they are generated, assigned, and > handled, is spread across many pages in the documentation. It would be useful > to have it all in a single place that also includes missing information, such as > how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10037) Document details event time behavior in a single location
[ https://issues.apache.org/jira/browse/FLINK-10037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567726#comment-16567726 ] ASF GitHub Bot commented on FLINK-10037: eliaslevy commented on issue #6481: [FLINK-10037] [Documentation] Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-410135944 Done. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document details event time behavior in a single location > - > > Key: FLINK-10037 > URL: https://issues.apache.org/jira/browse/FLINK-10037 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.5.2 >Reporter: Elias Levy >Assignee: Elias Levy >Priority: Minor > Labels: pull-request-available > > A description of event time and watermarks, how they are generated, assigned, and > handled, is spread across many pages in the documentation. It would be useful > to have it all in a single place that also includes missing information, such as > how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (FLINK-10036) Flink's CSV output format is not consistent with the standard.
[ https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567721#comment-16567721 ] buptljy edited comment on FLINK-10036 at 8/3/18 3:39 AM: - This is very similar to this [JIRA](https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a PR for that issue in the next few days. Feel free to discuss it with me if you have any questions. was (Author: wind_ljy): This is very similar to this [JIRA](https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a PR in the next few days. Feel free to discuss it with me if you have any questions. > Flink's CSV output format is not consistent with the standard. > -- > > Key: FLINK-10036 > URL: https://issues.apache.org/jira/browse/FLINK-10036 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Caizhi Weng >Priority: Minor > > h2. What's the problem > Flink's CSV output format is not consistent with the standard > ([https://tools.ietf.org/html/rfc4180]). > In CSV format file, if a field contains comma, quotes or new line, this field > should be surrounded with quotes (see section 2.6 in the standard). > Specifically, if a field contains quotes, the quotes should be escaped by > double quotes (see section 2.7 in the standard). 
> For example, to express these two fields in a CSV file: > {noformat} > Hello,World > "Quoted" "String" > {noformat} > The CSV file should look like this: > {noformat} > "Hello,World","""Quoted"" ""String""" > {noformat} > But if we run the following Flink code to output these fields > {code:java} > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > val data = List( > ("Hello,World", "\"Quoted\" \"String\"") > ) > val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) > ds.select('a, 'b) > val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) > ds.writeToSink(sink) > env.execute() > {code} > We get the following CSV: > {noformat} > Hello,World,"Quoted" "String" > {noformat} > which is not correct (there are actually 3 fields instead of 2 in this CSV > file, and the last field is not valid). > h2. How am I going to fix it > I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java > module, and add some test cases to ensure that my fix is correct. > h2. What's affected > This fix will change the output of CsvTableSink, and will affect the test > cases whose results are written to a CSV file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
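The two RFC 4180 rules cited above (section 2.6: wrap fields that contain the delimiter, a quote, or a line break; section 2.7: double any embedded quote) can be sketched as a standalone helper. This is a minimal illustration, not the eventual `CsvOutputFormat#writeRecord` change. `Rfc4180Writer` and its methods are hypothetical names, and the sketch assumes a single-character field delimiter and fields that have already been converted to strings.

```java
import java.util.List;

final class Rfc4180Writer {

    /** Quotes a field only if it contains the delimiter, a double quote, CR, or LF (RFC 4180, 2.6). */
    static String quoteField(String field, char delimiter) {
        boolean needsQuotes = field.indexOf(delimiter) >= 0
                || field.indexOf('"') >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuotes) {
            return field;
        }
        // Escape embedded quotes by doubling them (RFC 4180, 2.7), then wrap the whole field.
        return '"' + field.replace("\"", "\"\"") + '"';
    }

    /** Joins string fields into one CSV line, without the line terminator. */
    static String writeRecord(List<String> fields, char delimiter) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < fields.size(); i++) {
            if (i > 0) {
                sb.append(delimiter);
            }
            sb.append(quoteField(fields.get(i), delimiter));
        }
        return sb.toString();
    }
}
```

With the two fields from the example, `writeRecord(Arrays.asList("Hello,World", "\"Quoted\" \"String\""), ',')` yields `"Hello,World","""Quoted"" ""String"""`, the expected line shown in the issue. Quoting only when needed keeps existing output unchanged for plain fields, which limits the number of affected test cases.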
[jira] [Created] (FLINK-10037) Document details event time behavior in a single location
Elias Levy created FLINK-10037: -- Summary: Document details event time behavior in a single location Key: FLINK-10037 URL: https://issues.apache.org/jira/browse/FLINK-10037 Project: Flink Issue Type: Bug Components: Documentation Affects Versions: 1.5.2 Reporter: Elias Levy Assignee: Elias Levy A description of event time and watermarks, how they are generated, assigned, and handled, is spread across many pages in the documentation. It would be useful to have it all in a single place that also includes missing information, such as how Flink assigns timestamps to new records generated by operators. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10036) Flink's CSV output format is not consistent with the standard.
[ https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567721#comment-16567721 ] buptljy commented on FLINK-10036: - This is very similar to this [JIRA](https://issues.apache.org/jira/browse/FLINK-9964), and I will submit a PR in the next few days. Feel free to discuss it with me if you have any questions. > Flink's CSV output format is not consistent with the standard. > -- > > Key: FLINK-10036 > URL: https://issues.apache.org/jira/browse/FLINK-10036 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Caizhi Weng >Priority: Minor > > h2. What's the problem > Flink's CSV output format is not consistent with the standard > ([https://tools.ietf.org/html/rfc4180]). > In CSV format file, if a field contains comma, quotes or new line, this field > should be surrounded with quotes (see section 2.6 in the standard). > Specifically, if a field contains quotes, the quotes should be escaped by > double quotes (see section 2.7 in the standard). > For example, to express these two fields in a CSV file: > {noformat} > Hello,World > "Quoted" "String" > {noformat} > The CSV file should look like this: > {noformat} > "Hello,World","""Quoted"" ""String""" > {noformat} > But if we run the following Flink code to output these fields > {code:java} > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > val data = List( > ("Hello,World", "\"Quoted\" \"String\"") > ) > val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) > ds.select('a, 'b) > val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) > ds.writeToSink(sink) > env.execute() > {code} > We get the following CSV: > {noformat} > Hello,World,"Quoted" "String" > {noformat} > which is not correct (there are actually 3 fields instead of 2 in this CSV > file, and the last field is not valid). > h2. 
How am I going to fix it > I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java > module, and add some test cases to ensure that my fix is correct. > h2. What's affected > This fix will change the output of CsvTableSink, and will affect the test > cases whose results are written to a CSV file. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
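The claim about the current output can be checked by reading both lines back with a strict RFC 4180 parser. The sketch below is a hypothetical helper (`Rfc4180Reader`), not part of Flink. It handles a single line without embedded line breaks. It rejects text after a closing quote and quotes inside unquoted fields, which is why the line Flink currently writes is reported as malformed.

```java
import java.util.ArrayList;
import java.util.List;

final class Rfc4180Reader {

    /** Parses one CSV line (no embedded CR/LF) into fields; throws on malformed quoting. */
    static List<String> parseLine(String line, char delimiter) {
        List<String> fields = new ArrayList<>();
        StringBuilder field = new StringBuilder();
        int i = 0;
        while (true) {
            if (i < line.length() && line.charAt(i) == '"') {
                // Quoted field: read until a quote that is not part of an escaped "" pair.
                i++;
                while (true) {
                    if (i >= line.length()) {
                        throw new IllegalArgumentException("Unterminated quoted field");
                    }
                    char c = line.charAt(i);
                    if (c == '"' && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                        field.append('"'); // escaped quote
                        i += 2;
                    } else if (c == '"') {
                        i++; // closing quote
                        break;
                    } else {
                        field.append(c);
                        i++;
                    }
                }
                // After a closing quote only the delimiter or the end of the line may follow.
                if (i < line.length() && line.charAt(i) != delimiter) {
                    throw new IllegalArgumentException("Unexpected character after quoted field at " + i);
                }
            } else {
                // Unquoted field: RFC 4180 does not allow quotes here.
                while (i < line.length() && line.charAt(i) != delimiter) {
                    if (line.charAt(i) == '"') {
                        throw new IllegalArgumentException("Quote inside unquoted field at " + i);
                    }
                    field.append(line.charAt(i));
                    i++;
                }
            }
            fields.add(field.toString());
            field.setLength(0);
            if (i >= line.length()) {
                return fields;
            }
            i++; // skip the delimiter and read the next field
        }
    }
}
```

The expected line `"Hello,World","""Quoted"" ""String"""` parses back into exactly the two original fields, while `Hello,World,"Quoted" "String"` is rejected at the space following `"Quoted"`. That position is the third field, consistent with the report of 3 fields where the last one is invalid.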
[jira] [Updated] (FLINK-10036) Flink's CSV output format is not consistent with the standard.
[ https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng updated FLINK-10036: Description: h2. What's the problem Flink's CSV output format is not consistent with the standard ([https://tools.ietf.org/html/rfc4180]). In CSV format file, if a field contains comma, quotes or new line, this field should be surrounded with quotes (see section 2.6 in the standard). Specifically, if a field contains quotes, the quotes should be escaped by double quotes (see section 2.7 in the standard). For example, to express these two fields in a CSV file: {noformat} Hello,World "Quoted" "String" {noformat} The CSV file should look like this: {noformat} "Hello,World","""Quoted"" ""String""" {noformat} But if we run the following Flink code to output these fields {code:java} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = List( ("Hello,World", "\"Quoted\" \"String\"") ) val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) ds.select('a, 'b) val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) ds.writeToSink(sink) env.execute() {code} We get the following CSV: {noformat} Hello,World,"Quoted" "String" {noformat} which is not correct (there are actually 3 fields instead of 2 in this CSV file, and the last field is not valid). h2. How am I going to fix it I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java module, and add some test cases to ensure that my fix is correct. h2. What's affected This fix will change the output of CsvTableSink, and will affect the test cases whose results are written to a CSV file. was: h2. What's the problem Flink's CSV output format is not consistent with the standard ([https://tools.ietf.org/html/rfc4180]). In CSV format file, if a field contains comma, quotes or new line, this field should be surrounded with quotes (see section 2.6 in the standard). 
Specifically, if a field contains quotes, the quotes should be escaped by double quotes (see section 2.7 in the standard). For example, to express these two fields in a CSV file: {noformat} Hello,World "Quoted" "String" {noformat} The CSV file should look like this: {noformat} "Hello,World","""Quoted"" ""String""" {noformat} But if we run the following Flink code to output these fields {code} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = List( ("Hello,World", "\"Quoted\" \"String\"") ) val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) ds.select('a, 'b) val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) ds.writeToSink(sink) env.execute() {code} We get the following CSV: {noformat} Hello,World,"Quoted" "String" {noformat} which is not correct (there are actually 3 fields instead of 2 in this CSV file, and the last field is not valid). h2. How am I going to fix it I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java module, and add some test cases to ensure that my fix is correct. h2. What's affected This fix will change the output of CsvTableSink, and will affect some test cases currently in the project. > Flink's CSV output format is not consistent with the standard. > -- > > Key: FLINK-10036 > URL: https://issues.apache.org/jira/browse/FLINK-10036 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Caizhi Weng >Priority: Minor > > h2. What's the problem > Flink's CSV output format is not consistent with the standard > ([https://tools.ietf.org/html/rfc4180]). > In CSV format file, if a field contains comma, quotes or new line, this field > should be surrounded with quotes (see section 2.6 in the standard). > Specifically, if a field contains quotes, the quotes should be escaped by > double quotes (see section 2.7 in the standard). 
> For example, to express these two fields in a CSV file: > {noformat} > Hello,World > "Quoted" "String" > {noformat} > The CSV file should look like this: > {noformat} > "Hello,World","""Quoted"" ""String""" > {noformat} > But if we run the following Flink code to output these fields > {code:java} > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > val data = List( > ("Hello,World", "\"Quoted\" \"String\"") > ) > val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) > ds.select('a, 'b) > val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) > ds.writeToSink(sink) > env.execute() > {code} > We get the following CSV: > {noformat} > Hello,World,"Quoted" "String" > {noformat} > which is not correct (there are actually 3 fields instead of 2 in this CSV >
[GitHub] TisonKun commented on issue #6481: Add Event Time Details documentation page
TisonKun commented on issue #6481: Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-410131208 FYI, the Travis CI failure is unrelated to this PR. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[GitHub] TisonKun commented on issue #6481: Add Event Time Details documentation page
TisonKun commented on issue #6481: Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481#issuecomment-410131107 Hi @eliaslevy, thanks for your PR! This PR looks like far more than trivial work. Please take a look at the PULL_REQUEST_TEMPLATE shown in your comment above. I think raising a JIRA issue for this is one step of the formal process to get this PR merged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount
[ https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567700#comment-16567700 ] ASF GitHub Bot commented on FLINK-9969: --- TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207428623 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ## @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager memoryManager, List TypeSerializerFactory serializerFactory, TypeComparator comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, - boolean objectReuseEnabled) - throws IOException - { + boolean objectReuseEnabled) throws IOException { + this ( + memoryManager, + memory, + ioManager, + input, + parentTask, + serializerFactory, + comparator, + numSortBuffers, + maxNumFileHandles, + startSpillingFraction, + noSpillingMemory, + handleLargeRecords, + objectReuseEnabled, + new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING)); + } + + protected UnilateralSortMerger( Review comment: Why? We might want to configure which `InMemorySorterFactory` to use if needed. Semantically, it does not need to be `@VisibleForTesting`. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unreasonable memory requirements to complete examples/batch/WordCount > - > > Key: FLINK-9969 > URL: https://issues.apache.org/jira/browse/FLINK-9969 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0 >Reporter: Piotr Nowojski >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > Attachments: yarn_logs > > > Setup on AWS EMR: > * 5 worker nodes (m4.4xlarge nodes) > * 1 master node (m4.large) > The following command fails with out-of-memory errors: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 > examples/batch/WordCount.jar{noformat} > The example completes only after increasing the memory above 17.2GB. At the same time, after > disabling FLIP-6, the following command succeeds: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 > examples/batch/WordCount.jar{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
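The pattern under review is a convenience constructor that delegates to a second constructor taking an injectable factory. It can be sketched in isolation to show why the factory-accepting constructor is a real configuration hook and not only a test seam. `SortBuffer`, `SortBufferFactory`, and `SortMergerSketch` are hypothetical stand-ins, not Flink's `InMemorySorter`/`InMemorySorterFactory` signatures. The `close()` method mirrors the PR's goal of disposing every sorter the merger created.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for an in-memory sort buffer that must be disposed. */
interface SortBuffer {
    void dispose();
}

/** Hypothetical stand-in for a factory that decides which buffer implementation to create. */
interface SortBufferFactory {
    SortBuffer create();
}

class SortMergerSketch {
    /** Default factory used by the convenience constructor (hypothetical no-op buffers). */
    static final SortBufferFactory DEFAULT_FACTORY = () -> () -> {
        // a default buffer here holds no extra resources to release
    };

    private final List<SortBuffer> buffers = new ArrayList<>();

    /** Convenience constructor: existing call sites stay unchanged and get the default factory. */
    SortMergerSketch(int numSortBuffers) {
        this(numSortBuffers, DEFAULT_FACTORY);
    }

    /** Full constructor: tests or alternative configurations choose the factory. */
    SortMergerSketch(int numSortBuffers, SortBufferFactory factory) {
        for (int i = 0; i < numSortBuffers; i++) {
            buffers.add(factory.create());
        }
    }

    /** Disposes every buffer this instance created, so none of them outlive the merger. */
    void close() {
        for (SortBuffer buffer : buffers) {
            buffer.dispose();
        }
        buffers.clear();
    }
}
```

A test can pass a counting factory to verify that every created buffer is disposed. Production code could pass a different factory without any test-only annotation.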
[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount
[ https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567692#comment-16567692 ] ASF GitHub Bot commented on FLINK-9969: --- TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207425876 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ## @@ -47,9 +37,20 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TestLogger; + import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.NoSuchElementException; Review comment: This change can be suppressed if merged. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unreasonable memory requirements to complete examples/batch/WordCount > - > > Key: FLINK-9969 > URL: https://issues.apache.org/jira/browse/FLINK-9969 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0 >Reporter: Piotr Nowojski >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > Attachments: yarn_logs > > > Setup on AWS EMR: > * 5 worker nodes (m4.4xlarge nodes) > * 1 master node (m4.large) > The following command fails with out-of-memory errors: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 > examples/batch/WordCount.jar{noformat} > The example completes only after increasing the memory above 17.2GB. At the same time, after > disabling FLIP-6, the following command succeeds: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 > examples/batch/WordCount.jar{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount
[ https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567691#comment-16567691 ] ASF GitHub Bot commented on FLINK-9969: --- TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207425876 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ## @@ -47,9 +37,20 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TestLogger; + import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.NoSuchElementException; Review comment: The change can be suppressed if merged.
> Unreasonable memory requirements to complete examples/batch/WordCount > - > > Key: FLINK-9969 > URL: https://issues.apache.org/jira/browse/FLINK-9969 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0 >Reporter: Piotr Nowojski >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > Attachments: yarn_logs > > > Setup on AWS EMR: > * 5 worker nodes (m4.4xlarge nodes) > * 1 master node (m4.large) > The following command fails with out-of-memory errors: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 > examples/batch/WordCount.jar{noformat} > The example only completes after increasing the memory above 17.2GB. At the same time, after > disabling FLIP-6, the following command succeeds: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 > examples/batch/WordCount.jar{noformat}
[GitHub] TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
TisonKun commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207425876 ## File path: flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/CombiningUnilateralSortMergerITCase.java ## @@ -47,9 +37,20 @@ import org.apache.flink.runtime.util.ReusingKeyGroupedIterator; import org.apache.flink.util.Collector; import org.apache.flink.util.MutableObjectIterator; +import org.apache.flink.util.TestLogger; + import org.junit.After; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Hashtable; +import java.util.Iterator; +import java.util.NoSuchElementException; Review comment: The change can be suppressed if merged.
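The PR title above describes the fix as disposing the `InMemorySorter`s created by the `UnilateralSortMerger`. As a minimal sketch of that ownership pattern, assuming nothing about Flink's actual sorter or memory-manager APIs (`FakeSorter` and `SorterOwner` are hypothetical stand-ins), an owner can record every sorter it allocates and release all of them in `close()`, continuing even if one release fails:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a sorter that holds managed memory.
class FakeSorter {
    private boolean disposed = false;

    void dispose() {
        // A real sorter would hand its memory segments back to a memory manager here.
        disposed = true;
    }

    boolean isDisposed() {
        return disposed;
    }
}

// Hypothetical owner that remembers every sorter it creates, so none can leak.
class SorterOwner implements AutoCloseable {
    private final List<FakeSorter> created = new ArrayList<>();

    FakeSorter newSorter() {
        FakeSorter sorter = new FakeSorter();
        created.add(sorter);
        return sorter;
    }

    @Override
    public void close() {
        RuntimeException firstFailure = null;
        for (FakeSorter sorter : created) {
            try {
                sorter.dispose();
            } catch (RuntimeException e) {
                // Keep releasing the remaining sorters; report the first failure at the end.
                if (firstFailure == null) {
                    firstFailure = e;
                }
            }
        }
        created.clear();
        if (firstFailure != null) {
            throw firstFailure;
        }
    }

    public static void main(String[] args) {
        FakeSorter sorter;
        try (SorterOwner owner = new SorterOwner()) {
            sorter = owner.newSorter();
        }
        System.out.println("disposed: " + sorter.isDisposed());
    }
}
```

Used with try-with-resources, every tracked sorter is released on both the success path and the failure path.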
[jira] [Commented] (FLINK-10033) Let Task release reference to Invokable on shutdown
[ https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567687#comment-16567687 ] ASF GitHub Bot commented on FLINK-10033: TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable URL: https://github.com/apache/flink/pull/6480#issuecomment-410124471 Nice catch! Memory leaks are one of the most terrible issues; this is helpful. +1 for merge. > Let Task release reference to Invokable on shutdown > --- > > Key: FLINK-10033 > URL: https://issues.apache.org/jira/browse/FLINK-10033 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > > References to Task objects may, under some conditions, linger beyond > the lifetime of the task. For example, in the case of local network channels, the > receiving task may hold a reference to the object of the task that produced > the data. > To prevent memory leaks, the Task needs to release all references to > its AbstractInvokable when it shuts down or is cancelled.
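The description above asks the Task to drop its reference to the `AbstractInvokable` on shutdown or cancellation, so that a lingering reference to the Task (for example, from a local network channel) does not also keep the invokable alive. A minimal sketch of that idea follows; `Invokable` and `SimpleTask` are hypothetical stand-ins, not Flink's real `Task` or `AbstractInvokable`:

```java
// Hypothetical stand-in for AbstractInvokable: the object that may pin large state.
abstract class Invokable {
    abstract void invoke() throws Exception;
}

// Hypothetical, heavily simplified task that drops its invokable once it is done.
class SimpleTask implements Runnable {
    private volatile Invokable invokable;

    SimpleTask(Invokable invokable) {
        this.invokable = invokable;
    }

    @Override
    public void run() {
        Invokable current = invokable;
        try {
            if (current != null) {
                current.invoke();
            }
        } catch (Exception e) {
            // A real task would record the failure and switch to FAILED; the sketch ignores it.
        } finally {
            // Anything still referencing this task no longer keeps the invokable reachable.
            invokable = null;
        }
    }

    void cancel() {
        // Cancellation releases the task's reference as well.
        invokable = null;
    }

    boolean holdsInvokable() {
        return invokable != null;
    }

    public static void main(String[] args) {
        SimpleTask task = new SimpleTask(new Invokable() {
            @Override
            void invoke() {
                System.out.println("invoked");
            }
        });
        task.run();
        System.out.println("still holds invokable: " + task.holdsInvokable());
    }
}
```

Clearing the field in `finally` covers normal completion and failure alike; the explicit `cancel()` covers the case where the task never runs to completion.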
[GitHub] TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable
TisonKun commented on issue #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable URL: https://github.com/apache/flink/pull/6480#issuecomment-410124471 Nice catch! Memory leaks are one of the most terrible issues; this is helpful. +1 for merge.
[jira] [Updated] (FLINK-10036) Flink's CSV output format is not consistent with the standard.
[ https://issues.apache.org/jira/browse/FLINK-10036?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Caizhi Weng updated FLINK-10036: Description: h2. What's the problem Flink's CSV output format is not consistent with the standard ([https://tools.ietf.org/html/rfc4180]). In a CSV file, a field that contains a comma, a double quote, or a line break should be enclosed in double quotes (see section 2.6 in the standard). Specifically, a double quote inside such a field must be escaped by preceding it with another double quote (see section 2.7 in the standard). For example, to express these two fields in a CSV file: {noformat} Hello,World "Quoted" "String" {noformat} The CSV file should look like this: {noformat} "Hello,World","""Quoted"" ""String""" {noformat} However, if we run the following Flink code to output these fields {code} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = List( ("Hello,World", "\"Quoted\" \"String\"") ) val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) ds.select('a, 'b) val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) ds.writeToSink(sink) env.execute() {code} we get the following CSV: {noformat} Hello,World,"Quoted" "String" {noformat} which is incorrect (there are actually 3 fields instead of 2 in this CSV file, and the last field is invalid). h2. How am I going to fix it I'm going to fix the writeRecord method in CsvOutputFormat.java in the flink-java module, and add some test cases to ensure that my fix is correct. h2. What's affected This fix will change the output of CsvTableSink and will affect some test cases currently in the project. was: h2. What's the problem Flink's CSV output format is not consistent with the standard (https://tools.ietf.org/html/rfc4180). In CSV format file, if a field contains comma, quotes or new line, this field should be surrounded with quotes (see section 2.6 in the standard). 
Specifically, if a field contains quotes, the quotes should be escaped by double quotes (see section 2.7 in the standard). For example, to express these two fields in a CSV file: {noformat} Hello,World "Quoted" "String" {noformat} The CSV file should look like this: {noformat} "Hello,World","""Quoted"" ""String""" {noformat} But if we run the following Flink code to output these fields {code:scala} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = List( ("Hello,World", "\"Quoted\" \"String\"") ) val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) ds.select('a, 'b) val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) ds.writeToSink(sink) env.execute() {code} We get the following CSV: {noformat} Hello,World,"Quoted" "String" {noformat} which is not correct (there are actually 3 fields instead of 2 in this CSV file, and the last field is not valid). h2. How am I going to fix it I'm going to fix the writeRecord method in CsvOutputFormat.java in flink-java module, and add some test cases to ensure that my fix is correct. But this fix will change the output of CsvTableSink, and will affect about 50 test cases currently in the project. > Flink's CSV output format is not consistent with the standard. > -- > > Key: FLINK-10036 > URL: https://issues.apache.org/jira/browse/FLINK-10036 > Project: Flink > Issue Type: Bug > Components: Table API SQL >Reporter: Caizhi Weng >Priority: Minor > > h2. What's the problem > Flink's CSV output format is not consistent with the standard > ([https://tools.ietf.org/html/rfc4180]). > In CSV format file, if a field contains comma, quotes or new line, this field > should be surrounded with quotes (see section 2.6 in the standard). > Specifically, if a field contains quotes, the quotes should be escaped by > double quotes (see section 2.7 in the standard). 
> For example, to express these two fields in a CSV file: > {noformat} > Hello,World > "Quoted" "String" > {noformat} > The CSV file should look like this: > {noformat} > "Hello,World","""Quoted"" ""String""" > {noformat} > But if we run the following Flink code to output these fields > {code} > val env = ExecutionEnvironment.getExecutionEnvironment > val tEnv = TableEnvironment.getTableEnvironment(env, config) > val data = List( > ("Hello,World", "\"Quoted\" \"String\"") > ) > val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) > ds.select('a, 'b) > val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) > ds.writeToSink(sink) > env.execute() > {code} > We get the following CSV: > {noformat} > Hello,World,"Quoted" "String" > {noformat} > which is not correct (there are actually 3 fields instead of 2 in this CSV > file, and the last field
[jira] [Created] (FLINK-10036) Flink's CSV output format is not consistent with the standard.
Caizhi Weng created FLINK-10036: --- Summary: Flink's CSV output format is not consistent with the standard. Key: FLINK-10036 URL: https://issues.apache.org/jira/browse/FLINK-10036 Project: Flink Issue Type: Bug Components: Table API SQL Reporter: Caizhi Weng h2. What's the problem Flink's CSV output format is not consistent with the standard (https://tools.ietf.org/html/rfc4180). In a CSV file, a field that contains a comma, a double quote, or a line break should be enclosed in double quotes (see section 2.6 in the standard). Specifically, a double quote inside such a field must be escaped by preceding it with another double quote (see section 2.7 in the standard). For example, to express these two fields in a CSV file: {noformat} Hello,World "Quoted" "String" {noformat} The CSV file should look like this: {noformat} "Hello,World","""Quoted"" ""String""" {noformat} However, if we run the following Flink code to output these fields {code:scala} val env = ExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env, config) val data = List( ("Hello,World", "\"Quoted\" \"String\"") ) val ds = env.fromCollection(data).toTable(tEnv).as('a, 'b) ds.select('a, 'b) val sink = new CsvTableSink("test.csv", ",", 1, WriteMode.OVERWRITE) ds.writeToSink(sink) env.execute() {code} we get the following CSV: {noformat} Hello,World,"Quoted" "String" {noformat} which is incorrect (there are actually 3 fields instead of 2 in this CSV file, and the last field is invalid). h2. How am I going to fix it I'm going to fix the writeRecord method in CsvOutputFormat.java in the flink-java module, and add some test cases to ensure that my fix is correct. However, this fix will change the output of CsvTableSink and will affect about 50 test cases currently in the project.
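The quoting rule from RFC 4180 sections 2.6 and 2.7 cited in the issue can be sketched in Java as follows. This is a hypothetical helper for illustration only (`Rfc4180`, `escapeField`, and `formatRecord` are not Flink APIs), not the proposed change to `CsvOutputFormat.writeRecord`:

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

// Hypothetical helper illustrating RFC 4180 sections 2.6 and 2.7; not Flink's CsvOutputFormat.
class Rfc4180 {

    // Enclose the field in double quotes if it contains the delimiter, a double quote,
    // CR, or LF, and escape each embedded double quote by doubling it.
    static String escapeField(String field, char delimiter) {
        boolean needsQuotes = field.indexOf(delimiter) >= 0
                || field.indexOf('"') >= 0
                || field.indexOf('\n') >= 0
                || field.indexOf('\r') >= 0;
        if (!needsQuotes) {
            return field;
        }
        return "\"" + field.replace("\"", "\"\"") + "\"";
    }

    // Join the escaped fields of one record with the delimiter.
    static String formatRecord(List<String> fields, char delimiter) {
        StringJoiner joiner = new StringJoiner(String.valueOf(delimiter));
        for (String field : fields) {
            joiner.add(escapeField(field, delimiter));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        List<String> fields = Arrays.asList("Hello,World", "\"Quoted\" \"String\"");
        System.out.println(formatRecord(fields, ','));
    }
}
```

Running `main` on the two example fields prints the line the issue expects: `"Hello,World","""Quoted"" ""String"""`. Fields without special characters are left unquoted, which keeps existing output unchanged for simple data.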
[jira] [Comment Edited] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567666#comment-16567666 ] Rong Rong edited comment on FLINK-5315 at 8/3/18 1:25 AM: -- [~hequn8128], [~fhueske] I am actually close to finishing the implementation and found out that it might be better to use {code:scala} myudagg('a, 'b).distinct {code} instead of {code:scala} myudagg.distinct('a, 'b) {code} since a similar modifier is added at the end for table distinct: {code:scala} table.select('a, 'b).distinct {code} It would be great to make the modifier location consistent. What do you guys think? was (Author: walterddr): [~hequn8128], [~fhueske] I am actually close to finishing the implementation and found out that it might be better to use {code:scala} myudagg('a, 'b).distinct {code} instead of {code:scala} myudagg.distinct('a, 'b) {code} since a similar modifier is added at the end for table distinct: `table.select('a, 'b).distinct`. It would be great to make the modifier location consistent. What do you guys think? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code}
[jira] [Commented] (FLINK-5315) Support distinct aggregations in table api
[ https://issues.apache.org/jira/browse/FLINK-5315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567666#comment-16567666 ] Rong Rong commented on FLINK-5315: -- [~hequn8128], [~fhueske] I am actually close to finishing the implementation and found out that it might be better to use {code:scala} myudagg('a, 'b).distinct {code} instead of {code:scala} myudagg.distinct('a, 'b) {code} since a similar modifier is added at the end for table distinct: `table.select('a, 'b).distinct`. It would be great to make the modifier location consistent. What do you guys think? > Support distinct aggregations in table api > -- > > Key: FLINK-5315 > URL: https://issues.apache.org/jira/browse/FLINK-5315 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: Kurt Young >Assignee: Rong Rong >Priority: Major > > Support distinct aggregations in Table API in the following format: > For Expressions: > {code:scala} > 'a.count.distinct // Expressions distinct modifier > {code} > For User-defined Function: > {code:scala} > singleArgUdaggFunc.distinct('a) // FunctionCall distinct modifier > multiArgUdaggFunc.distinct('a, 'b) // FunctionCall distinct modifier > {code}
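To illustrate the design choice under discussion: in Scala, `myudagg('a, 'b)` is sugar for `myudagg.apply('a, 'b)`, so the proposed `myudagg('a, 'b).distinct` reads as a modifier applied to an already-built call, the same shape as `table.select('a, 'b).distinct`. A heavily simplified Java model of that postfix style follows; `UdAgg` and `AggCall` are hypothetical and do not reflect the Table API's real expression classes:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, heavily simplified aggregate call; not the Table API's expression classes.
final class AggCall {
    private final String function;
    private final List<String> args;
    private final boolean distinct;

    AggCall(String function, List<String> args, boolean distinct) {
        this.function = function;
        this.args = args;
        this.distinct = distinct;
    }

    // Postfix modifier: returns a copy of this call with the DISTINCT flag set.
    AggCall distinct() {
        return new AggCall(function, args, true);
    }

    @Override
    public String toString() {
        return function + "(" + (distinct ? "DISTINCT " : "") + String.join(", ", args) + ")";
    }
}

// Hypothetical user-defined aggregate; apply(...) plays the role of Scala's myudagg('a, 'b).
final class UdAgg {
    private final String name;

    UdAgg(String name) {
        this.name = name;
    }

    AggCall apply(String... args) {
        return new AggCall(name, Arrays.asList(args), false);
    }

    public static void main(String[] args) {
        UdAgg myudagg = new UdAgg("myudagg");
        System.out.println(myudagg.apply("a", "b").distinct());
    }
}
```

With the postfix form, the function object only needs `apply`, and `distinct` lives on the resulting call, which is where the other modifiers already sit.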
[GitHub] tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards.
tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards. URL: https://github.com/apache/flink/pull/6482 This change fixes the retry behavior of listShards to match what getRecords already supports. Importantly, this will prevent the subtask from failing on transient listShards errors that we can identify based on well-known exceptions. These are recoverable and should not lead to unnecessary recovery cycles that cause downtime. R: @glaksh100 @jgrier @tzulitai
[jira] [Commented] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions
[ https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567647#comment-16567647 ] ASF GitHub Bot commented on FLINK-10020: tweise opened a new pull request #6482: [FLINK-10020] [kinesis] Support recoverable exceptions in listShards. URL: https://github.com/apache/flink/pull/6482 This change fixes the retry behavior of listShards to match what getRecords already supports. Importantly, this will prevent the subtask from failing on transient listShards errors that we can identify based on well-known exceptions. These are recoverable and should not lead to unnecessary recovery cycles that cause downtime. R: @glaksh100 @jgrier @tzulitai > Kinesis Consumer listShards should support more recoverable exceptions > -- > > Key: FLINK-10020 > URL: https://issues.apache.org/jira/browse/FLINK-10020 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > > Currently transient errors in listShards make the consumer fail and cause the > entire job to reset. That is unnecessary for certain exceptions (like status > 503 errors). It should be possible to control the exceptions that qualify for > retry, similar to getRecords/isRecoverableSdkClientException.
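The issue asks listShards to retry on recoverable exceptions (such as status 503) the way getRecords already does. A minimal sketch of such a retry loop with full-jitter exponential backoff follows. It assumes a hypothetical `ServiceException` carrying an HTTP status code and treats only 503 as recoverable; the connector itself classifies AWS SDK exceptions through its own `isRecoverableSdkClientException` logic, which this sketch does not reproduce:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ThreadLocalRandom;

final class JitterRetry {

    // Hypothetical exception carrying an HTTP status code, standing in for an SDK client error.
    static final class ServiceException extends RuntimeException {
        final int statusCode;

        ServiceException(int statusCode) {
            super("status " + statusCode);
            this.statusCode = statusCode;
        }
    }

    // Full jitter: a uniformly random delay in [0, min(max, base * power^attempt)).
    static long fullJitterBackoff(long baseMillis, long maxMillis, double power, int attempt) {
        long exponential = (long) Math.min(maxMillis, baseMillis * Math.pow(power, attempt));
        return (long) (ThreadLocalRandom.current().nextDouble() * exponential);
    }

    // Assumed classification: only "service unavailable" (503) counts as transient.
    static boolean isRecoverable(Exception e) {
        return e instanceof ServiceException && ((ServiceException) e).statusCode == 503;
    }

    // Calls `call` up to maxAttempts times, sleeping between recoverable failures.
    static <T> T callWithRetries(Callable<T> call, int maxAttempts,
                                 long baseMillis, long maxMillis, double power) throws Exception {
        int attempt = 0;
        while (true) {
            try {
                return call.call();
            } catch (Exception e) {
                // Non-recoverable errors, or the last allowed attempt, propagate to the caller.
                if (!isRecoverable(e) || attempt + 1 >= maxAttempts) {
                    throw e;
                }
                Thread.sleep(fullJitterBackoff(baseMillis, maxMillis, power, attempt++));
            }
        }
    }

    public static void main(String[] args) throws Exception {
        int[] calls = {0};
        String result = callWithRetries(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new ServiceException(503); // transient, so it is retried
            }
            return "shards listed";
        }, 5, 10, 1000, 2.0);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Drawing the delay uniformly from zero up to the capped exponential value spreads out retries from many subtasks that hit the same stream at once, while anything not classified as recoverable still fails fast.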
[jira] [Updated] (FLINK-10020) Kinesis Consumer listShards should support more recoverable exceptions
[ https://issues.apache.org/jira/browse/FLINK-10020?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10020: --- Labels: pull-request-available (was: ) > Kinesis Consumer listShards should support more recoverable exceptions > -- > > Key: FLINK-10020 > URL: https://issues.apache.org/jira/browse/FLINK-10020 > Project: Flink > Issue Type: Improvement > Components: Kinesis Connector >Reporter: Thomas Weise >Assignee: Thomas Weise >Priority: Major > Labels: pull-request-available > > Currently transient errors in listShards make the consumer fail and cause the > entire job to reset. That is unnecessary for certain exceptions (like status > 503 errors). It should be possible to control the exceptions that qualify for > retry, similar to getRecords/isRecoverableSdkClientException.
[GitHub] eliaslevy opened a new pull request #6481: Add Event Time Details documentation page
eliaslevy opened a new pull request #6481: Add Event Time Details documentation page URL: https://github.com/apache/flink/pull/6481 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4-node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented)
[jira] [Commented] (FLINK-7243) Add ParquetInputFormat
[ https://issues.apache.org/jira/browse/FLINK-7243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567546#comment-16567546 ] Zhenqiu Huang commented on FLINK-7243: -- [~najman] [~nssalian] I created a diff for the task. After polishing it tonight, I will create a pull request. https://github.com/apache/flink/compare/master...HuangZhenQiu:add-parquet-input-format > Add ParquetInputFormat > -- > > Key: FLINK-7243 > URL: https://issues.apache.org/jira/browse/FLINK-7243 > Project: Flink > Issue Type: Sub-task > Components: Table API SQL >Reporter: godfrey he >Assignee: Zhenqiu Huang >Priority: Major > > Add a {{ParquetInputFormat}} to read data from an Apache Parquet file.
[jira] [Comment Edited] (FLINK-7642) Upgrade maven surefire plugin to 2.21.0
[ https://issues.apache.org/jira/browse/FLINK-7642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16433258#comment-16433258 ] Ted Yu edited comment on FLINK-7642 at 8/2/18 8:28 PM: --- SUREFIRE-1439 is in 2.21.0, which is needed for compiling with Java 10. was (Author: yuzhih...@gmail.com): SUREFIRE-1439 is in 2.21.0 which is needed for compiling with Java 10 > Upgrade maven surefire plugin to 2.21.0 > --- > > Key: FLINK-7642 > URL: https://issues.apache.org/jira/browse/FLINK-7642 > Project: Flink > Issue Type: Improvement > Components: Build System >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > > The Surefire 2.19 release introduced more useful test filters, which would let us > run a subset of the tests. > This issue is for upgrading the Maven Surefire plugin to 2.21.0, which contains > SUREFIRE-1422.
[jira] [Updated] (FLINK-9924) Upgrade zookeeper to 3.4.13
[ https://issues.apache.org/jira/browse/FLINK-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ted Yu updated FLINK-9924: -- Description: ZooKeeper 3.4.13 is being released. ZOOKEEPER-2959 fixes data loss when an observer is used. ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / cloud) environments. was: zookeeper 3.4.13 is being released. ZOOKEEPER-2959 fixes data loss when observer is used ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container / cloud) environment > Upgrade zookeeper to 3.4.13 > --- > > Key: FLINK-9924 > URL: https://issues.apache.org/jira/browse/FLINK-9924 > Project: Flink > Issue Type: Task >Reporter: Ted Yu >Assignee: vinoyang >Priority: Major > > ZooKeeper 3.4.13 is being released. > ZOOKEEPER-2959 fixes data loss when an observer is used. > ZOOKEEPER-2184 allows ZooKeeper Java clients to work in dynamic IP (container > / cloud) > environments.
[jira] [Commented] (FLINK-10035) ConcurrentModificationException with flink-metrics-slf4j
[ https://issues.apache.org/jira/browse/FLINK-10035?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567424#comment-16567424 ] Chesnay Schepler commented on FLINK-10035: -- This isn't really an issue, and it applies to several reporters. It only happens if metrics are (un-)registered while a report is in progress. For long-running jobs this isn't an issue, as all it does is delay a report, and for short-lived jobs scheduled reporters are unreliable anyway. > ConcurrentModificationException with flink-metrics-slf4j > > > Key: FLINK-10035 > URL: https://issues.apache.org/jira/browse/FLINK-10035 > Project: Flink > Issue Type: Bug > Components: Metrics >Affects Versions: 1.5.2 >Reporter: Nico Kruber >Priority: Major > > {code} > 2018-08-02 15:45:08,052 WARN > org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while > reporting metrics > java.util.ConcurrentModificationException > at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) > at java.util.HashMap$EntryIterator.next(HashMap.java:1471) > at java.util.HashMap$EntryIterator.next(HashMap.java:1469) > at > org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:95) > at > org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) > at java.lang.Thread.run(Thread.java:748) > {code} > https://api.travis-ci.org/v3/job/411307171/log.txt
[jira] [Created] (FLINK-10035) ConcurrentModificationException with flink-metrics-slf4j
Nico Kruber created FLINK-10035: --- Summary: ConcurrentModificationException with flink-metrics-slf4j Key: FLINK-10035 URL: https://issues.apache.org/jira/browse/FLINK-10035 Project: Flink Issue Type: Bug Components: Metrics Affects Versions: 1.5.2 Reporter: Nico Kruber {code} 2018-08-02 15:45:08,052 WARN org.apache.flink.runtime.metrics.MetricRegistryImpl - Error while reporting metrics java.util.ConcurrentModificationException at java.util.HashMap$HashIterator.nextNode(HashMap.java:1437) at java.util.HashMap$EntryIterator.next(HashMap.java:1471) at java.util.HashMap$EntryIterator.next(HashMap.java:1469) at org.apache.flink.metrics.slf4j.Slf4jReporter.report(Slf4jReporter.java:95) at org.apache.flink.runtime.metrics.MetricRegistryImpl$ReporterTask.run(MetricRegistryImpl.java:427) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} https://api.travis-ci.org/v3/job/411307171/log.txt
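The trace shows `Slf4jReporter.report` iterating a `HashMap` whose fail-fast iterator detected a modification. A minimal single-threaded sketch of the same mechanism follows (`CmeDemo` and the metric names are hypothetical, and this is not the reporter's code). It also shows that a `ConcurrentHashMap` iterator tolerates the change; whether the reporter should switch collections is a separate question, given the comment above that the exception only delays a report:

```java
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, single-threaded demo of the failure mode in the stack trace above.
final class CmeDemo {

    // Iterates over a map of "metrics" and registers a new one mid-iteration, the same kind of
    // structural change a concurrent (un-)registration makes while a report is running.
    static boolean iterationFails(Map<String, Integer> metrics) {
        metrics.put("numRecordsIn", 1);
        metrics.put("numRecordsOut", 2);
        try {
            for (Map.Entry<String, Integer> entry : metrics.entrySet()) {
                // Same key every time, so the loop cannot grow the map without bound.
                metrics.put("registeredDuringReport", entry.getValue());
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        // HashMap iterators are fail-fast: the next call to next() detects the change and throws.
        System.out.println("HashMap throws: " + iterationFails(new HashMap<>()));
        // ConcurrentHashMap iterators are weakly consistent and never throw this exception.
        System.out.println("ConcurrentHashMap throws: " + iterationFails(new ConcurrentHashMap<>()));
    }
}
```

Note that the `HashMap` Javadoc describes fail-fast behavior as best-effort; it is reliable in this single-threaded case, but with real concurrent registration the outcome depends on timing, which matches the intermittent warning in the log.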
[jira] [Closed] (FLINK-9950) Handle migration to Gitbox
[ https://issues.apache.org/jira/browse/FLINK-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-9950. --- Resolution: Fixed > Handle migration to Gitbox > -- > > Key: FLINK-9950 > URL: https://issues.apache.org/jira/browse/FLINK-9950 > Project: Flink > Issue Type: Improvement > Components: Build System, Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > Parent issue for all things we have to change now that Gitbox is enabled.
[jira] [Closed] (FLINK-10034) Update website pages
[ https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-10034. Resolution: Fixed cc2041d3bf80c2fb91cedb6ec0b29ea10a9b4e77 > Update website pages > > > Key: FLINK-10034 > URL: https://issues.apache.org/jira/browse/FLINK-10034 > Project: Flink > Issue Type: Sub-task > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > Several pages of the project website still refer to the old repositories.
[jira] [Updated] (FLINK-10034) Update website pages
[ https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10034: - Description: Several pages of the project website still refer to the old repositories. (was: The community page still refers to the old repositories.) > Update website pages > > > Key: FLINK-10034 > URL: https://issues.apache.org/jira/browse/FLINK-10034 > Project: Flink > Issue Type: Sub-task > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > Several pages of the project website still refer to the old repositories. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10034) Update website pages
[ https://issues.apache.org/jira/browse/FLINK-10034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler updated FLINK-10034: - Summary: Update website pages (was: Update community page) > Update website pages > > > Key: FLINK-10034 > URL: https://issues.apache.org/jira/browse/FLINK-10034 > Project: Flink > Issue Type: Sub-task > Components: Project Website >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > The community page still refers to the old repositories. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (FLINK-10034) Update community page
Chesnay Schepler created FLINK-10034: Summary: Update community page Key: FLINK-10034 URL: https://issues.apache.org/jira/browse/FLINK-10034 Project: Flink Issue Type: Sub-task Components: Project Website Reporter: Chesnay Schepler Assignee: Chesnay Schepler The community page still refers to the old repositories. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Reopened] (FLINK-9950) Handle migration to Gitbox
[ https://issues.apache.org/jira/browse/FLINK-9950?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler reopened FLINK-9950: - > Handle migration to Gitbox > -- > > Key: FLINK-9950 > URL: https://issues.apache.org/jira/browse/FLINK-9950 > Project: Flink > Issue Type: Improvement > Components: Build System, Documentation >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Major > > Parent issue for all things we have to change now that Gitbox is enabled. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-10033) Let Task release reference to Invokable on shutdown
[ https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567385#comment-16567385 ] ASF GitHub Bot commented on FLINK-10033: StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable URL: https://github.com/apache/flink/pull/6480 ## What is the purpose of the change References to Task objects may under some conditions linger beyond the lifetime of the task. For example, in the case of local network channels, the receiving task may have a reference to the object of the task that produced the data. To guard against memory leaks, the Task releases the reference to its AbstractInvokable when it shuts down or cancels. ## Brief change log - The `Task` nulls out its heap reference to the `AbstractInvokable` - `TaskTest` verifies this for both finite task execution and cancellation ## Verifying this change Self-contained in a unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Let Task release reference to Invokable on shutdown > --- > > Key: FLINK-10033 > URL: https://issues.apache.org/jira/browse/FLINK-10033 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > > References to Task objects may under some conditions linger beyond > the lifetime of the task. For example, in the case of local network channels, the > receiving task may have a reference to the object of the task that produced > the data. > To prevent memory leaks, the Task needs to release all references to > its AbstractInvokable when it shuts down or cancels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (FLINK-10033) Let Task release reference to Invokable on shutdown
[ https://issues.apache.org/jira/browse/FLINK-10033?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] ASF GitHub Bot updated FLINK-10033: --- Labels: pull-request-available (was: ) > Let Task release reference to Invokable on shutdown > --- > > Key: FLINK-10033 > URL: https://issues.apache.org/jira/browse/FLINK-10033 > Project: Flink > Issue Type: Bug > Components: TaskManager >Affects Versions: 1.5.2 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Major > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > > References to Task objects may under some conditions linger beyond > the lifetime of the task. For example, in the case of local network channels, the > receiving task may have a reference to the object of the task that produced > the data. > To prevent memory leaks, the Task needs to release all references to > its AbstractInvokable when it shuts down or cancels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable
StephanEwen opened a new pull request #6480: [FLINK-10033] [runtime] Task releases reference to AbstractInvokable URL: https://github.com/apache/flink/pull/6480 ## What is the purpose of the change References to Task objects may under some conditions linger beyond the lifetime of the task. For example, in the case of local network channels, the receiving task may have a reference to the object of the task that produced the data. To guard against memory leaks, the Task releases the reference to its AbstractInvokable when it shuts down or cancels. ## Brief change log - The `Task` nulls out its heap reference to the `AbstractInvokable` - `TaskTest` verifies this for both finite task execution and cancellation ## Verifying this change Self-contained in a unit test. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **no** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? **not applicable** This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
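The change described in PR #6480 can be illustrated with a minimal sketch, assuming simplified stand-ins for `Task` and `AbstractInvokable` (the names `TaskSketch` and `Invokable` are hypothetical, not Flink classes). The task clears its field in a `finally` block, so the reference is dropped whether the invokable finishes normally or fails; cancellation is modeled here only as the invokable unwinding with an exception.

```java
// Hypothetical, simplified stand-in for Flink's AbstractInvokable.
abstract class Invokable {
    abstract void invoke() throws Exception;
}

// Hypothetical, simplified stand-in for Flink's Task.
public class TaskSketch implements Runnable {

    // volatile so other threads (e.g. one inspecting the task) observe the cleared field.
    private volatile Invokable invokable;

    public TaskSketch(Invokable invokable) {
        this.invokable = invokable;
    }

    @Override
    public void run() {
        Invokable current = invokable;
        try {
            if (current != null) {
                current.invoke();
            }
        } catch (Exception e) {
            // A real task would record the failure and transition to FAILED here.
        } finally {
            // Release the heap reference: if something (e.g. a local network channel
            // of a receiving task) still references this TaskSketch object, the
            // invokable and everything it holds can still be garbage collected.
            invokable = null;
        }
    }

    boolean holdsInvokable() {
        return invokable != null;
    }

    public static void main(String[] args) {
        TaskSketch finishing = new TaskSketch(new Invokable() {
            @Override
            void invoke() { }
        });
        finishing.run();
        System.out.println("finished task holds invokable: " + finishing.holdsInvokable());

        TaskSketch failing = new TaskSketch(new Invokable() {
            @Override
            void invoke() throws Exception {
                throw new Exception("simulated failure");
            }
        });
        failing.run();
        System.out.println("failed task holds invokable: " + failing.holdsInvokable());
    }
}
```

Nulling the field does not shorten the life of the `Task` object itself; it only cuts the path from a lingering `Task` to the potentially large invokable.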
[jira] [Created] (FLINK-10033) Let Task release reference to Invokable on shutdown
Stephan Ewen created FLINK-10033: Summary: Let Task release reference to Invokable on shutdown Key: FLINK-10033 URL: https://issues.apache.org/jira/browse/FLINK-10033 Project: Flink Issue Type: Bug Components: TaskManager Affects Versions: 1.5.2 Reporter: Stephan Ewen Assignee: Stephan Ewen Fix For: 1.5.3, 1.6.0 References to Task objects may under some conditions linger beyond the lifetime of the task. For example, in the case of local network channels, the receiving task may have a reference to the object of the task that produced the data. To prevent memory leaks, the Task needs to release all references to its AbstractInvokable when it shuts down or cancels. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount
[ https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567291#comment-16567291 ] ASF GitHub Bot commented on FLINK-9969: --- zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207319425 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.core.memory.MemorySegment; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Default factory for {@link InMemorySorter}. 
+ */ +public class DefaultInMemorySorterFactory implements InMemorySorterFactory { + + @Nonnull + private final TypeSerializerFactory typeSerializerFactory; + + @Nonnull + private final TypeComparator typeComparator; + + private final boolean useFixedLengthRecordSorter; + + @Nullable + private TypeSerializer initialSerializer; + + DefaultInMemorySorterFactory( + @Nonnull TypeSerializerFactory typeSerializerFactory, + @Nonnull TypeComparator typeComparator, + int thresholdForInPlaceSorting) { + this.typeSerializerFactory = typeSerializerFactory; + this.typeComparator = typeComparator; + + this.initialSerializer = typeSerializerFactory.getSerializer(); + + this.useFixedLengthRecordSorter = typeComparator.supportsSerializationWithKeyNormalization() && + initialSerializer.getLength() > 0 && initialSerializer.getLength() <= thresholdForInPlaceSorting; + } + + @Override + public InMemorySorter create(List sortSegments) { + + final TypeSerializer typeSerializer; + + if (initialSerializer == null) { + typeSerializer = typeSerializerFactory.getSerializer(); + } else { + typeSerializer = initialSerializer; Review comment: couldn't we simplify this by always discarding the serializer created in the constructor? We could then remove the `initialSerializer` field and always create a new serializer here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unreasonable memory requirements to complete examples/batch/WordCount > - > > Key: FLINK-9969 > URL: https://issues.apache.org/jira/browse/FLINK-9969 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0 >Reporter: Piotr Nowojski >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > Attachments: yarn_logs > > > Setup on AWS EMR: > * 5 worker nodes (m4.4xlarge nodes) > * 1 master node (m4.large) > The following command fails with out-of-memory errors: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 > examples/batch/WordCount.jar{noformat} > The example only completes after increasing the memory above 17.2GB. At the same time, after > disabling FLIP-6, the following command succeeds: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 >
[jira] [Commented] (FLINK-9969) Unreasonable memory requirements to complete examples/batch/WordCount
[ https://issues.apache.org/jira/browse/FLINK-9969?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567292#comment-16567292 ] ASF GitHub Bot commented on FLINK-9969: --- zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207319672 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ## @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager memoryManager, List TypeSerializerFactory serializerFactory, TypeComparator comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, - boolean objectReuseEnabled) - throws IOException - { + boolean objectReuseEnabled) throws IOException { + this ( + memoryManager, + memory, + ioManager, + input, + parentTask, + serializerFactory, + comparator, + numSortBuffers, + maxNumFileHandles, + startSpillingFraction, + noSpillingMemory, + handleLargeRecords, + objectReuseEnabled, + new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING)); + } + + protected UnilateralSortMerger( Review comment: `@VisibleForTesting`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. 
For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Unreasonable memory requirements to complete examples/batch/WordCount > - > > Key: FLINK-9969 > URL: https://issues.apache.org/jira/browse/FLINK-9969 > Project: Flink > Issue Type: Bug > Components: ResourceManager >Affects Versions: 1.5.0, 1.5.1, 1.5.2, 1.6.0 >Reporter: Piotr Nowojski >Assignee: Till Rohrmann >Priority: Blocker > Labels: pull-request-available > Fix For: 1.5.3, 1.6.0 > > Attachments: yarn_logs > > > Setup on AWS EMR: > * 5 worker nodes (m4.4xlarge nodes) > * 1 master node (m4.large) > The following command fails with out-of-memory errors: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 16000 > examples/batch/WordCount.jar{noformat} > The example only completes after increasing the memory above 17.2GB. At the same time, after > disabling FLIP-6, the following command succeeds: > {noformat} > export HADOOP_CLASSPATH=`hadoop classpath` > ./bin/flink run -m yarn-cluster -p 20 -yn 5 -ys 4 -ytm 1000 > examples/batch/WordCount.jar{noformat} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[GitHub] zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207319672 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/UnilateralSortMerger.java ## @@ -212,9 +215,39 @@ protected UnilateralSortMerger(MemoryManager memoryManager, List TypeSerializerFactory serializerFactory, TypeComparator comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords, - boolean objectReuseEnabled) - throws IOException - { + boolean objectReuseEnabled) throws IOException { + this ( + memoryManager, + memory, + ioManager, + input, + parentTask, + serializerFactory, + comparator, + numSortBuffers, + maxNumFileHandles, + startSpillingFraction, + noSpillingMemory, + handleLargeRecords, + objectReuseEnabled, + new DefaultInMemorySorterFactory<>(serializerFactory, comparator, THRESHOLD_FOR_IN_PLACE_SORTING)); + } + + protected UnilateralSortMerger( Review comment: `@VisibleForTesting`? This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
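The diff follows a common pattern: the existing constructor keeps its signature and delegates to a new constructor that additionally accepts an `InMemorySorterFactory`, so tests can inject their own factory. A minimal sketch of that pattern follows, with hypothetical names (`SortMergerSketch`, `SorterFactory`, `DefaultSorterFactory`) and a locally declared `@VisibleForTesting` marker standing in for Flink's `org.apache.flink.annotation.VisibleForTesting`. Sorters are reduced to strings so the example runs on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Local stand-in for Flink's org.apache.flink.annotation.VisibleForTesting marker.
@interface VisibleForTesting { }

// Hypothetical factory abstraction; the real InMemorySorterFactory builds sorters over memory segments.
interface SorterFactory {
    String create(int index);
}

final class DefaultSorterFactory implements SorterFactory {
    @Override
    public String create(int index) {
        return "default-sorter[" + index + "]";
    }
}

public class SortMergerSketch {

    private final List<String> sorters = new ArrayList<>();

    /** Production constructor: keeps the old signature and supplies the default factory. */
    public SortMergerSketch(int numSortBuffers) {
        this(numSortBuffers, new DefaultSorterFactory());
    }

    /** Extended constructor: lets tests inject a factory, e.g. one that tracks created sorters. */
    @VisibleForTesting
    SortMergerSketch(int numSortBuffers, SorterFactory sorterFactory) {
        for (int i = 0; i < numSortBuffers; i++) {
            sorters.add(sorterFactory.create(i));
        }
    }

    List<String> getSorters() {
        return sorters;
    }

    public static void main(String[] args) {
        System.out.println(new SortMergerSketch(2).getSorters());
        System.out.println(new SortMergerSketch(2, i -> "test-sorter-" + i).getSorters());
    }
}
```

The sketch makes the extended constructor package-private instead of `protected` as in the diff; that is one possible design choice, since the annotation alone documents intent but does not restrict access.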
[GitHub] zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger
zentol commented on a change in pull request #6479: [FLINK-9969][batch] Dispose InMemorySorters created by the UnilateralSortMerger URL: https://github.com/apache/flink/pull/6479#discussion_r207319425 ## File path: flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/DefaultInMemorySorterFactory.java ## @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.sort; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.common.typeutils.TypeSerializerFactory; +import org.apache.flink.core.memory.MemorySegment; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.List; + +/** + * Default factory for {@link InMemorySorter}. 
+ */ +public class DefaultInMemorySorterFactory implements InMemorySorterFactory { + + @Nonnull + private final TypeSerializerFactory typeSerializerFactory; + + @Nonnull + private final TypeComparator typeComparator; + + private final boolean useFixedLengthRecordSorter; + + @Nullable + private TypeSerializer initialSerializer; + + DefaultInMemorySorterFactory( + @Nonnull TypeSerializerFactory typeSerializerFactory, + @Nonnull TypeComparator typeComparator, + int thresholdForInPlaceSorting) { + this.typeSerializerFactory = typeSerializerFactory; + this.typeComparator = typeComparator; + + this.initialSerializer = typeSerializerFactory.getSerializer(); + + this.useFixedLengthRecordSorter = typeComparator.supportsSerializationWithKeyNormalization() && + initialSerializer.getLength() > 0 && initialSerializer.getLength() <= thresholdForInPlaceSorting; + } + + @Override + public InMemorySorter create(List sortSegments) { + + final TypeSerializer typeSerializer; + + if (initialSerializer == null) { + typeSerializer = typeSerializerFactory.getSerializer(); + } else { + typeSerializer = initialSerializer; Review comment: couldn't we simplify this by always discarding the serializer created in the constructor? We could then remove the `initialSerializer` field and always create a new serializer here. This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services
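The review suggests removing the `initialSerializer` field: the serializer created in the constructor is used only to decide whether the fixed-length sorter applies, and `create` always asks the factory for a fresh serializer. A self-contained sketch of that simplification follows. `Serializer`, `Sorter`, `IntLikeSerializer`, and `SorterFactorySketch` are hypothetical stand-ins for Flink's `TypeSerializer`, `InMemorySorter`, a fixed-length serializer, and `DefaultInMemorySorterFactory`; the key-normalization check from the diff is reduced to a boolean parameter, and the `sortSegments` argument is omitted.

```java
import java.util.function.Supplier;

public class SorterFactorySketch {

    // Hypothetical, minimal stand-in for TypeSerializer: length > 0 means fixed-length records.
    interface Serializer {
        int getLength();
    }

    // Hypothetical fixed-length serializer, e.g. for 4-byte int records.
    static final class IntLikeSerializer implements Serializer {
        @Override
        public int getLength() {
            return 4;
        }
    }

    // Hypothetical stand-in for InMemorySorter: records its serializer and sorter variant.
    static final class Sorter {
        final Serializer serializer;
        final boolean fixedLength;

        Sorter(Serializer serializer, boolean fixedLength) {
            this.serializer = serializer;
            this.fixedLength = fixedLength;
        }
    }

    private final Supplier<Serializer> serializerFactory;
    private final boolean useFixedLengthRecordSorter;

    SorterFactorySketch(Supplier<Serializer> serializerFactory,
                        boolean supportsKeyNormalization,
                        int thresholdForInPlaceSorting) {
        this.serializerFactory = serializerFactory;
        // The serializer created here is only inspected and then discarded.
        Serializer probe = serializerFactory.get();
        this.useFixedLengthRecordSorter = supportsKeyNormalization
                && probe.getLength() > 0
                && probe.getLength() <= thresholdForInPlaceSorting;
    }

    // Every sorter gets its own, freshly created serializer instance.
    Sorter create() {
        return new Sorter(serializerFactory.get(), useFixedLengthRecordSorter);
    }

    public static void main(String[] args) {
        SorterFactorySketch factory = new SorterFactorySketch(IntLikeSerializer::new, true, 32);
        Sorter first = factory.create();
        Sorter second = factory.create();
        System.out.println("fixed-length: " + first.fixedLength);
        System.out.println("distinct serializers: " + (first.serializer != second.serializer));
    }
}
```

The cost of the simplification is one extra serializer instantiation per factory. In exchange, no sorter ever shares a serializer instance, which presumably matters because serializers are not generally assumed to be thread-safe.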
[jira] [Resolved] (FLINK-9947) Document unified table sources/sinks/formats
[ https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Timo Walther resolved FLINK-9947. - Resolution: Fixed Fix Version/s: 1.6.0 Fixed in 1.7.0: dacc16b4fa6db6abfdbf73b99f26a5fd36b12acd Fixed in 1.6.0: c858d31e7e6b404f810741ac076e66e14cb06868 > Document unified table sources/sinks/formats > > > Key: FLINK-9947 > URL: https://issues.apache.org/jira/browse/FLINK-9947 > Project: Flink > Issue Type: Improvement > Components: Documentation, Table API SQL >Reporter: Timo Walther >Assignee: Timo Walther >Priority: Major > Labels: pull-request-available > Fix For: 1.6.0 > > > The recent unification of table sources/sinks/formats needs documentation. I > propose a new page that explains the built-in sources, sinks, and formats as > well as a page for customization of public interfaces. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9947) Document unified table sources/sinks/formats
[ https://issues.apache.org/jira/browse/FLINK-9947?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567119#comment-16567119 ] ASF GitHub Bot commented on FLINK-9947: --- asfgit closed pull request #6456: [FLINK-9947] [docs] Document unified table sources/sinks/formats URL: https://github.com/apache/flink/pull/6456 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md new file mode 100644 index 000..fa51783a590 --- /dev/null +++ b/docs/dev/table/connect.md @@ -0,0 +1,1049 @@ +--- +title: "Connect to External Systems" +nav-parent_id: tableapi +nav-pos: 19 +--- + + +Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC. + +This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements. + +Attention If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html). + +* This will be replaced by the TOC +{:toc} + +Dependencies + + +The following table list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for [table connectors](connect.html#table-connectors) and [table formats](connect.html#table-formats). 
The following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. + +{% if site.is_stable %} + +### Connectors + +| Name | Version | Maven dependency | SQL Client JAR | +| : | : | :--- | :--| +| Filesystem| | Built-in | Built-in | +| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | +| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | + +### Formats + +| Name | Maven dependency | SQL Client JAR | +| : | :--- | :- | +| CSV | Built-in | Built-in | +| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | +| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | + +{% else %} + +This table is only available for stable releases. + +{% endif %} + +{% top %} + +Overview + + +Beginning from Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation. 
+ +Connections can be specified either + +- **programmatically** using a `Descriptor` under `org.apache.flink.table.descriptors` for Table & SQL API +- or **declaratively** via [YAML configuration files](http://yaml.org/) for the SQL Client. + +This allows not only for better unification of APIs and SQL Client but also for better extensibility in case of [custom implementations](sourceSinks.html) without changing the actual declaration. + +Every declaration is similar to a SQL `CREATE TABLE` statement. One can define the name of the table, the schema of the table, a connector, and a
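The separation of declaration and implementation described in this documentation diff can be pictured as follows. A declaration reduces to a flat map of string properties, and the implementation is chosen by matching those properties against what each available factory says it requires. The sketch below is a simplified, hypothetical model of that idea: `ConnectorFactory`, `KafkaFactory`, `FileSystemFactory`, `FactoryLookupSketch`, and the property keys are illustrative assumptions rather than Flink's actual table factory API, and how real factories are discovered is not modeled.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class FactoryLookupSketch {

    // Hypothetical factory contract: declares the properties it requires and
    // builds a (here: string) table source from the full property map.
    interface ConnectorFactory {
        Map<String, String> requiredContext();
        String createSource(Map<String, String> properties);
    }

    static final class KafkaFactory implements ConnectorFactory {
        @Override
        public Map<String, String> requiredContext() {
            Map<String, String> context = new HashMap<>();
            context.put("connector.type", "kafka");
            return context;
        }

        @Override
        public String createSource(Map<String, String> properties) {
            return "KafkaSource(topic=" + properties.get("connector.topic") + ")";
        }
    }

    static final class FileSystemFactory implements ConnectorFactory {
        @Override
        public Map<String, String> requiredContext() {
            Map<String, String> context = new HashMap<>();
            context.put("connector.type", "filesystem");
            return context;
        }

        @Override
        public String createSource(Map<String, String> properties) {
            return "FileSource(path=" + properties.get("connector.path") + ")";
        }
    }

    // Uses the first factory whose required context is contained in the declaration.
    static Optional<String> createSource(List<ConnectorFactory> factories,
                                         Map<String, String> declaration) {
        for (ConnectorFactory factory : factories) {
            boolean matches = factory.requiredContext().entrySet().stream()
                    .allMatch(e -> e.getValue().equals(declaration.get(e.getKey())));
            if (matches) {
                return Optional.of(factory.createSource(declaration));
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        Map<String, String> declaration = new HashMap<>();
        declaration.put("connector.type", "kafka");
        declaration.put("connector.topic", "clicks");
        declaration.put("format.type", "json");

        List<ConnectorFactory> factories = Arrays.asList(new FileSystemFactory(), new KafkaFactory());
        System.out.println(createSource(factories, declaration).orElse("no matching factory"));
    }
}
```

Because the declaration is plain data, the same property map could come from a programmatic descriptor or from a YAML file, and a custom factory can be added without changing how tables are declared.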
[GitHub] asfgit closed pull request #6456: [FLINK-9947] [docs] Document unified table sources/sinks/formats
asfgit closed pull request #6456: [FLINK-9947] [docs] Document unified table sources/sinks/formats URL: https://github.com/apache/flink/pull/6456 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/dev/table/connect.md b/docs/dev/table/connect.md new file mode 100644 index 000..fa51783a590 --- /dev/null +++ b/docs/dev/table/connect.md @@ -0,0 +1,1049 @@ +--- +title: "Connect to External Systems" +nav-parent_id: tableapi +nav-pos: 19 +--- + + +Flink's Table API & SQL programs can be connected to other external systems for reading and writing both batch and streaming tables. A table source provides access to data which is stored in external systems (such as a database, key-value store, message queue, or file system). A table sink emits a table to an external storage system. Depending on the type of source and sink, they support different formats such as CSV, Parquet, or ORC. + +This page describes how to declare built-in table sources and/or table sinks and register them in Flink. After a source or sink has been registered, it can be accessed by Table API & SQL statements. + +Attention If you want to implement your own *custom* table source or sink, have a look at the [user-defined sources & sinks page](sourceSinks.html). + +* This will be replaced by the TOC +{:toc} + +Dependencies + + +The following table list all available connectors and formats. Their mutual compatibility is tagged in the corresponding sections for [table connectors](connect.html#table-connectors) and [table formats](connect.html#table-formats). The following table provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles. 
+ +{% if site.is_stable %} + +### Connectors + +| Name | Version | Maven dependency | SQL Client JAR | +| : | : | :--- | :--| +| Filesystem| | Built-in | Built-in | +| Apache Kafka | 0.8 | `flink-connector-kafka-0.8` | Not available | +| Apache Kafka | 0.9 | `flink-connector-kafka-0.9` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.9{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.9{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.10 | `flink-connector-kafka-0.10` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.10{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.10{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | +| Apache Kafka | 0.11 | `flink-connector-kafka-0.11` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-connector-kafka-0.11{{site.scala_version_suffix}}/{{site.version}}/flink-connector-kafka-0.11{{site.scala_version_suffix}}-{{site.version}}-sql-jar.jar) | + +### Formats + +| Name | Maven dependency | SQL Client JAR | +| : | :--- | :- | +| CSV | Built-in | Built-in | +| JSON | `flink-json` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-json/{{site.version}}/flink-json-{{site.version}}-sql-jar.jar) | +| Apache Avro | `flink-avro` | [Download](http://central.maven.org/maven2/org/apache/flink/flink-avro/{{site.version}}/flink-avro-{{site.version}}-sql-jar.jar) | + +{% else %} + +This table is only available for stable releases. + +{% endif %} + +{% top %} + +Overview + + +Beginning from Flink 1.6, the declaration of a connection to an external system is separated from the actual implementation. + +Connections can be specified either + +- **programmatically** using a `Descriptor` under `org.apache.flink.table.descriptors` for Table & SQL API +- or **declaratively** via [YAML configuration files](http://yaml.org/) for the SQL Client. 
+ +This allows not only for better unification of APIs and SQL Client but also for better extensibility in case of [custom implementations](sourceSinks.html) without changing the actual declaration. + +Every declaration is similar to a SQL `CREATE TABLE` statement. One can define the name of the table, the schema of the table, a connector, and a data format upfront for connecting to an external system. + +The **connector** describes the external system that stores the data of a table. Storage systems such as [Apacha Kafka](http://kafka.apache.org/) or a regular file system can be
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567113#comment-16567113 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207306976

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
## @@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
     CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
     reconciliationCoordinator = null;

Review comment:
I think we need to check whether the supporting actors are not null, because we initialize them only after the `clearStateFuture` has completed. If this takes a while and someone revokes our leadership in the meantime, `clearState` will be called before the supporting actors have been created.

This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Mesos resource manager unable to connect to master after failover
> -
>
> Key: FLINK-9936
> URL: https://issues.apache.org/jira/browse/FLINK-9936
> Project: Flink
> Issue Type: Bug
> Components: Mesos, Scheduler
> Affects Versions: 1.5.0, 1.5.1, 1.6.0
> Reporter: Renjie Liu
> Assignee: Gary Yao
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.5.3, 1.6.0
>
>
> When deployed in Mesos session cluster mode, the connection monitor keeps
> reporting that it is unable to connect to Mesos after a restart. In fact, the
> scheduler driver has already connected to the Mesos master, but the connected
> message is lost. This is because leadership has not been granted yet and the
> fencing token is not set, so the RPC service ignores the connected message.
> So we should connect to the Mesos master after leadership is granted.

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
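The null check asked for in the review comment above can be sketched as follows. This is a hypothetical, self-contained model rather than Flink's `MesosResourceManager`: `SupportingActors`, `ActorHandle` and `stopActorIfPresent` are illustrative names, and stopping an actor is simulated with an already-completed future. Whether Flink's own `stopActor` helper already tolerates `null` is not visible in the quoted diff.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of the supporting actors; not Flink's MesosResourceManager.
final class SupportingActors {

    // Stand-in for an actor reference; a null field means "not created yet".
    static final class ActorHandle {
        final String name;

        ActorHandle(String name) {
            this.name = name;
        }
    }

    ActorHandle connectionMonitor;
    ActorHandle taskMonitor;
    final List<String> stopped = new ArrayList<>();

    // Only stops actors that exist; a missing actor counts as already stopped.
    private CompletableFuture<Void> stopActorIfPresent(ActorHandle actor) {
        if (actor != null) {
            stopped.add(actor.name); // simulated graceful stop
        }
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> stopSupportingActorsAsync() {
        CompletableFuture<Void> stopConnectionMonitor = stopActorIfPresent(connectionMonitor);
        connectionMonitor = null;
        CompletableFuture<Void> stopTaskMonitor = stopActorIfPresent(taskMonitor);
        taskMonitor = null;
        return CompletableFuture.allOf(stopConnectionMonitor, stopTaskMonitor);
    }

    public static void main(String[] args) {
        // Leadership revoked before the actors were created: nothing to stop, no NPE.
        SupportingActors notYetCreated = new SupportingActors();
        notYetCreated.stopSupportingActorsAsync().join();
        System.out.println("stopped: " + notYetCreated.stopped); // stopped: []
    }
}
```

Treating a missing actor as already stopped keeps the `CompletableFuture.allOf(...)` composition uniform, so `clearState` can run at any point relative to actor creation.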
[jira] [Updated] (FLINK-9363) Bump up the Jackson version
[ https://issues.apache.org/jira/browse/FLINK-9363?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ted Yu updated FLINK-9363:
--
Description:
CVE's for Jackson:
CVE-2017-17485
CVE-2018-5968
CVE-2018-7489
We can upgrade to 2.9.5

  was:
CVE's for Jackson:
CVE-2017-17485
CVE-2018-5968
CVE-2018-7489
We can upgrade to 2.9.5

> Bump up the Jackson version
> ---
>
> Key: FLINK-9363
> URL: https://issues.apache.org/jira/browse/FLINK-9363
> Project: Flink
> Issue Type: Improvement
> Reporter: Ted Yu
> Assignee: vinoyang
> Priority: Major
> Labels: security
>
> CVE's for Jackson:
> CVE-2017-17485
> CVE-2018-5968
> CVE-2018-7489
> We can upgrade to 2.9.5

--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567088#comment-16567088 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207302752

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
## @@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
       resourceManager.taskRouter.expectMsgClass(Disconnected.class);
     }};
   }
+
+  @Test
+  public void testClearStateRevokeLeadership() throws Exception {
+    new Context() {{
+      MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+      when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+      when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+      startResourceManager();
+      rmServices.rmLeaderElectionService.notLeader();
+      rmServices.grantLeadership();
+
+      //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);
+      assertThat(resourceManager.workersInLaunch.size(), equalTo(0));
+      verify(rmServices.schedulerDriver).stop(true);

Review comment:
Is the test already complete?
[jira] [Assigned] (FLINK-9861) Add end-to-end test for reworked BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-9861: - Assignee: Kostas Kloudas > Add end-to-end test for reworked BucketingSink > -- > > Key: FLINK-9861 > URL: https://issues.apache.org/jira/browse/FLINK-9861 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Kostas Kloudas >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > We should add an end-to-end test for the reworked BucketingSink to verify that > the sink works with different {{FileSystems}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567084#comment-16567084 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207294014

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
## @@ -345,15 +382,20 @@ private void recoverWorkers() throws Exception {
     CompletableFuture stopReconciliationCoordinatorFuture = stopActor(reconciliationCoordinator, stopTimeout);
     reconciliationCoordinator = null;
-    CompletableFuture stopFuture = CompletableFuture.allOf(
+    return CompletableFuture.allOf(
       stopTaskMonitorFuture,
       stopConnectionMonitorFuture,
       stopLaunchCoordinatorFuture,
       stopReconciliationCoordinatorFuture);
+  }
+
+  @Override
+  public CompletableFuture postStop() {
+    final CompletableFuture supportActorsStopFuture = stopSupportingActorsAsync();
     final CompletableFuture terminationFuture = super.postStop();

Review comment:
I think we should call `super.postStop` only after `supportActorsStopFuture` has completed. Otherwise we risk that the parent class shuts down resources that are still used by the supporting actors.
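A minimal sketch of the ordering suggested above, assuming both steps return `CompletableFuture<Void>`: `thenCompose` defers the parent's `postStop` until the supporting actors have stopped. `PostStopOrdering`, `stopSupportingActorsAsync(ExecutorService)` and `parentPostStop` are hypothetical stand-ins that record the order in which they run; the 50 ms sleep only makes the ordering observable.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical model of the shutdown ordering; not Flink's MesosResourceManager.
final class PostStopOrdering {
    final List<String> events = new CopyOnWriteArrayList<>();

    // Stand-in for stopSupportingActorsAsync(): completes later, on another thread.
    CompletableFuture<Void> stopSupportingActorsAsync(ExecutorService executor) {
        return CompletableFuture.runAsync(() -> {
            sleepQuietly(50);
            events.add("supporting actors stopped");
        }, executor);
    }

    // Stand-in for super.postStop(), which may release resources the actors still use.
    CompletableFuture<Void> parentPostStop() {
        events.add("parent postStop");
        return CompletableFuture.completedFuture(null);
    }

    CompletableFuture<Void> postStop(ExecutorService executor) {
        // thenCompose only invokes parentPostStop() after the actors have stopped;
        // the returned future completes when the parent's future completes.
        return stopSupportingActorsAsync(executor)
            .thenCompose(ignored -> parentPostStop());
    }

    private static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            PostStopOrdering rm = new PostStopOrdering();
            rm.postStop(executor).join();
            System.out.println(rm.events); // [supporting actors stopped, parent postStop]
        } finally {
            executor.shutdown();
        }
    }
}
```

One consequence of this choice: if stopping the actors fails, `thenCompose` skips the parent's `postStop` entirely; inserting a `handle(...)` stage before `thenCompose` would run it regardless. Which behavior is wanted is not stated in the review.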
[jira] [Assigned] (FLINK-9861) Add end-to-end test for reworked BucketingSink
[ https://issues.apache.org/jira/browse/FLINK-9861?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-9861: - Assignee: Chesnay Schepler (was: Kostas Kloudas) > Add end-to-end test for reworked BucketingSink > -- > > Key: FLINK-9861 > URL: https://issues.apache.org/jira/browse/FLINK-9861 > Project: Flink > Issue Type: Sub-task > Components: Streaming Connectors >Affects Versions: 1.6.0 >Reporter: Till Rohrmann >Assignee: Chesnay Schepler >Priority: Blocker > Labels: pull-request-available > Fix For: 1.6.0 > > > We should add an end-to-end test for the reworked BucketingSink to verify that > the sink works with different {{FileSystems}}. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567082#comment-16567082 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207302992

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
## @@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
       // clear the state if we've been the leader before
       if (getFencingToken() != null) {
-        clearState();
+        clearStateInternal();
       }
       setFencingToken(newResourceManagerId);
       slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-      getRpcService().execute(
-        () ->
+      prepareLeadershipAsync()
+        .exceptionally(t -> {
+          onFatalError(t);
+          return null;
+        })
+        .thenRunAsync(() ->
           // confirming the leader session ID might be blocking,
-          leaderElectionService.confirmLeaderSessionID(newLeaderSessionID));
+          leaderElectionService.confirmLeaderSessionID(newLeaderSessionID), getRpcService().getExecutor());

Review comment:
Maybe we could refactor `grantLeadership` the following way:

```
/**
 * Callback method when current resourceManager is granted leadership.
 *
 * @param newLeaderSessionID unique leadershipID
 */
@Override
public void grantLeadership(final UUID newLeaderSessionID) {
  final CompletableFuture<Boolean> acceptLeadershipFuture = CompletableFuture.supplyAsync(
      () -> tryAcceptLeadership(newLeaderSessionID),
      getUnfencedMainThreadExecutor()).thenCompose(Function.identity());

  final CompletableFuture<Void> confirmationFuture = acceptLeadershipFuture.thenAcceptAsync(
    (Boolean acceptLeadership) -> {
      if (acceptLeadership) {
        // confirming the leader session ID might be blocking,
        leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
      }
    },
    getRpcService().getExecutor());

  confirmationFuture.whenComplete(
    (Void ignored, Throwable throwable) -> {
      if (throwable != null) {
        onFatalError(ExceptionUtils.stripCompletionException(throwable));
      }
    });
}

private CompletableFuture<Boolean> tryAcceptLeadership(UUID newLeaderSessionID) {
  if (leaderElectionService.hasLeadership(newLeaderSessionID)) {
    final ResourceManagerId newResourceManagerId = ResourceManagerId.fromUuid(newLeaderSessionID);

    log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId);

    // clear the state if we've been the leader before
    if (getFencingToken() != null) {
      clearStateInternal();
    }

    setFencingToken(newResourceManagerId);

    slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());

    return prepareLeadershipAsync().thenApply(ignored -> true);
  } else {
    return CompletableFuture.completedFuture(false);
  }
}
```
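The suggested refactoring relies on two `CompletableFuture` idioms: because `tryAcceptLeadership` itself returns a future, `supplyAsync(...)` yields a nested `CompletableFuture<CompletableFuture<Boolean>>` that `thenCompose(Function.identity())` flattens, and the possibly blocking confirmation is moved onto another executor with `thenAcceptAsync(..., executor)`. The toy below isolates that shape. `NestedFutureFlattening`, `tryAccept` and `grant` are hypothetical names, and two single-thread executors stand in for the main-thread and RPC executors.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;

// Toy version of the suggested grantLeadership flow; all names are hypothetical.
final class NestedFutureFlattening {

    // Stand-in for tryAcceptLeadership: the answer itself arrives asynchronously.
    static CompletableFuture<Boolean> tryAccept(boolean hasLeadership) {
        if (hasLeadership) {
            // like prepareLeadershipAsync().thenApply(ignored -> true)
            return CompletableFuture.supplyAsync(() -> true);
        }
        return CompletableFuture.completedFuture(false);
    }

    static boolean grant(boolean hasLeadership, ExecutorService mainThread, ExecutorService rpc) {
        AtomicBoolean confirmed = new AtomicBoolean(false);

        // Without thenCompose this would be CompletableFuture<CompletableFuture<Boolean>>.
        CompletableFuture<Boolean> accepted = CompletableFuture
            .supplyAsync(() -> tryAccept(hasLeadership), mainThread)
            .thenCompose(Function.identity());

        // The (possibly blocking) confirmation runs on a separate executor.
        CompletableFuture<Void> confirmation = accepted.thenAcceptAsync(acceptLeadership -> {
            if (acceptLeadership) {
                confirmed.set(true); // stands in for confirmLeaderSessionID(...)
            }
        }, rpc);

        confirmation.join(); // only for the demo; the real callback does not block
        return confirmed.get();
    }

    public static void main(String[] args) {
        ExecutorService mainThread = Executors.newSingleThreadExecutor();
        ExecutorService rpc = Executors.newSingleThreadExecutor();
        try {
            System.out.println(grant(true, mainThread, rpc));  // true
            System.out.println(grant(false, mainThread, rpc)); // false
        } finally {
            mainThread.shutdown();
            rpc.shutdown();
        }
    }
}
```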
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567085#comment-16567085 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207294205

## File path: flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java
## @@ -807,4 +807,21 @@ public void testDisconnected() throws Exception {
       resourceManager.taskRouter.expectMsgClass(Disconnected.class);
     }};
   }
+
+  @Test
+  public void testClearStateRevokeLeadership() throws Exception {
+    new Context() {{
+      MesosWorkerStore.Worker worker1 = MesosWorkerStore.Worker.newWorker(task1).launchWorker(slave1, slave1host);
+      when(rmServices.workerStore.getFrameworkID()).thenReturn(Option.apply(framework1));
+      when(rmServices.workerStore.recoverWorkers()).thenReturn(Collections.singletonList(worker1)).thenReturn(Collections.emptyList());
+
+      startResourceManager();
+      rmServices.rmLeaderElectionService.notLeader();
+      rmServices.grantLeadership();
+
+      //resourceManager.stateCleared.await(5, TimeUnit.SECONDS);

Review comment:
Can this line be removed?
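If the assertions that follow this line in `testClearStateRevokeLeadership` depend on clearing that completes asynchronously, commenting out the wait can make the test pass or fail depending on timing. Below is a sketch of a bounded wait, assuming a `CountDownLatch` test hook similar to the `stateCleared` field in the commented-out line. The class and method names are hypothetical, and the 100 ms sleep only simulates a slow shutdown.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in whose state clearing completes on another thread.
final class AwaitClearState {
    final List<String> workersInLaunch = new CopyOnWriteArrayList<>();
    // Test hook modeled on the commented-out stateCleared.await(...) line.
    final CountDownLatch stateCleared = new CountDownLatch(1);

    void clearStateAsync() {
        CompletableFuture.runAsync(() -> {
            sleepQuietly(100); // simulate a slow shutdown before clearing
            workersInLaunch.clear();
            stateCleared.countDown();
        });
    }

    // Bounded wait: returns false on timeout instead of hanging the test.
    boolean awaitCleared(long timeout, TimeUnit unit) {
        try {
            return stateCleared.await(timeout, unit) && workersInLaunch.isEmpty();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    private static void sleepQuietly(long millis) {
        try {
            TimeUnit.MILLISECONDS.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        AwaitClearState rm = new AwaitClearState();
        rm.workersInLaunch.add("worker1");
        rm.clearStateAsync();
        // Asserting right here would race with the clearing thread.
        System.out.println(rm.awaitCleared(5, TimeUnit.SECONDS)); // true
    }
}
```

If clearing is made synchronous instead, such a hook is unnecessary and the commented-out line can simply be dropped.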
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567081#comment-16567081 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207293251

## File path: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
## @@ -894,17 +900,21 @@ public void grantLeadership(final UUID newLeaderSessionID) {
       // clear the state if we've been the leader before
       if (getFencingToken() != null) {
-        clearState();
+        clearStateInternal();
       }
       setFencingToken(newResourceManagerId);
       slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceActionsImpl());
-      getRpcService().execute(
-        () ->
+      prepareLeadershipAsync()
+        .exceptionally(t -> {
+          onFatalError(t);
+          return null;
+        })

Review comment:
Let's move the exception handling to the very end. That way we can also catch a failure of `confirmLeaderSessionID`. In all cases, we should call `onFatalError`.
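The difference between the two placements can be reproduced with already-completed futures, so every stage runs synchronously and the outcome is deterministic. In this sketch all names are hypothetical: `confirm` stands in for `confirmLeaderSessionID`, and `strip` is similar in spirit to the `ExceptionUtils.stripCompletionException` call in the suggested refactoring. With the handler in the middle, confirmation still runs after a failed preparation, and a failed confirmation goes unobserved; a single `whenComplete` at the end sees both failures.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical comparison of where the fatal-error handler sits in the chain.
final class ErrorHandlingPlacement {

    static final class Outcome {
        final AtomicBoolean confirmed = new AtomicBoolean(false);
        final AtomicReference<Throwable> fatalError = new AtomicReference<>();
    }

    // Mirrors the diff: the handler sits between preparation and confirmation.
    static Outcome handleInMiddle(CompletableFuture<Void> prepared, boolean confirmFails) {
        Outcome outcome = new Outcome();
        prepared
            .exceptionally(t -> { outcome.fatalError.set(t); return null; })
            .thenRun(() -> confirm(outcome, confirmFails)); // a failure here is never observed
        return outcome;
    }

    // Mirrors the review suggestion: one handler at the very end of the chain.
    static Outcome handleAtEnd(CompletableFuture<Void> prepared, boolean confirmFails) {
        Outcome outcome = new Outcome();
        prepared
            .thenRun(() -> confirm(outcome, confirmFails)) // skipped if preparation failed
            .whenComplete((ignored, t) -> {
                if (t != null) {
                    outcome.fatalError.set(strip(t));
                }
            });
        return outcome;
    }

    // Stand-in for leaderElectionService.confirmLeaderSessionID(...).
    private static void confirm(Outcome outcome, boolean fails) {
        if (fails) {
            throw new IllegalStateException("confirmation failed");
        }
        outcome.confirmed.set(true);
    }

    // Dependent stages wrap failures in CompletionException; unwrap for reporting.
    private static Throwable strip(Throwable t) {
        return (t instanceof CompletionException && t.getCause() != null) ? t.getCause() : t;
    }

    static CompletableFuture<Void> failed(Throwable t) {
        CompletableFuture<Void> future = new CompletableFuture<>();
        future.completeExceptionally(t);
        return future;
    }

    public static void main(String[] args) {
        Outcome middle = handleInMiddle(CompletableFuture.completedFuture(null), true);
        Outcome end = handleAtEnd(CompletableFuture.completedFuture(null), true);
        System.out.println("middle saw: " + middle.fatalError.get()); // middle saw: null
        System.out.println("end saw: " + end.fatalError.get());
    }
}
```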
[jira] [Commented] (FLINK-9936) Mesos resource manager unable to connect to master after failover
[ https://issues.apache.org/jira/browse/FLINK-9936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567083#comment-16567083 ]

ASF GitHub Bot commented on FLINK-9936:
---

tillrohrmann commented on a change in pull request #6464: [FLINK-9936][mesos] WIP
URL: https://github.com/apache/flink/pull/6464#discussion_r207295725

## File path: flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManager.java
## @@ -278,22 +267,76 @@ protected void initialize() throws ResourceManagerException {
     catch (IOException e) {
       throw new ResourceManagerException("Unable to configure the artifact server with TaskManager artifacts.", e);
     }
+  }
-    // begin scheduling
-    connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
-    schedulerDriver.start();
+  @Override
+  protected CompletableFuture prepareLeadershipAsync() {
+    Preconditions.checkState(initializedMesosConfig != null);
+
+    return clearStateFuture
+      .thenRunAsync(() -> {
+        schedulerDriver = initializedMesosConfig.createDriver(
+          new MesosResourceManagerSchedulerCallback(),
+          false);
+
+        // create supporting actors
+        connectionMonitor = createConnectionMonitor();
+        launchCoordinator = createLaunchCoordinator(schedulerDriver, selfActor);
+        reconciliationCoordinator = createReconciliationCoordinator(schedulerDriver);
+        taskMonitor = createTaskMonitor(schedulerDriver);
+      }, getMainThreadExecutor())
+      .thenCombineAsync(getWorkersAsync(), (ignored, tasksFromPreviousAttempts) -> {
+        // recover state
+        recoverWorkers(tasksFromPreviousAttempts);
+
+        // begin scheduling
+        connectionMonitor.tell(new ConnectionMonitor.Start(), selfActor);
+        schedulerDriver.start();
+
+        LOG.info("Mesos resource manager started.");
+        return null;
+      }, getMainThreadExecutor());
+  }
-    LOG.info("Mesos resource manager initialized.");
+  @Override
+  protected void clearState() {
+    schedulerDriver.stop(true);
+
+    clearStateFuture = stopSupportingActorsAsync().thenRunAsync(() -> {
+      workersInNew.clear();
+      workersInLaunch.clear();
+      workersBeingReturned.clear();

Review comment:
Can't we clear these fields right away when `clearState` is called?
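Clearing the bookkeeping eagerly, as the question suggests, could look like the sketch below: the maps are emptied synchronously inside `clearState`, and only the actor shutdown is tracked by `clearStateFuture`. It assumes `clearState` runs on the main thread like the other callbacks. `EagerClearState`, the `String` map types and the injected `Supplier` are hypothetical simplifications, and `schedulerDriver.stop(true)` is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical simplification of the clearState() change; not Flink's actual code.
final class EagerClearState {
    final Map<String, String> workersInNew = new HashMap<>();
    final Map<String, String> workersInLaunch = new HashMap<>();
    final Map<String, String> workersBeingReturned = new HashMap<>();
    CompletableFuture<Void> clearStateFuture = CompletableFuture.completedFuture(null);

    private final Supplier<CompletableFuture<Void>> stopSupportingActorsAsync;

    EagerClearState(Supplier<CompletableFuture<Void>> stopSupportingActorsAsync) {
        this.stopSupportingActorsAsync = stopSupportingActorsAsync;
    }

    // Assumed to be called from the main thread.
    void clearState() {
        // Local bookkeeping: safe to drop immediately.
        workersInNew.clear();
        workersInLaunch.clear();
        workersBeingReturned.clear();

        // Only the actor shutdown is asynchronous; prepareLeadershipAsync would
        // chain on this future before creating new actors.
        clearStateFuture = stopSupportingActorsAsync.get();
    }

    public static void main(String[] args) {
        CompletableFuture<Void> actorsStopped = new CompletableFuture<>();
        EagerClearState rm = new EagerClearState(() -> actorsStopped);
        rm.workersInLaunch.put("task-1", "worker-1");

        rm.clearState();
        System.out.println(rm.workersInLaunch.isEmpty() + " " + rm.clearStateFuture.isDone()); // true false

        actorsStopped.complete(null);
        System.out.println(rm.clearStateFuture.isDone()); // true
    }
}
```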
[jira] [Closed] (FLINK-10029) Refactor the code for better separation of concerns.
[ https://issues.apache.org/jira/browse/FLINK-10029?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-10029. -- Resolution: Fixed Merged on master with 1b0baa162bd87efd69040eb787de8d6624f14c85 and on release-1.6 with d43e0f8f4d873b3e8392209853b56dd7a2c0db67 > Refactor the code for better separation of concerns. > > > Key: FLINK-10029 > URL: https://issues.apache.org/jira/browse/FLINK-10029 > Project: Flink > Issue Type: Sub-task > Components: filesystem-connector >Affects Versions: 1.6.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas >Priority: Major > Fix For: 1.6.0 > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Closed] (FLINK-10027) Add logging to the StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kostas Kloudas closed FLINK-10027.
Resolution: Fixed

Merged on master with 852502b7d51f91c6f9c3479424516d0b9ae255e5 and on release-1.6 with a72d2c6d8bb07761434dadb20569e9df850cffaa

> Add logging to the StreamingFileSink
>
> Key: FLINK-10027
> URL: https://issues.apache.org/jira/browse/FLINK-10027
> Project: Flink
> Issue Type: Sub-task
> Components: filesystem-connector
> Affects Versions: 1.6.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0
[jira] [Commented] (FLINK-10027) Add logging to the StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567073#comment-16567073 ]

ASF GitHub Bot commented on FLINK-10027:

asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b..6187e6853dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,11 +18,14 @@

 package org.apache.flink.streaming.api.functions.sink.filesystem;

-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,13 +37,14 @@
 /**
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
  *
- * For each incoming element in the {@code BucketingSink}, the user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * For each incoming element in the {@code StreamingFileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
  */
-@PublicEvolving
+@Internal
 public class Bucket {

+	private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
 	private static final String PART_PREFIX = "part";

 	private final BucketID bucketId;
@@ -53,57 +57,27 @@

 	private final RecoverableWriter fsWriter;

-	private final Map> pendingPerCheckpoint = new HashMap<>();
-
-	private long partCounter;
-
-	private PartFileWriter currentPart;
+	private final RollingPolicy rollingPolicy;

-	private List pending;
-
-	/**
-* Constructor to restore a bucket from checkpointed state.
-*/
-	public Bucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			long initialPartCounter,
-			PartFileWriter.PartFileFactory partFileFactory,
-			BucketState bucketState) throws IOException {
+	private final Map> pendingPartsPerCheckpoint = new HashMap<>();

-		this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
-		// the constructor must have already initialized the filesystem writer
-		Preconditions.checkState(fsWriter != null);
-
-		// we try to resume the previous in-progress file, if the filesystem
-		// supports such operation. If not, we just commit the file and start fresh.
+	private long partCounter;

-		final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
-		if (resumable != null) {
-			currentPart = partFileFactory.resumeFrom(
-					bucketId, fsWriter, resumable, bucketState.getCreationTime());
-		}
+	private PartFileWriter inProgressPart;

-		// we commit pending files for previous checkpoints to the last successful one
-		// (from which we are recovering from)
-		for (List commitables: bucketState.getPendingPerCheckpoint().values()) {
-			for (RecoverableWriter.CommitRecoverable commitable: commitables) {
-				fsWriter.recoverForCommit(commitable).commitAfterRecovery();
-			}
-		}
-	}
+	private List pendingPartsForCurrentCheckpoint;

 	/**
 	 * Constructor to create a new empty bucket.
 	 */
-	public Bucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			BucketID bucketId,
-			Path bucketPath,
-			long initialPartCounter,
-			PartFileWriter.PartFileFactory partFileFactory) {
+	private Bucket(
+
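The renamed fields (`inProgressPart`, `pendingPartsForCurrentCheckpoint`, `pendingPartsPerCheckpoint`) trace a part file's path from being written, to pending, to committed. The following is a simplified in-memory model of that bookkeeping, not the `Bucket` implementation. It assumes part files are plain names, that "committing" just records the name, and that a completed checkpoint commits every part staged under that checkpoint ID or an earlier one. The class, its methods and the part naming are hypothetical.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified model of a bucket's part-file bookkeeping.
// Part files are plain names; nothing is written to or committed on a real filesystem.
public class BucketLifecycleSketch {

    private String inProgressPart;
    private List<String> pendingPartsForCurrentCheckpoint = new ArrayList<>();
    private final Map<Long, List<String>> pendingPartsPerCheckpoint = new TreeMap<>();
    private final List<String> committedParts = new ArrayList<>();
    private long partCounter;

    // Opens a new in-progress part if none is open (element contents are not modeled).
    void write(String element) {
        if (inProgressPart == null) {
            // Assumed naming: "part-<subtask>-<counter>", with subtask 0.
            inProgressPart = "part-0-" + partCounter++;
        }
    }

    // Rolling closes the in-progress part, which then waits as a pending part.
    void rollPart() {
        if (inProgressPart != null) {
            pendingPartsForCurrentCheckpoint.add(inProgressPart);
            inProgressPart = null;
        }
    }

    // A snapshot stages the current pending parts under the checkpoint ID.
    void snapshot(long checkpointId) {
        if (!pendingPartsForCurrentCheckpoint.isEmpty()) {
            pendingPartsPerCheckpoint.put(checkpointId, pendingPartsForCurrentCheckpoint);
            pendingPartsForCurrentCheckpoint = new ArrayList<>();
        }
    }

    // A completed checkpoint commits everything staged at or before its ID.
    void onCheckpointComplete(long checkpointId) {
        Iterator<Map.Entry<Long, List<String>>> it =
            pendingPartsPerCheckpoint.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, List<String>> entry = it.next();
            if (entry.getKey() <= checkpointId) {
                committedParts.addAll(entry.getValue());
                it.remove();
            }
        }
    }

    List<String> committedParts() {
        return committedParts;
    }

    public static void main(String[] args) {
        BucketLifecycleSketch bucket = new BucketLifecycleSketch();
        bucket.write("a");
        bucket.rollPart();      // part-0-0 becomes pending
        bucket.snapshot(1L);    // part-0-0 staged under checkpoint 1
        bucket.write("b");
        bucket.rollPart();      // part-0-1 becomes pending
        bucket.snapshot(2L);    // part-0-1 staged under checkpoint 2

        bucket.onCheckpointComplete(1L);
        System.out.println(bucket.committedParts()); // [part-0-0]
        bucket.onCheckpointComplete(2L);
        System.out.println(bucket.committedParts()); // [part-0-0, part-0-1]
    }
}
```

The `TreeMap` keeps checkpoint IDs ordered so iteration is deterministic; the real sink may use a different map and additionally handles recovery, rolling policies and filesystem commits.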
[GitHub] asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
asfgit closed pull request #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
index a350096e38b..6187e6853dd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/filesystem/Bucket.java
@@ -18,11 +18,14 @@

 package org.apache.flink.streaming.api.functions.sink.filesystem;

-import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.annotation.Internal;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.util.Preconditions;

+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -34,13 +37,14 @@
 /**
  * A bucket is the directory organization of the output of the {@link StreamingFileSink}.
  *
- * For each incoming element in the {@code BucketingSink}, the user-specified
- * {@link Bucketer Bucketer} is
- * queried to see in which bucket this element should be written to.
+ * For each incoming element in the {@code StreamingFileSink}, the user-specified
+ * {@link BucketAssigner} is queried to see in which bucket this element should be written to.
  */
-@PublicEvolving
+@Internal
 public class Bucket {

+	private static final Logger LOG = LoggerFactory.getLogger(Bucket.class);
+
 	private static final String PART_PREFIX = "part";

 	private final BucketID bucketId;
@@ -53,57 +57,27 @@

 	private final RecoverableWriter fsWriter;

-	private final Map> pendingPerCheckpoint = new HashMap<>();
-
-	private long partCounter;
-
-	private PartFileWriter currentPart;
+	private final RollingPolicy rollingPolicy;

-	private List pending;
-
-	/**
-* Constructor to restore a bucket from checkpointed state.
-*/
-	public Bucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			long initialPartCounter,
-			PartFileWriter.PartFileFactory partFileFactory,
-			BucketState bucketState) throws IOException {
+	private final Map> pendingPartsPerCheckpoint = new HashMap<>();

-		this(fsWriter, subtaskIndex, bucketState.getBucketId(), bucketState.getBucketPath(), initialPartCounter, partFileFactory);
-
-		// the constructor must have already initialized the filesystem writer
-		Preconditions.checkState(fsWriter != null);
-
-		// we try to resume the previous in-progress file, if the filesystem
-		// supports such operation. If not, we just commit the file and start fresh.
+	private long partCounter;

-		final RecoverableWriter.ResumeRecoverable resumable = bucketState.getInProgress();
-		if (resumable != null) {
-			currentPart = partFileFactory.resumeFrom(
-					bucketId, fsWriter, resumable, bucketState.getCreationTime());
-		}
+	private PartFileWriter inProgressPart;

-		// we commit pending files for previous checkpoints to the last successful one
-		// (from which we are recovering from)
-		for (List commitables: bucketState.getPendingPerCheckpoint().values()) {
-			for (RecoverableWriter.CommitRecoverable commitable: commitables) {
-				fsWriter.recoverForCommit(commitable).commitAfterRecovery();
-			}
-		}
-	}
+	private List pendingPartsForCurrentCheckpoint;

 	/**
 	 * Constructor to create a new empty bucket.
 	 */
-	public Bucket(
-			RecoverableWriter fsWriter,
-			int subtaskIndex,
-			BucketID bucketId,
-			Path bucketPath,
-			long initialPartCounter,
-			PartFileWriter.PartFileFactory partFileFactory) {
+	private Bucket(
+			final RecoverableWriter fsWriter,
+			final int subtaskIndex,
+			final BucketID bucketId,
+			final Path bucketPath,
+			final long initialPartCounter,
+
[jira] [Commented] (FLINK-10010) Deprecate unused BaseAlignedWindowAssigner related components
[ https://issues.apache.org/jira/browse/FLINK-10010?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567042#comment-16567042 ]

ASF GitHub Bot commented on FLINK-10010:

walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components
URL: https://github.com/apache/flink/pull/6471#issuecomment-409989144

thx for the review @yanghua . for some reason java checkstyle did not catch these problems. just fixed it.

> Deprecate unused BaseAlignedWindowAssigner related components
>
> Key: FLINK-10010
> URL: https://issues.apache.org/jira/browse/FLINK-10010
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Rong Rong
> Assignee: Rong Rong
> Priority: Major
> Labels: pull-request-available
>
> {{BaseAlignedWindowAssigner}} should be marked as deprecated and
> {{SlidingAlignedProcessingTimeWindows}} should be removed from the Flink Repo.
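The ticket asks for `BaseAlignedWindowAssigner` to be marked as deprecated. In Java this is conventionally done with both the `@Deprecated` annotation, which makes `javac` warn at use sites, and a `@deprecated` Javadoc tag that tells users what to do instead. A minimal sketch on hypothetical classes; the class names and the suggested replacement are illustrative and are not taken from the pull request.

```java
/**
 * Hypothetical legacy assigner, standing in for a component that is no longer used.
 *
 * @deprecated No longer used and scheduled for removal. Use {@link ReplacementAssigner}
 *             (a hypothetical replacement) instead.
 */
@Deprecated
abstract class LegacyAlignedAssigner {
    abstract long windowSizeMillis();
}

/** Hypothetical replacement that the deprecation note points callers to. */
abstract class ReplacementAssigner {
    abstract long windowSizeMillis();
}

public class DeprecationSketch {
    public static void main(String[] args) {
        // @Deprecated is retained at runtime, so tools and tests can check it reflectively.
        System.out.println(LegacyAlignedAssigner.class.isAnnotationPresent(Deprecated.class));
        System.out.println(ReplacementAssigner.class.isAnnotationPresent(Deprecated.class));
    }
}
```

Pairing the annotation with the Javadoc tag matters: the annotation drives compiler warnings, while only the Javadoc text can explain the migration path.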
[GitHub] walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components
walterddr commented on issue #6471: [FLINK-10010][DataStream API] Deprecate unused BaseAlignedWindowAssigner related components
URL: https://github.com/apache/flink/pull/6471#issuecomment-409989144

thx for the review @yanghua . for some reason java checkstyle did not catch these problems. just fixed it.
[jira] [Commented] (FLINK-10027) Add logging to the StreamingFileSink
[ https://issues.apache.org/jira/browse/FLINK-10027?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16567040#comment-16567040 ]

ASF GitHub Bot commented on FLINK-10027:

kl0u commented on issue #6477: [FLINK-10027] Add logging to StreamingFileSink
URL: https://github.com/apache/flink/pull/6477#issuecomment-409988617

Thanks for the review @zentol . I will merge as soon as travis gives green.

> Add logging to the StreamingFileSink
>
> Key: FLINK-10027
> URL: https://issues.apache.org/jira/browse/FLINK-10027
> Project: Flink
> Issue Type: Sub-task
> Components: filesystem-connector
> Affects Versions: 1.6.0
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.6.0