[jira] [Commented] (FLINK-8102) Formatting issues in Mesos documentation.
[ https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257954#comment-16257954 ]

Joerg Schad commented on FLINK-8102:
------------------------------------

https://github.com/apache/flink/pull/5033

> Formatting issues in Mesos documentation.
> -----------------------------------------
>
> Key: FLINK-8102
> URL: https://issues.apache.org/jira/browse/FLINK-8102
> Project: Flink
> Issue Type: Bug
> Reporter: Joerg Schad
> Assignee: Joerg Schad
> Priority: Minor
>
> The Flink documentation renders incorrectly as some characters are not
> properly escaped.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-8102) Formatting issues in Mesos documentation.
[ https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257953#comment-16257953 ]

ASF GitHub Bot commented on FLINK-8102:
---------------------------------------

GitHub user joerg84 opened a pull request:

    https://github.com/apache/flink/pull/5033

    [FLINK-8102][docs] Fixed formatting issues in Mesos documentation.

    This change fixes formatting issues occurring in the markdown rendering. It was tested by building the documentation locally.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/joerg84/flink docu

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5033.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5033

----
commit bb2826b085da4a4119c13a1e6babdd78fc8de6aa
Author: Joerg Schad
Date:   2017-11-18T06:23:02Z

    [FLINK-8102][docs] Fixed formatting issues in Mesos documentation.

> Formatting issues in Mesos documentation.
> -----------------------------------------
>
> Key: FLINK-8102
> URL: https://issues.apache.org/jira/browse/FLINK-8102
> Project: Flink
> Issue Type: Bug
> Reporter: Joerg Schad
> Assignee: Joerg Schad
> Priority: Minor
>
> The Flink documentation renders incorrectly as some characters are not
> properly escaped.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8102) Formatting issues in Mesos documentation.
Joerg Schad created FLINK-8102:
----------------------------------

Summary: Formatting issues in Mesos documentation.
Key: FLINK-8102
URL: https://issues.apache.org/jira/browse/FLINK-8102
Project: Flink
Issue Type: Bug
Reporter: Joerg Schad
Assignee: Joerg Schad
Priority: Minor

The Flink documentation renders incorrectly as some characters are not properly escaped.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8101) Elasticsearch 6.x support
Hai Zhou UTC+8 created FLINK-8101:
-------------------------------------

Summary: Elasticsearch 6.x support
Key: FLINK-8101
URL: https://issues.apache.org/jira/browse/FLINK-8101
Project: Flink
Issue Type: New Feature
Components: ElasticSearch Connector
Affects Versions: 1.4.0
Reporter: Hai Zhou UTC+8
Fix For: 1.5.0

Recently, elasticsearch 6.0.0 was released:
https://www.elastic.co/blog/elasticsearch-6-0-0-released

The minimum Elasticsearch Java Client version that is compatible with ES6 is 5.6.0.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Updated] (FLINK-6222) YARN: setting environment variables in an easier fashion
[ https://issues.apache.org/jira/browse/FLINK-6222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Craig Foster updated FLINK-6222:
--------------------------------

Attachment: patch0-add-yarn-hadoop-conf.diff

> YARN: setting environment variables in an easier fashion
> --------------------------------------------------------
>
> Key: FLINK-6222
> URL: https://issues.apache.org/jira/browse/FLINK-6222
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.2.0
> Environment: YARN, EMR
> Reporter: Craig Foster
> Attachments: patch0-add-yarn-hadoop-conf.diff
>
> Right now we require end-users to set YARN_CONF_DIR or HADOOP_CONF_DIR and
> sometimes FLINK_CONF_DIR.
> For example, in [1], it is stated:
> “Please note that the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR
> environment variable to be set to read the YARN and HDFS configuration.”
> In BigTop, we set this with /etc/flink/default and then a wrapper is created
> to source that. However, this is slightly cumbersome and we don't have a
> central place within the Flink project itself to source environment
> variables. config.sh could do this but it doesn't have information about
> FLINK_CONF_DIR. For YARN and Hadoop variables, I already have a solution that
> would add "env.yarn.confdir" and "env.hadoop.confdir" variables to the
> flink-conf.yaml file and then we just symlink /etc/lib/flink/conf/ and
> /etc/flink/conf.
> But we could also add a flink-env.sh file to set these variables and decouple
> them from config.sh entirely.
> I'd like to know the opinion/preference of others and what would be more
> amenable.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-6222) YARN: setting environment variables in an easier fashion
[ https://issues.apache.org/jira/browse/FLINK-6222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257610#comment-16257610 ]

Craig Foster commented on FLINK-6222:
-------------------------------------

I'm submitting a patch here to see if we can move this along.

> YARN: setting environment variables in an easier fashion
> --------------------------------------------------------
>
> Key: FLINK-6222
> URL: https://issues.apache.org/jira/browse/FLINK-6222
> Project: Flink
> Issue Type: Improvement
> Components: Startup Shell Scripts
> Affects Versions: 1.2.0
> Environment: YARN, EMR
> Reporter: Craig Foster
>
> Right now we require end-users to set YARN_CONF_DIR or HADOOP_CONF_DIR and
> sometimes FLINK_CONF_DIR.
> For example, in [1], it is stated:
> “Please note that the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR
> environment variable to be set to read the YARN and HDFS configuration.”
> In BigTop, we set this with /etc/flink/default and then a wrapper is created
> to source that. However, this is slightly cumbersome and we don't have a
> central place within the Flink project itself to source environment
> variables. config.sh could do this but it doesn't have information about
> FLINK_CONF_DIR. For YARN and Hadoop variables, I already have a solution that
> would add "env.yarn.confdir" and "env.hadoop.confdir" variables to the
> flink-conf.yaml file and then we just symlink /etc/lib/flink/conf/ and
> /etc/flink/conf.
> But we could also add a flink-env.sh file to set these variables and decouple
> them from config.sh entirely.
> I'd like to know the opinion/preference of others and what would be more
> amenable.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
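The flink-env.sh idea discussed in the issue can be sketched as a small sourced file. Note that this file is only a proposal, so the file name and the default paths below are illustrative, not anything Flink actually ships:

```sh
# Hypothetical flink-env.sh (proposed in FLINK-6222, not an existing Flink file).
# Sourced by the startup scripts so that YARN/Hadoop/Flink configuration
# locations live in one central place. All paths are illustrative defaults
# that a distribution such as BigTop or EMR could override.
export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-/etc/hadoop/conf}"
export YARN_CONF_DIR="${YARN_CONF_DIR:-$HADOOP_CONF_DIR}"
export FLINK_CONF_DIR="${FLINK_CONF_DIR:-/etc/flink/conf}"
```

This keeps the variables decoupled from config.sh: packagers edit one file, and the existing `${VAR:-default}` pattern still respects anything the user exported beforehand.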
[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer
[ https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257372#comment-16257372 ]

ASF GitHub Bot commented on FLINK-7835:
---------------------------------------

Github user shashank734 commented on the issue:

    https://github.com/apache/flink/pull/4821

    Yeah, but I think 1.4 will still take a couple more weeks. Since I'm unable to restore the state, this change is necessary for me.

> Fix duplicate() method in NFASerializer
> ---------------------------------------
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
> Issue Type: Sub-task
> Components: CEP
> Affects Versions: 1.3.0, 1.3.1, 1.3.2
> Reporter: Kostas Kloudas
> Assignee: Kostas Kloudas
> Fix For: 1.4.0
>

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8100) Consider introducing log4j-extras
Ted Yu created FLINK-8100:
-----------------------------

Summary: Consider introducing log4j-extras
Key: FLINK-8100
URL: https://issues.apache.org/jira/browse/FLINK-8100
Project: Flink
Issue Type: Improvement
Reporter: Ted Yu

log4j-extras allows log rotation as well as compression.

https://logging.apache.org/log4j/extras/download.html

We should consider using log4j-extras.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
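As a concrete illustration of what log4j-extras enables, here is a hedged log4j.properties sketch: the `org.apache.log4j.rolling` classes come from the extras companion jar, and a `FileNamePattern` ending in `.gz` makes log4j compress rotated files. The appender name and log path are illustrative, not Flink's actual logging configuration.

```properties
# Sketch only: requires apache-log4j-extras on the classpath.
log4j.rootLogger=INFO, rolling

# RollingFileAppender from log4j-extras (NOT the one in log4j core).
log4j.appender.rolling=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.rolling.rollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
# Daily rotation; the .gz suffix enables compression of rotated files.
log4j.appender.rolling.rollingPolicy.FileNamePattern=/var/log/flink/flink.%d{yyyy-MM-dd}.log.gz
log4j.appender.rolling.layout=org.apache.log4j.PatternLayout
log4j.appender.rolling.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %c - %m%n
```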
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257329#comment-16257329 ]

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

Github user bowenli86 commented on the issue:

    https://github.com/apache/flink/pull/5032

    Can you please add a unit test for this?

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>     at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>     at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>     at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>     at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException:
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to
> org.apache.flink.api.common.state.ListState
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>     ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The
> error message should be something along the lines of "Duplicate state name".

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
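The proposed fix boils down to checking the requested state type before casting. A minimal, self-contained sketch of that pattern (plain Java, not Flink's actual `DefaultKeyedStateStore` code; the class and method names below are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

// Minimal sketch of the proposed check. Instead of blindly casting -- which
// yields the cryptic ClassCastException shown above -- the registry remembers
// which type each state name was first registered with and fails fast with a
// descriptive message.
class StateRegistry {
    private final Map<String, Class<?>> statesByName = new HashMap<>();

    /** Registers a state name, rejecting reuse under a different state type. */
    void register(String name, Class<?> stateType) {
        Class<?> existing = statesByName.putIfAbsent(name, stateType);
        if (existing != null && !existing.equals(stateType)) {
            throw new IllegalStateException(
                    "Duplicate state name '" + name + "': already registered as "
                    + existing.getSimpleName() + ", requested as "
                    + stateType.getSimpleName());
        }
    }
}
```

With this pattern, registering a Map-typed and then a List-typed state under the same name `"timon-one"` fails immediately with a "Duplicate state name" message, instead of a `ClassCastException` deep inside `open()`.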
[jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner
[ https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257198#comment-16257198 ]

ASF GitHub Bot commented on FLINK-6936:
---------------------------------------

Github user xccui closed the pull request at:

    https://github.com/apache/flink/pull/4297

> Add multiple targets support for custom partitioner
> ---------------------------------------------------
>
> Key: FLINK-6936
> URL: https://issues.apache.org/jira/browse/FLINK-6936
> Project: Flink
> Issue Type: Improvement
> Components: DataStream API
> Reporter: Xingcan Cui
> Assignee: Xingcan Cui
> Priority: Minor
>
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
>
>     /**
>      * Computes the partition for the given key.
>      *
>      * @param key The key.
>      * @param numPartitions The number of partitions to partition into.
>      * @return The partition index.
>      */
>     int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should return multiple partitions; allowing only one may be a
> historical legacy.
> There could be at least three approaches to solve this.
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T> partitioner)` method in DataStream public, which would allow users to directly define a StreamPartitioner.
> # Change the `partition` method in the Partitioner interface to return an int array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner interface which returns an int array.
> Considering the consistency of the API, the 3rd approach seems to be an
> acceptable choice. [~aljoscha], what do you think?

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
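The third approach discussed in FLINK-6936 can be sketched as follows. `MultiPartitioner` and `BroadcastPartitioner` are hypothetical names from the proposal, not existing Flink API:

```java
import java.io.Serializable;

// Sketch of a partitioner that can return several target channels instead of
// exactly one, as the issue proposes for a `multicast` method on DataStream.
class MulticastDemo {

    /** Like Partitioner, but returns ALL target partitions for a key. */
    interface MultiPartitioner<K> extends Serializable {
        int[] partition(K key, int numPartitions);
    }

    /** Example implementation: send every record to every partition. */
    static class BroadcastPartitioner implements MultiPartitioner<String> {
        @Override
        public int[] partition(String key, int numPartitions) {
            int[] targets = new int[numPartitions];
            for (int i = 0; i < numPartitions; i++) {
                targets[i] = i; // one entry per downstream channel
            }
            return targets;
        }
    }
}
```

Returning `int[]` keeps the single-target case expressible (an array of length one) while making multicast and broadcast patterns possible, which is why the issue favors a new interface over changing the existing `Partitioner` contract.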
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257193#comment-16257193 ]

ASF GitHub Bot commented on FLINK-8090:
---------------------------------------

GitHub user xccui opened a pull request:

    https://github.com/apache/flink/pull/5032

    [FLINK-8090] [DataStream] Improve the error message for duplicate state name

    ## What is the purpose of the change

    This PR improves the error message shown when users try to access two states of different types, but with an identical name. However, it cannot detect two states of the same type and name that are registered via two descriptors.

    ## Brief change log

    Refactor `DefaultKeyedStateStore.java` to raise an error message when the required state type does not match the real state type.

    ## Verifying this change

    The change can be verified by existing tests.

    ## Does this pull request potentially affect one of the following parts:

    - Dependencies (does it add or upgrade a dependency): (no)
    - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
    - The serializers: (no)
    - The runtime per-record code paths (performance sensitive): (no)
    - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
    - The S3 file system connector: (no)

    ## Documentation

    - Does this pull request introduce a new feature? (no)
    - If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/xccui/flink FLINK-8090

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5032

----
commit 78acc38ff0a1527eeb8204a74b65765bc673a6b3
Author: Xingcan Cui
Date:   2017-11-17T12:13:49Z

    [FLINK-8090] [DataStream] Improve error message when registering different states under the same name

> Improve error message when registering different states under the same name.
> ----------------------------------------------------------------------------
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Affects Versions: 1.4.0
> Reporter: Kostas Kloudas
> Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>     at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>     at
[jira] [Commented] (FLINK-8083) FileSystem class not binary compatible with 1.3
[ https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257180#comment-16257180 ]

Aljoscha Krettek commented on FLINK-8083:
-----------------------------------------

[~Zentol] I think you can now merge that change to the japicmp settings.

> FileSystem class not binary compatible with 1.3
> -----------------------------------------------
>
> Key: FLINK-8083
> URL: https://issues.apache.org/jira/browse/FLINK-8083
> Project: Flink
> Issue Type: Bug
> Components: Core, FileSystem
> Affects Versions: 1.4.0
> Reporter: Chesnay Schepler
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
> The {{FileSystem}} class is not binary compatible with 1.3 due to a removed
> method.
> {code}
> REMOVED (!) public abstract org.apache.flink.core.fs.FileSystemKind getKind()
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Closed] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-7266.
-----------------------------------

Resolution: Fixed

Fixed on release-1.4 in 666b1b2e62463ae4985d237535d56c9e0ab9dba9

> Don't attempt to delete parent directory on S3
> ----------------------------------------------
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.1
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
> Currently, every attempted release of an S3 state object also checks if the
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating
> against real file systems, not when operating against object stores.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Closed] (FLINK-8083) FileSystem class not binary compatible with 1.3
[ https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-8083.
-----------------------------------

Resolution: Fixed

This should be fixed now that FLINK-7265 is on master and release-1.4.

> FileSystem class not binary compatible with 1.3
> -----------------------------------------------
>
> Key: FLINK-8083
> URL: https://issues.apache.org/jira/browse/FLINK-8083
> Project: Flink
> Issue Type: Bug
> Components: Core, FileSystem
> Affects Versions: 1.4.0
> Reporter: Chesnay Schepler
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
> The {{FileSystem}} class is not binary compatible with 1.3 due to a removed
> method.
> {code}
> REMOVED (!) public abstract org.apache.flink.core.fs.FileSystemKind getKind()
> {code}

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Closed] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek closed FLINK-7265.
-----------------------------------

Resolution: Fixed

Fixed on release-1.4 in a0dbe182fa677a87f601cbedc4115e63fff9fe4f

> FileSystems should describe their kind and consistency level
> ------------------------------------------------------------
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.3.1
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory
> on S3, because there are no such things as directories, only hierarchical
> naming in the keys (file names).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
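The gist of this fix pair (FLINK-7265 introduces a file-system kind, FLINK-7266 uses it to skip parent-directory cleanup on S3) can be sketched in a few lines. The enum constants below mirror Flink's `FileSystemKind`, but the cleanup predicate is an illustrative stand-in for the real checkpoint-cleanup code:

```java
// Sketch of how a file-system kind gates directory cleanup.
class CleanupDemo {

    enum FileSystemKind {
        FILE_SYSTEM,  // real file system with true directories
        OBJECT_STORE  // e.g. S3: "directories" are only key prefixes
    }

    /**
     * Empty-parent-directory cleanup only makes sense on real file systems.
     * On object stores the extra delete calls are pure overhead and can
     * trigger request throttling during checkpoint cleanup.
     */
    static boolean shouldCleanupParentDir(FileSystemKind kind) {
        return kind == FileSystemKind.FILE_SYSTEM;
    }
}
```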
[jira] [Commented] (FLINK-8099) Reduce default restart delay to 1 second
[ https://issues.apache.org/jira/browse/FLINK-8099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257176#comment-16257176 ]

ASF GitHub Bot commented on FLINK-8099:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/5031

    [FLINK-8099] Reduce default restart delay to 1 second

    R: @tillrohrmann

    We could also introduce an extra entry in `ConfigConstants` for the `"1 s"` default but I opted against that. What do you think?

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-8099-1s-delay-restart

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/5031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #5031

----
commit 6d1d9b1caabfcd731ec2b2ba952fc5a0b27e98e7
Author: Aljoscha Krettek
Date:   2017-11-17T16:19:51Z

    [FLINK-8099] Reduce default restart delay to 1 second

> Reduce default restart delay to 1 second
> ----------------------------------------
>
> Key: FLINK-8099
> URL: https://issues.apache.org/jira/browse/FLINK-8099
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
> Currently, when a job fails Flink will wait for 10 seconds until restarting
> the job. Even zero delay is a reasonable setting but will result in
> "flooding" the logs and quickly increasing the restart counter, because at
> zero delay you will always see failures when no standby resources are
> available.
> Reducing this to 1 second should make for a nicer out-of-box experience and
> not flood too much.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Updated] (FLINK-8099) Reduce default restart delay to 1 second
[ https://issues.apache.org/jira/browse/FLINK-8099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Aljoscha Krettek updated FLINK-8099:
------------------------------------

Issue Type: Improvement (was: Bug)

> Reduce default restart delay to 1 second
> ----------------------------------------
>
> Key: FLINK-8099
> URL: https://issues.apache.org/jira/browse/FLINK-8099
> Project: Flink
> Issue Type: Improvement
> Components: State Backends, Checkpointing
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0
>
> Currently, when a job fails Flink will wait for 10 seconds until restarting
> the job. Even zero delay is a reasonable setting but will result in
> "flooding" the logs and quickly increasing the restart counter, because at
> zero delay you will always see failures when no standby resources are
> available.
> Reducing this to 1 second should make for a nicer out-of-box experience and
> not flood too much.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Created] (FLINK-8099) Reduce default restart delay to 1 second
Aljoscha Krettek created FLINK-8099:
---------------------------------------

Summary: Reduce default restart delay to 1 second
Key: FLINK-8099
URL: https://issues.apache.org/jira/browse/FLINK-8099
Project: Flink
Issue Type: Bug
Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
Fix For: 1.4.0

Currently, when a job fails Flink will wait for 10 seconds until restarting the job. Even zero delay is a reasonable setting but will result in "flooding" the logs and quickly increasing the restart counter, because at zero delay you will always see failures when no standby resources are available.

Reducing this to 1 second should make for a nicer out-of-box experience and not flood too much.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
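For reference, the delay in question belongs to the fixed-delay restart strategy and can be set explicitly in flink-conf.yaml. The snippet below shows the proposed 1 s value; the attempt count is an arbitrary example, not part of the proposal:

```yaml
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 1 s
```

Users who prefer the old behavior can simply keep `restart-strategy.fixed-delay.delay: 10 s` in their configuration, since the change only affects the default.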
[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257147#comment-16257147 ]

ASF GitHub Bot commented on FLINK-7265:
---------------------------------------

Github user asfgit closed the pull request at:

    https://github.com/apache/flink/pull/4397

> FileSystems should describe their kind and consistency level
> ------------------------------------------------------------
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.3.1
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory
> on S3, because there are no such things as directories, only hierarchical
> naming in the keys (file names).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257146#comment-16257146 ]

Aljoscha Krettek commented on FLINK-7266:
-----------------------------------------

Fixed on master (to be 1.5) in b00f1b326c1ab4221a555200a4d5798e1565b821

> Don't attempt to delete parent directory on S3
> ----------------------------------------------
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
> Issue Type: Bug
> Components: Core
> Affects Versions: 1.3.1
> Reporter: Stephan Ewen
> Assignee: Aljoscha Krettek
> Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
> Currently, every attempted release of an S3 state object also checks if the
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating
> against real file systems, not when operating against object stores.

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16257144#comment-16257144 ]

ASF GitHub Bot commented on FLINK-7265:
---------------------------------------

Github user aljoscha commented on the issue:

    https://github.com/apache/flink/pull/4397

    @StephanEwen Could you please close this PR now that it's also merged for master/1.4?

> FileSystems should describe their kind and consistency level
> ------------------------------------------------------------
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
> Issue Type: Improvement
> Components: Core
> Affects Versions: 1.3.1
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory
> on S3, because there are no such things as directories, only hierarchical
> naming in the keys (file names).

--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257145#comment-16257145 ] Aljoscha Krettek commented on FLINK-7265: - Added on master (to be 1.5) in f29f80575dac1c7e59dd7074118953b8be26520f > FileSystems should describe their kind and consistency level > > > Key: FLINK-7265 > URL: https://issues.apache.org/jira/browse/FLINK-7265 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0, 1.3.3 > > > Currently, all {{FileSystem}} types look the same to Flink. > However, certain operations should only be executed on certain kinds of file > systems. > For example, it makes no sense to attempt to delete an empty parent directory > on S3, because there are no such things as directories, only hierarchical > naming in the keys (file names). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
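The idea behind FLINK-7265 — each file system advertising what it is so callers can adapt — can be sketched as follows. The names are modeled loosely on Flink's API but are not guaranteed to match it:

```java
// Sketch of the FLINK-7265 idea (illustrative names): every FileSystem
// advertises its kind so that callers can skip operations that are
// meaningless on key-based object stores, such as deleting an "empty
// parent directory" on S3.
enum FsKind { FILE_SYSTEM, OBJECT_STORE }

abstract class DescribedFileSystem {
    abstract FsKind getKind();
}

class LocalFsSketch extends DescribedFileSystem {
    @Override FsKind getKind() { return FsKind.FILE_SYSTEM; }
}

class S3FsSketch extends DescribedFileSystem {
    // S3 only has hierarchical key names, not real directories.
    @Override FsKind getKind() { return FsKind.OBJECT_STORE; }
}
```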
[jira] [Assigned] (FLINK-7841) Add docs for Flink's S3 support
[ https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Nico Kruber reassigned FLINK-7841: -- Assignee: Nico Kruber (was: Stephan Ewen) > Add docs for Flink's S3 support > --- > > Key: FLINK-7841 > URL: https://issues.apache.org/jira/browse/FLINK-7841 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Stephan Ewen >Assignee: Nico Kruber > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3
[ https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257102#comment-16257102 ] ASF GitHub Bot commented on FLINK-7988: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4950 Changes look good to me. Thanks for your contribution @NicoK. Merging this PR. > HadoopS3FileSystemITCase leaves test directories behind in S3 > - > > Key: FLINK-7988 > URL: https://issues.apache.org/jira/browse/FLINK-7988 > Project: Flink > Issue Type: Bug > Components: filesystem-connector, Tests >Affects Versions: 1.4.0 >Reporter: Nico Kruber >Assignee: Nico Kruber > > {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 > which is not cleaned up. Please note, that the individual tests create > sub-directories and also always delete those at least. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257100#comment-16257100 ] ASF GitHub Bot commented on FLINK-4228: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4939 Alright, then we'll do it as you've proposed. I'll merge this PR. Thanks for your work @NicoK and the review @zentol and @StephanEwen. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue now is exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the S3aFileSystem does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
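The workaround suggested at the end of the issue — creating folders manually and copying only actual files — can be sketched like this. The class and method names are illustrative, not Flink's actual YARN staging code:

```java
import java.util.*;
import java.util.stream.*;

// Sketch of the suggested workaround: since S3A cannot copy a directory
// tree recursively, plan the upload file by file. Local entries are given
// as relative paths, with directories marked by a trailing '/'.
class S3UploadPlan {
    // Only real files get uploaded -- handing a directory to the S3 client
    // is what triggered the "Unable to calculate MD5 hash" failure above.
    static List<String> filesToUpload(List<String> entries) {
        return entries.stream()
                .filter(e -> !e.endsWith("/"))
                .sorted()
                .collect(Collectors.toList());
    }

    // "Folders" have to be created explicitly because S3A does not create
    // them as a side effect of a recursive copy.
    static List<String> directoriesToCreate(List<String> entries) {
        return entries.stream()
                .filter(e -> e.endsWith("/"))
                .sorted()
                .collect(Collectors.toList());
    }
}
```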
[jira] [Closed] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs
[ https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8061. - Resolution: Fixed Merged with 3edbb7bce5b30386a67b1b01ef1591a681601219. > Remove trailing asterisk in QueryableStateClient javadocs > - > > Key: FLINK-8061 > URL: https://issues.apache.org/jira/browse/FLINK-8061 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeHint A {@link TypeHint} used > to extract the type of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeInfo The {@link > TypeInformation} of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
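The fix is mechanical: the stray `*` after "request result." is deleted. A corrected sketch of the first javadoc, written against an illustrative stub rather than the real QueryableStateClient signature:

```java
import java.util.concurrent.CompletableFuture;

// Illustrative stub showing the corrected javadoc -- the trailing "*"
// after the first sentence is gone. The signature below is a simplified
// stand-in, not the real QueryableStateClient method.
interface QueryableStateClientSketch<K, S> {
    /**
     * Returns a future holding the request result.
     *
     * @param jobId              JobID of the job the queryable state belongs to.
     * @param queryableStateName Name under which the state is queryable.
     * @param key                The key we are interested in.
     * @param keyTypeHint        A type hint used to extract the type of the key.
     * @param stateDescriptor    Descriptor of the state we want to query.
     * @return Future holding the immutable state object containing the result.
     */
    CompletableFuture<S> getKvState(String jobId, String queryableStateName,
                                    K key, Object keyTypeHint, Object stateDescriptor);
}
```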
[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs
[ https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257042#comment-16257042 ] ASF GitHub Bot commented on FLINK-8061: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5008 @vetriselvan1187 Merged. Can you close this PR? > Remove trailing asterisk in QueryableStateClient javadocs > - > > Key: FLINK-8061 > URL: https://issues.apache.org/jira/browse/FLINK-8061 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeHint A {@link TypeHint} used > to extract the type of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeInfo The {@link > TypeInformation} of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #5023: [hotfix][docs] Review of concepts docs for grammar, forma...
Github user greghogan commented on the issue: https://github.com/apache/flink/pull/5023 @ChrisChinchilla I think you want to `git rebase origin/master` (or whatever you have named upstream). Did you do a merge instead? ---
[jira] [Updated] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7266: Fix Version/s: (was: 1.5.0) > Don't attempt to delete parent directory on S3 > -- > > Key: FLINK-7266 > URL: https://issues.apache.org/jira/browse/FLINK-7266 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2 > > > Currently, every attempted release of an S3 state object also checks if the > "parent directory" is empty and then tries to delete it. > Not only is that unnecessary on S3, but it is prohibitively expensive and for > example causes S3 to throttle calls by the JobManager on checkpoint cleanup. > The {{FileState}} must only attempt parent directory cleanup when operating > against real file systems, not when operating against object stores. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7265) FileSystems should describe their kind and consistency level
[ https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7265: Fix Version/s: (was: 1.5.0) 1.4.0 > FileSystems should describe their kind and consistency level > > > Key: FLINK-7265 > URL: https://issues.apache.org/jira/browse/FLINK-7265 > Project: Flink > Issue Type: Improvement > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0, 1.3.3 > > > Currently, all {{FileSystem}} types look the same to Flink. > However, certain operations should only be executed on certain kinds of file > systems. > For example, it makes no sense to attempt to delete an empty parent directory > on S3, because there are no such things as directories, only hierarchical > naming in the keys (file names). -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8083) FileSystem class not binary compatible with 1.3
[ https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-8083: --- Assignee: Aljoscha Krettek > FileSystem class not binary compatible with 1.3 > --- > > Key: FLINK-8083 > URL: https://issues.apache.org/jira/browse/FLINK-8083 > Project: Flink > Issue Type: Bug > Components: Core, FileSystem >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0 > > > The {{FileSystem}} class it not binary compatible with 1.3 due to a removed > method. > {code} > REMOVED (!) public abstract org.apache.flink.core.fs.FileSystemKind > getKind() > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8086) FlinkKafkaProducer011 can permanently fail in recovery through ProducerFencedException
[ https://issues.apache.org/jira/browse/FLINK-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256977#comment-16256977 ] ASF GitHub Bot commented on FLINK-8086: --- GitHub user pnowojski opened a pull request: https://github.com/apache/flink/pull/5030 [FLINK-8086][kafka] Ignore ProducerFencedException ProducerFencedException can happen if we restore twice from the same checkpoint or if we restore from an old savepoint. In both cases transactional.ids that we want to recoverAndCommit have been already committed and reused. Reusing means that they will be known by Kafka's brokers under a newer producerId/epochId, which will result in ProducerFencedException if we try to commit again some old (and already committed) transaction. Ignoring this exception might hide some bugs/issues, because instead of failing we might have a semi-silent (with a warning) data loss. ## Verifying this change This change added a `testFailAndRecoverSameCheckpointTwice` for a scenario that was previously not tested (and was failing). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/pnowojski/flink f8086 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5030.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5030 commit d4542339e39adeff69cf18a647c8a5ac0d97969e Author: Piotr Nowojski Date: 2017-11-17T13:31:07Z [hotfix][kafka] Improve logging in FlinkKafkaProducer011 commit 642ba6d0c3e204824f1ff69412d70918be9e3ac7 Author: Piotr Nowojski Date: 2017-11-17T13:40:30Z [FLINK-8086][kafka] Ignore ProducerFencedException during recovery ProducerFencedException can happen if we restore twice from the same checkpoint or if we restore from an old savepoint. In both cases transactional.ids that we want to recoverAndCommit have been already committed and reused. Reusing means that they will be known by Kafka's brokers under a newer producerId/epochId, which will result in ProducerFencedException if we try to commit again some old (and already committed) transaction. Ignoring this exception might hide some bugs/issues, because instead of failing we might have a semi-silent (with a warning) data loss. > FlinkKafkaProducer011 can permanently fail in recovery through > ProducerFencedException > -- > > Key: FLINK-8086 > URL: https://issues.apache.org/jira/browse/FLINK-8086 > Project: Flink > Issue Type: Bug > Components: Kafka Connector >Affects Versions: 1.4.0, 1.5.0 >Reporter: Stefan Richter >Assignee: Piotr Nowojski >Priority: Blocker > Fix For: 1.4.0 > > > Chaos monkey test in a cluster environment can permanently bring down our > FlinkKafkaProducer011. 
> Typically, after a small number of randomly killed TMs, the data generator > job is no longer able to recover from a checkpoint because of the following > problem: > org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an > operation with an old epoch. Either there is a newer producer with the same > transactionalId, or the producer's transaction has been expired by the broker. > The problem is reproducible and happened for me in every run after the chaos > monkey killed a couple of TMs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
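The recovery behaviour proposed in the pull request above can be sketched like this. The exception and interface here are local stand-ins, not the actual FlinkKafkaProducer011 or Kafka client code:

```java
import java.util.*;

// Sketch of the proposed recovery behaviour: when re-committing a
// transaction after restore, a fencing error means the transactional.id
// has already been committed and reused under a newer producerId/epoch,
// so it is logged as a warning and swallowed instead of failing the job.
class RecoveryCommit {
    static class FencedException extends RuntimeException {}

    interface Transaction { void commit(); }

    static boolean commitIgnoringFenced(Transaction tx, List<String> warnings) {
        try {
            tx.commit();
            return true;                  // normal path: commit succeeded
        } catch (FencedException e) {
            // Already committed under a newer producer epoch -- warn only.
            warnings.add("WARN: transaction already committed and fenced, ignoring");
            return false;                 // swallowed: recovery continues
        }
    }
}
```

The trade-off noted in the PR description applies here too: swallowing the exception keeps recovery from failing permanently, at the cost of potentially hiding a genuine bug behind a warning.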
[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs
[ https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256958#comment-16256958 ] ASF GitHub Bot commented on FLINK-8061: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/5008 Thanks for the work @vetriselvan1187 . I will merge this. > Remove trailing asterisk in QueryableStateClient javadocs > - > > Key: FLINK-8061 > URL: https://issues.apache.org/jira/browse/FLINK-8061 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeHint A {@link TypeHint} used > to extract the type of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeInfo The {@link > TypeInformation} of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7841) Add docs for Flink's S3 support
[ https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256951#comment-16256951 ] ASF GitHub Bot commented on FLINK-7841: --- GitHub user NicoK opened a pull request: https://github.com/apache/flink/pull/5029 [FLINK-7841][docs] update AWS docs with respect to S3 file system changes ## What is the purpose of the change This updates the S3 file system configuration section in the AWS documentation which has become a lot simpler since we offer the shaded `flink-s3-fs-hadoop` and `flink-s3-fs-presto` wrappers. (please also merge to the 1.4 branch since this should be in the docs accompanying it) ## Brief change log - update and extend the S3 documentation ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): **no** - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no** - The serializers: **no** - The runtime per-record code paths (performance sensitive): **no** - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: **no** - The S3 file system connector: **yes** ## Documentation - Does this pull request introduce a new feature? **no** - If yes, how is the feature documented? 
**docs** You can merge this pull request into a Git repository by running: $ git pull https://github.com/NicoK/flink flink-7841 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5029.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5029 commit 33da387f8f4c1be75941b6e24eccf9f1a709f2ba Author: Nico KruberDate: 2017-11-17T13:13:30Z [FLINK-7841][docs] update AWS docs with respect to S3 file system changes > Add docs for Flink's S3 support > --- > > Key: FLINK-7841 > URL: https://issues.apache.org/jira/browse/FLINK-7841 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Stephan Ewen >Assignee: Stephan Ewen > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer
[ https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256950#comment-16256950 ] ASF GitHub Bot commented on FLINK-7835: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4821 Yes, I think manually is the safest bet. Either way the 1.4 will come soon and this fix is going to be included. > Fix duplicate() method in NFASerializer > --- > > Key: FLINK-7835 > URL: https://issues.apache.org/jira/browse/FLINK-7835 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0, 1.3.1, 1.3.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer
[ https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256934#comment-16256934 ] ASF GitHub Bot commented on FLINK-7835: --- Github user shashank734 commented on the issue: https://github.com/apache/flink/pull/4821 While applying the patch it's giving a lot of conflicts. I think due to changes in between. So why I had doubts. I'll apply these changes manually. > Fix duplicate() method in NFASerializer > --- > > Key: FLINK-7835 > URL: https://issues.apache.org/jira/browse/FLINK-7835 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0, 1.3.1, 1.3.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer
[ https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256927#comment-16256927 ] ASF GitHub Bot commented on FLINK-7835: --- Github user kl0u commented on the issue: https://github.com/apache/flink/pull/4821 I think you can but you will have to recompile flink. > Fix duplicate() method in NFASerializer > --- > > Key: FLINK-7835 > URL: https://issues.apache.org/jira/browse/FLINK-7835 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0, 1.3.1, 1.3.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256920#comment-16256920 ] Aljoscha Krettek commented on FLINK-7266: - We actually have to fix this for 1.4 because we regress from 1.3, otherwise. The operation that deletes parent directories is too expensive and will basically DDOS s3, making Flink unusable for bigger installations with s3. > Don't attempt to delete parent directory on S3 > -- > > Key: FLINK-7266 > URL: https://issues.apache.org/jira/browse/FLINK-7266 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0, 1.3.2, 1.5.0 > > > Currently, every attempted release of an S3 state object also checks if the > "parent directory" is empty and then tries to delete it. > Not only is that unnecessary on S3, but it is prohibitively expensive and for > example causes S3 to throttle calls by the JobManager on checkpoint cleanup. > The {{FileState}} must only attempt parent directory cleanup when operating > against real file systems, not when operating against object stores. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek reassigned FLINK-7266: --- Assignee: Aljoscha Krettek (was: Stephan Ewen) > Don't attempt to delete parent directory on S3 > -- > > Key: FLINK-7266 > URL: https://issues.apache.org/jira/browse/FLINK-7266 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Aljoscha Krettek >Priority: Blocker > Fix For: 1.4.0, 1.3.2, 1.5.0 > > > Currently, every attempted release of an S3 state object also checks if the > "parent directory" is empty and then tries to delete it. > Not only is that unnecessary on S3, but it is prohibitively expensive and for > example causes S3 to throttle calls by the JobManager on checkpoint cleanup. > The {{FileState}} must only attempt parent directory cleanup when operating > against real file systems, not when operating against object stores. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Updated] (FLINK-7266) Don't attempt to delete parent directory on S3
[ https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7266: Fix Version/s: 1.4.0 > Don't attempt to delete parent directory on S3 > -- > > Key: FLINK-7266 > URL: https://issues.apache.org/jira/browse/FLINK-7266 > Project: Flink > Issue Type: Bug > Components: Core >Affects Versions: 1.3.1 >Reporter: Stephan Ewen >Assignee: Stephan Ewen >Priority: Blocker > Fix For: 1.4.0, 1.3.2, 1.5.0 > > > Currently, every attempted release of an S3 state object also checks if the > "parent directory" is empty and then tries to delete it. > Not only is that unnecessary on S3, but it is prohibitively expensive and for > example causes S3 to throttle calls by the JobManager on checkpoint cleanup. > The {{FileState}} must only attempt parent directory cleanup when operating > against real file systems, not when operating against object stores. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer
[ https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256910#comment-16256910 ] ASF GitHub Bot commented on FLINK-7835: --- Github user shashank734 commented on the issue: https://github.com/apache/flink/pull/4821 Can I directly use these changes on the release-1.3.2 tag, or are there in-between dependencies? > Fix duplicate() method in NFASerializer > --- > > Key: FLINK-7835 > URL: https://issues.apache.org/jira/browse/FLINK-7835 > Project: Flink > Issue Type: Sub-task > Components: CEP >Affects Versions: 1.3.0, 1.3.1, 1.3.2 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.
Github user shashank734 commented on the issue: https://github.com/apache/flink/pull/4821 Can I directly use these changes on the release-1.3.2 tag, or are there in-between dependencies? ---
[jira] [Updated] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers trying to access the same file.
[ https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Shashank Agarwal updated FLINK-8098: Fix Version/s: 1.4.0 > LeaseExpiredException when using FsStateBackend for checkpointing due to > multiple mappers trying to access the same file. > > > Key: FLINK-8098 > URL: https://issues.apache.org/jira/browse/FLINK-8098 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Affects Versions: 1.3.2 > Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP >Reporter: Shashank Agarwal > Fix For: 1.4.0 > > > I am running a streaming application with parallelism 6. I have enabled > checkpointing (1000 ms interval). But the application crashes after 1-2 days. After > analysing the logs I found the following trace. > {code} > 2017-11-17 11:19:06,696 WARN > org.apache.flink.streaming.runtime.tasks.StreamTask - Could not > properly clean up the async checkpoint runnable. > java.lang.Exception: Could not properly cancel managed keyed state future. 
> at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) > at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) > at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) > at > org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) > at > org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) > at > org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.util.concurrent.ExecutionException: java.io.IOException: > Could not flush and close the file system output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at java.util.concurrent.FutureTask.report(FutureTask.java:122) > at java.util.concurrent.FutureTask.get(FutureTask.java:192) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) > at > org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) > at > org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) > ... 
8 more > Caused by: java.io.IOException: Could not flush and close the file system > output stream to > hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > in order to obtain the stream state handle > at > org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) > at > org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) > at > org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) > at > org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > ... 1 more > Caused by: > org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): > No lease > flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 > (inode 812148671): File does not exist. [Lease. Holder: > DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] > at > org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659) > at >
[jira] [Created] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers trying to access the same file.
Shashank Agarwal created FLINK-8098: --- Summary: LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers trying to access the same file. Key: FLINK-8098 URL: https://issues.apache.org/jira/browse/FLINK-8098 Project: Flink Issue Type: Bug Components: State Backends, Checkpointing Affects Versions: 1.3.2 Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP Reporter: Shashank Agarwal I am running a streaming application with parallelism 6. I have enabled checkpointing (1000 ms interval). But the application crashes after 1-2 days. After analysing the logs I found the following trace. {code} 2017-11-17 11:19:06,696 WARN org.apache.flink.streaming.runtime.tasks.StreamTask - Could not properly clean up the async checkpoint runnable. java.lang.Exception: Could not properly cancel managed keyed state future. at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983) at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262) at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251) at org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97) at org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355) at org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463) at java.lang.Thread.run(Thread.java:745) Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could not flush and close the file system output stream to hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 in order to obtain the stream state handle at java.util.concurrent.FutureTask.report(FutureTask.java:122) at java.util.concurrent.FutureTask.get(FutureTask.java:192) at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43) at org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85) at org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88) ... 8 more Caused by: java.io.IOException: Could not flush and close the file system output stream to hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 in order to obtain the stream state handle at org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336) at org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351) at org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329) at org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40) at org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ... 1 more Caused by: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException): No lease flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965 (inode 812148671): File does not exist. [Lease. 
Holder: DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161] at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3749) at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3716) at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:911) at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:547)
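As a side note on the failure mode above: a {{LeaseExpiredException}} on close() means another HDFS client re-created or removed the file while this writer still held it open. A toy model, with path-naming helpers invented for illustration (this is not Flink's real checkpoint naming scheme), shows why two writers deriving the identical path conflict and why a unique per-stream component avoids it:

```java
import java.util.UUID;

// Toy illustration of the FLINK-8098 hypothesis: if two subtasks derive the
// same checkpoint file path, the second create() revokes the first writer's
// HDFS lease and the first close() fails with LeaseExpiredException.
// Helper names and path layout are invented for this example.
public class CheckpointPathNaming {
    static String collidingPath(String checkpointDir, long checkpointId) {
        // Same inputs from two mappers -> same path -> lease conflict.
        return checkpointDir + "/chk-" + checkpointId + "/state";
    }

    static String uniquePath(String checkpointDir, long checkpointId) {
        // A random per-stream component keeps concurrent writers apart.
        return checkpointDir + "/chk-" + checkpointId + "/" + UUID.randomUUID();
    }

    public static void main(String[] args) {
        // The dangerous case: two independent derivations agree on the path.
        assert collidingPath("hdfs://nn:8020/flink", 40191)
                .equals(collidingPath("hdfs://nn:8020/flink", 40191));
        // With a random component, concurrent writers never collide in practice.
        assert !uniquePath("hdfs://nn:8020/flink", 40191)
                .equals(uniquePath("hdfs://nn:8020/flink", 40191));
    }
}
```

Note that the path in the trace already ends in a UUID-like name, so the actual root cause in this ticket may differ; the sketch only models the "same file, two writers" lease semantics the title describes.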
[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs
[ https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256846#comment-16256846 ] Kostas Kloudas commented on FLINK-8061: --- Would you like to merge this [~Zentol]? It would be nice to clean up JIRA a bit. > Remove trailing asterisk in QueryableStateClient javadocs > - > > Key: FLINK-8061 > URL: https://issues.apache.org/jira/browse/FLINK-8061 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeHint A {@link TypeHint} used > to extract the type of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} > {code} > /** >* Returns a future holding the request result. * >* @param jobId JobID of the job the queryable > state belongs to. >* @param queryableStateNameName under which the state is > queryable. >* @param key The key we are interested > in. >* @param keyTypeInfo The {@link > TypeInformation} of the key. >* @param stateDescriptor The {@link > StateDescriptor} of the state we want to query. >* @return Future holding the immutable {@link State} object containing > the result. >*/ > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8064) Extend dependency section to list flink-core
[ https://issues.apache.org/jira/browse/FLINK-8064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256845#comment-16256845 ] Kostas Kloudas commented on FLINK-8064: --- Actually we tried it, and if you want to avoid transitively adding a dep to {{runtime}} in the client, there is not much that you can put in the {{common}} package, and it is not worth the complexity. In any case, in the future we can introduce further separation of the packages if we want, and as long as the {{runtime}} and {{client}} stay there, it will make no difference to the user. > Extend dependency section to list flink-core > > > Key: FLINK-8064 > URL: https://issues.apache.org/jira/browse/FLINK-8064 > Project: Flink > Issue Type: Improvement > Components: Documentation, Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > > The dependency section of the Queryable State documentation should also list > flink-core as it is inherently required when working with the client. (Which > has flink-core set to provided) -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.
[ https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256826#comment-16256826 ] Xingcan Cui commented on FLINK-8090: Hi [~kkl0u], thanks for raising this. I found that a {{State}} is identified only by the name in its {{StateDescriptor}}. In other words, if we create two descriptors with an identical name and type, it will return the same state object (though the wrapper may be different). On the other hand, for two descriptors with an identical name but different types, we can detect them with a {{ClassCastException}}, which is caused by the type erasure in Java. I'll try to refactor the {{DefaultKeyedStateStore}} to provide a better error message for the latter case. Best, Xingcan > Improve error message when registering different states under the same name. > > > Key: FLINK-8090 > URL: https://issues.apache.org/jira/browse/FLINK-8090 > Project: Flink > Issue Type: Bug > Components: DataStream API >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Xingcan Cui > > Currently a {{ProcessFunction}} like this: > {code} > final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO, > source.getType()); > final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>( > "timon-one", > BasicTypeInfo.INT_TYPE_INFO); > new ProcessFunction<Tuple2<Integer, Long>, Object>() { > private static final long serialVersionUID = -805125545438296619L; > private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState; > private transient ListState<Integer> secondListState; > @Override > public void open(Configuration parameters) throws Exception { > super.open(parameters); > firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor); > secondListState = getRuntimeContext().getListState(secondListStateDescriptor); > } > @Override > public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception { > Tuple2<Integer, Long> v = firstMapState.get(value.f0); > if (v == null) { > v = new Tuple2<>(value.f0, 0L); > } > firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1)); > } > } > {code} > fails with: > {code} > java.lang.RuntimeException: Error while getting state > at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74) > at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127) > at org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327) > at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36) > at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102) > at org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58) > at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381) > at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253) > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.lang.ClassCastException: org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to org.apache.flink.api.common.state.ListState > at org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71) > ... 9 more > {code} > Which is cryptic, as it does not explain the reason for the problem. The error message should be something along the lines of "Duplicate state name". -- This message was sent by Atlassian JIRA (v6.4.14#64029)
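The cast failure above can be reproduced outside Flink. The sketch below is a minimal model of the mechanism, not Flink code (interface and class names are invented for illustration): a store that keys state objects only by name hands the second descriptor the previously created map state, and the cast to the requested list-state interface throws.

```java
import java.util.HashMap;
import java.util.Map;

// Minimal model (not Flink code) of why FLINK-8090 surfaces as a
// ClassCastException: state is looked up only by name, so a second
// descriptor with the same name but a different state type gets back
// the previously created object, and the cast fails.
public class DuplicateStateNameDemo {
    interface MapState { }
    interface ListState { }
    static class HeapMapState implements MapState { }
    static class HeapListState implements ListState { }

    private final Map<String, Object> statesByName = new HashMap<>();

    MapState getMapState(String name) {
        return (MapState) statesByName.computeIfAbsent(name, n -> new HeapMapState());
    }

    ListState getListState(String name) {
        // Same name -> same underlying object, regardless of requested type.
        return (ListState) statesByName.computeIfAbsent(name, n -> new HeapListState());
    }

    public static void main(String[] args) {
        DuplicateStateNameDemo store = new DuplicateStateNameDemo();
        store.getMapState("timon-one");      // registers a map state under the name
        try {
            store.getListState("timon-one"); // same name, different type
        } catch (ClassCastException e) {
            // A clearer message here would mention the duplicate state name.
            System.out.println("duplicate state name detected via ClassCastException");
        }
    }
}
```

This mirrors the reported trace exactly: the heap map state cannot be cast to the list-state interface, so the failure is detectable, just badly worded.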
[jira] [Closed] (FLINK-7432) Unclosed HighAvailabilityServices instance in QueryableStateClient
[ https://issues.apache.org/jira/browse/FLINK-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-7432. - Resolution: Fixed This is outdated given the recent changes in Queryable State for Flink 1.4 > Unclosed HighAvailabilityServices instance in QueryableStateClient > -- > > Key: FLINK-7432 > URL: https://issues.apache.org/jira/browse/FLINK-7432 > Project: Flink > Issue Type: Bug > Components: Queryable State >Reporter: Ted Yu >Assignee: Kostas Kloudas >Priority: Minor > > {code} > public QueryableStateClient(Configuration config) throws Exception { > this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( > config, Executors.directExecutor(), > HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); > {code} > The HighAvailabilityServices instance is only used for calling > getJobManagerLeaderRetriever(). > The instance should be closed upon leaving QueryableStateClient ctor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7432) Unclosed HighAvailabilityServices instance in QueryableStateClient
[ https://issues.apache.org/jira/browse/FLINK-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-7432: - Assignee: Kostas Kloudas > Unclosed HighAvailabilityServices instance in QueryableStateClient > -- > > Key: FLINK-7432 > URL: https://issues.apache.org/jira/browse/FLINK-7432 > Project: Flink > Issue Type: Bug > Components: Queryable State >Reporter: Ted Yu >Assignee: Kostas Kloudas >Priority: Minor > > {code} > public QueryableStateClient(Configuration config) throws Exception { > this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices( > config, Executors.directExecutor(), > HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION)); > {code} > The HighAvailabilityServices instance is only used for calling > getJobManagerLeaderRetriever(). > The instance should be closed upon leaving QueryableStateClient ctor. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-6676) API Migration guide: add QueryableStateClient changes
[ https://issues.apache.org/jira/browse/FLINK-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-6676. - Resolution: Fixed Assignee: Kostas Kloudas This is now outdated. > API Migration guide: add QueryableStateClient changes > - > > Key: FLINK-6676 > URL: https://issues.apache.org/jira/browse/FLINK-6676 > Project: Flink > Issue Type: Sub-task > Components: Documentation, Queryable State >Affects Versions: 1.3.0 >Reporter: Nico Kruber >Assignee: Kostas Kloudas > Fix For: 1.3.0 > > > The migration guide at {{docs/dev/migration.md}} needs to be extended with > some notes about the API changes: > * changes in the constructor > * more? -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-5526) QueryableState: notify upon receiving a query but having queryable state disabled
[ https://issues.apache.org/jira/browse/FLINK-5526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-5526. - Resolution: Invalid With the current changes in Flink 1.4, this issue is outdated. > QueryableState: notify upon receiving a query but having queryable state > disabled > - > > Key: FLINK-5526 > URL: https://issues.apache.org/jira/browse/FLINK-5526 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.2.0 >Reporter: Nico Kruber >Priority: Minor > > When querying state but having it disabled in the config, a warning should be > presented to the user that a query was received but the component is > disabled. This is in addition to the query itself failing with a rather > generic exception that is not pointing to this fact. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8055) Deduplicate logging messages about QS start
[ https://issues.apache.org/jira/browse/FLINK-8055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8055. - Resolution: Fixed Merged with 5e059e968633c4292734ebed209fa1b3c30529a1 > Deduplicate logging messages about QS start > --- > > Key: FLINK-8055 > URL: https://issues.apache.org/jira/browse/FLINK-8055 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > When starting the queryable state servers, the following is printed in the > logs: both the NetworkEnvironment and the AbstractServerBase log events > about it. > I suggest removing the logging message from the NetworkEnvironment. > Furthermore, I think it would be good to use a non-static logger in > AbstractServerBase to better differentiate between classes. > {code} > 2017-11-13 13:51:32,042 INFO > org.apache.flink.queryablestate.network.AbstractServerBase - Started the > Queryable State Server @ /127.0.0.1:9067. > 2017-11-13 13:51:32,042 INFO > org.apache.flink.runtime.io.network.NetworkEnvironment - Started the > Queryable State Data Server @ /127.0.0.1:9067 > 2017-11-13 13:51:32,049 INFO > org.apache.flink.queryablestate.network.AbstractServerBase - Started the > Queryable State Proxy Server @ /127.0.0.1:9069. > 2017-11-13 13:51:32,049 INFO > org.apache.flink.runtime.io.network.NetworkEnvironment - Started the > Queryable State Client Proxy @ /127.0.0.1:9069 > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8059) Querying the state of a non-existing jobs should throw JobNotFoundException
[ https://issues.apache.org/jira/browse/FLINK-8059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8059. - Resolution: Fixed Merged with 2fe078f3927595cbc3c5de6635a710494e0f34b4 > Querying the state of a non-existing jobs should throw JobNotFoundException > --- > > Key: FLINK-8059 > URL: https://issues.apache.org/jira/browse/FLINK-8059 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > When querying the state for a non-existing job you currently get a > IllegalStateException. Given that this isn't an illegal state we should > return a JobNotFoundException instead. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8065) Extend exception message if QS client is already shut down
[ https://issues.apache.org/jira/browse/FLINK-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8065. - Resolution: Fixed Merged with 75c14541fdc52d5446b179e8e660b8a4fd90310c > Extend exception message if QS client is already shut down > -- > > Key: FLINK-8065 > URL: https://issues.apache.org/jira/browse/FLINK-8065 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Minor > > Calling {{QueryableStateClient#getKvState}} after shutting it down leads to > this rather short exception message: > {code} > Caused by: java.lang.IllegalStateException: Shut down > at > org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:136) > at > org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:251) > {code} > We can extend this just a little bit. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8062) QueryableStateClient#getKvState(...N Namespace,...) not documented
[ https://issues.apache.org/jira/browse/FLINK-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8062. - Resolution: Fixed Merged with ff7e3cf6749a6b6bc898fde871c36661c8629c23 > QueryableStateClient#getKvState(...N Namespace,...) not documented > -- > > Key: FLINK-8062 > URL: https://issues.apache.org/jira/browse/FLINK-8062 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{QueryableStateClient}} has this method: > {code} > public CompletableFuture getKvState( > final JobID jobId, > final String queryableStateName, > final K key, > final N namespace, > final TypeInformation keyTypeInfo, > final TypeInformation namespaceTypeInfo, > final StateDescriptor stateDescriptor) { > {code} > There is no documentation on how to use this method or what namespaces are. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail
[ https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256774#comment-16256774 ] ASF GitHub Bot commented on FLINK-8057: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5028 > KvStateRegistry#registerKvState may randomly fail > - > > Key: FLINK-8057 > URL: https://issues.apache.org/jira/browse/FLINK-8057 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, > a new KvStateID is generated, and we then check whether a state for this ID > was already registered. Realistically, this is never the case making the > check unnecessary, and on the off-chance that it does happen the job will > fail for no good reason. > {code} > KvStateID kvStateId = new KvStateID(); > if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { > KvStateRegistryListener listener = this.listener.get(); > if (listener != null) { > listener.notifyKvStateRegistered( > jobId, > jobVertexId, > keyGroupRange, > registrationName, > kvStateId); > } > return kvStateId; > } else { > throw new IllegalStateException(kvStateId + " is already registered."); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail
[ https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8057. - Resolution: Fixed Merged at a4d86975967942054d1bd466641e9c835fb014ac > KvStateRegistry#registerKvState may randomly fail > - > > Key: FLINK-8057 > URL: https://issues.apache.org/jira/browse/FLINK-8057 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, > a new KvStateID is generated, and we then check whether a state for this ID > was already registered. Realistically, this is never the case making the > check unnecessary, and on the off-chance that it does happen the job will > fail for no good reason. > {code} > KvStateID kvStateId = new KvStateID(); > if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { > KvStateRegistryListener listener = this.listener.get(); > if (listener != null) { > listener.notifyKvStateRegistered( > jobId, > jobVertexId, > keyGroupRange, > registrationName, > kvStateId); > } > return kvStateId; > } else { > throw new IllegalStateException(kvStateId + " is already registered."); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
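The registration pattern quoted in the ticket can be modelled with plain JDK types. This is a toy reproduction, not Flink code ({{UUID}} stands in for {{KvStateID}}, {{Object}} for the state), showing that with a freshly generated random ID the {{putIfAbsent}} collision branch is practically unreachable, which is exactly why the job-failing exception in it is questionable:

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Toy model of the KvStateRegistry#registerKvState pattern from FLINK-8057.
// A fresh random ID is generated per call, so putIfAbsent essentially never
// finds an existing entry; failing the job on the off-chance collision is
// the part the ticket objects to.
public class RegisterKvStateSketch {
    private final ConcurrentHashMap<UUID, Object> registeredKvStates = new ConcurrentHashMap<>();

    public UUID register(Object kvState) {
        UUID kvStateId = UUID.randomUUID();   // stands in for `new KvStateID()`
        if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
            return kvStateId;                 // normal path: the ID was fresh
        }
        // The merged fix changes this error handling rather than keeping a
        // job-failing exception for an essentially impossible collision.
        throw new IllegalStateException(kvStateId + " is already registered.");
    }

    public static void main(String[] args) {
        RegisterKvStateSketch registry = new RegisterKvStateSketch();
        UUID a = registry.register(new Object());
        UUID b = registry.register(new Object());
        // Every registration gets its own ID; the else-branch never fires.
        assert !a.equals(b);
    }
}
```

`putIfAbsent` returning null confirms the mapping was newly created, so the check is correct as a concurrency guard; the objection is only to the severity of the reaction when it would ever trip.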
[GitHub] flink pull request #5028: [FLINK-8057][FLINK-8059][FLINK-8055][FLINK-8065][F...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5028 ---
[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail
[ https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256747#comment-16256747 ] ASF GitHub Bot commented on FLINK-8057: --- GitHub user kl0u opened a pull request: https://github.com/apache/flink/pull/5028 [FLINK-8057][FLINK-8059][FLINK-8055][FLINK-8065][FLINK-8062] This PR simply fixes multiple minor issues, like error messages. You can merge this pull request into a Git repository by running: $ git pull https://github.com/kl0u/flink qs-small-stuff Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/5028.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #5028 commit ff7e3cf6749a6b6bc898fde871c36661c8629c23 Author: kkloudas Date: 2017-11-15T14:32:42Z [FLINK-8062][QS] Make getKvState() with namespace private. commit 75c14541fdc52d5446b179e8e660b8a4fd90310c Author: kkloudas Date: 2017-11-15T14:38:36Z [FLINK-8065][QS] Improve error message when client already shut down. commit 5e059e968633c4292734ebed209fa1b3c30529a1 Author: kkloudas Date: 2017-11-16T16:02:16Z [FLINK-8055][QS] Deduplicate logging messages about QS start. commit 2fe078f3927595cbc3c5de6635a710494e0f34b4 Author: kkloudas Date: 2017-11-16T16:45:49Z [FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries with unknown jobIds commit a4d86975967942054d1bd466641e9c835fb014ac Author: kkloudas Date: 2017-11-17T08:26:10Z [FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState() > KvStateRegistry#registerKvState may randomly fail > - > > Key: FLINK-8057 > URL: https://issues.apache.org/jira/browse/FLINK-8057 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{KvStateRegistry#registerKvState}} method is a bit weird. 
On each call, a new KvStateID is generated, and we then check whether a state for this ID was already registered. Realistically, this is never the case, making the check unnecessary, and on the off-chance that it does happen the job will fail for no good reason.
{code}
KvStateID kvStateId = new KvStateID();
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
    KvStateRegistryListener listener = this.listener.get();
    if (listener != null) {
        listener.notifyKvStateRegistered(
            jobId,
            jobVertexId,
            keyGroupRange,
            registrationName,
            kvStateId);
    }
    return kvStateId;
} else {
    throw new IllegalStateException(kvStateId + " is already registered.");
}
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
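The pattern the issue describes can be reproduced outside Flink with a minimal, self-contained sketch. This is an illustration only: the class name `KvStateRegistrySketch` and the use of `java.util.UUID` as the ID type are stand-ins invented here, not Flink's actual `KvStateID`/`KvStateRegistry` classes.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;

// Minimal sketch of the registration pattern quoted in the issue.
// A fresh random ID is generated on every call, so the putIfAbsent
// collision branch is practically unreachable -- which is exactly
// the issue's point about the check being unnecessary.
public class KvStateRegistrySketch {
    private final Map<UUID, Object> registeredKvStates = new ConcurrentHashMap<>();

    public UUID registerKvState(Object kvState) {
        UUID kvStateId = UUID.randomUUID(); // fresh ID per registration
        if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
            return kvStateId; // normal path: the ID was unused
        } else {
            // With random IDs this branch should never be taken.
            throw new IllegalStateException(kvStateId + " is already registered.");
        }
    }

    public int size() {
        return registeredKvStates.size();
    }

    public static void main(String[] args) {
        KvStateRegistrySketch registry = new KvStateRegistrySketch();
        UUID a = registry.registerKvState("state-a");
        UUID b = registry.registerKvState("state-b");
        System.out.println(!a.equals(b)); // distinct IDs for distinct registrations
    }
}
```

Every call returns a new ID, so the `IllegalStateException` branch exists only as a defensive guard, as the discussion further down in this thread also concludes.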
[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256732#comment-16256732 ] ASF GitHub Bot commented on FLINK-8063: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/5021 > Client blocks indefinitely when querying a non-existing state > - > > Key: FLINK-8063 > URL: https://issues.apache.org/jira/browse/FLINK-8063 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Critical > Fix For: 1.4.0 > > > When querying for a non-existing state (as in, no state was registered under > queryableStateName) the client blocks indefinitely. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-8063) Client blocks indefinitely when querying a non-existing state
[ https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas closed FLINK-8063. - Resolution: Fixed merged with a0838de79ff73b0322f3ce255df54f5f33b2bf3b > Client blocks indefinitely when querying a non-existing state > - > > Key: FLINK-8063 > URL: https://issues.apache.org/jira/browse/FLINK-8063 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas >Priority: Critical > Fix For: 1.4.0 > > > When querying for a non-existing state (as in, no state was registered under > queryableStateName) the client blocks indefinitely. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
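The actual fix merged for FLINK-8063 is not shown in this thread; as a general illustration of the failure mode it addresses, a client that waits on a future which is never completed (e.g. a query for a `queryableStateName` that was never registered) blocks forever unless the wait is bounded. A hedged sketch using plain `java.util.concurrent`, not Flink's queryable-state client:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Illustration of bounding a client-side wait: a future that is never
// completed should surface a TimeoutException instead of blocking the
// caller indefinitely.
public class BoundedWaitSketch {
    public static void main(String[] args) {
        CompletableFuture<String> neverCompleted = new CompletableFuture<>();
        try {
            // An unbounded get() here would hang forever; the timeout
            // turns the hang into a diagnosable failure.
            neverCompleted.get(100, TimeUnit.MILLISECONDS);
            System.out.println("completed");
        } catch (TimeoutException e) {
            System.out.println("timed out"); // expected: the deadline fires
        } catch (Exception e) {
            System.out.println("failed: " + e);
        }
    }
}
```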
[jira] [Commented] (FLINK-8096) Fix time material issue when write to TableSink
[ https://issues.apache.org/jira/browse/FLINK-8096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256711#comment-16256711 ] ASF GitHub Bot commented on FLINK-8096: --- Github user xccui commented on a diff in the pull request: https://github.com/apache/flink/pull/5025#discussion_r151637017 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment( "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") } val outputType = sink.getOutputType +val resultType = getResultType(table.getRelNode, optimizedPlan) --- End diff -- Yes, you are right. Thanks for the explanation @dianfu. > Fix time material issue when write to TableSink > --- > > Key: FLINK-8096 > URL: https://issues.apache.org/jira/browse/FLINK-8096 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > We will get the following exception for unit test > {{TimeAttributesITCase.testCalcMaterialization3}}. > {noformat} > org.apache.flink.table.api.TableException: Found more than one rowtime field: > [rowtime, _c1, _c2] in the table that should be converted to a DataStream. > Please select the rowtime field that should be used as event-time timestamp > for the DataStream by casting all other fields to TIMESTAMP. 
> at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:783) > at > org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:263) > at org.apache.flink.table.api.Table.writeToSink(table.scala:862) > at org.apache.flink.table.api.Table.writeToSink(table.scala:830) > at > org.apache.flink.table.runtime.stream.TimeAttributesITCase.testCalcMaterialization3(TimeAttributesITCase.scala:196) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem
[ https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256656#comment-16256656 ] ASF GitHub Bot commented on FLINK-4228: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/4939 I'm actually with Chesnay on the part of using exclusions (and letting AWS pull them in with the right version) rather than adding explicit dependencies, because it will be easier to debug than an error due to a version conflict. We can immediately spot those `ClassNotFoundException` instances in the failing unit tests (recall that those dependencies are all in `test` scope!), whereas a version conflict may show up more subtly and be less easy to spot. > YARN artifact upload does not work with S3AFileSystem > - > > Key: FLINK-4228 > URL: https://issues.apache.org/jira/browse/FLINK-4228 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Ufuk Celebi >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The issue is now exclusive to running on YARN with s3a:// as your configured > FileSystem. If so, the Flink session will fail on staging itself because it > tries to copy the flink/lib directory to S3 and the {{S3AFileSystem}} does not > support recursive copy. > h2. Old Issue > Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) > leads to an Exception when uploading the snapshot to S3 when using the > {{S3AFileSystem}}. 
> {code} > AsynchronousException{com.amazonaws.AmazonClientException: Unable to > calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory)} > at > org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870) > Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108) > at > com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150) > at > com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50) > at java.util.concurrent.FutureTask.run(FutureTask.java:266) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) > at > java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) > at java.lang.Thread.run(Thread.java:745) > Caused by: java.io.FileNotFoundException: > /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886 > (Is a directory) > at java.io.FileInputStream.open0(Native Method) > at java.io.FileInputStream.open(FileInputStream.java:195) > at 
java.io.FileInputStream.<init>(FileInputStream.java:138) > at > com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294) > ... 9 more > {code} > Running with S3NFileSystem, the error does not occur. The problem might be > due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created > automatically. We might need to manually create folders and copy only actual > files for {{S3AFileSystem}}. More investigation is required. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
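The "(Is a directory)" failure above comes from handing a directory to a single-object upload. The issue body suggests copying "only actual files" instead; a sketch of that idea under stated assumptions follows, where `uploadFile` is a hypothetical placeholder for a per-object S3 upload, not an AWS SDK call:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Stream;

// Sketch: instead of passing a directory to a single-object upload
// (which fails with "Is a directory"), walk the tree and upload each
// regular file under a key derived from its path relative to the root.
public class RecursiveUploadSketch {

    // Hypothetical stand-in for the real per-file upload (e.g. S3 putObject).
    static void uploadFile(Path file, String key) {
        System.out.println("upload " + key);
    }

    static List<String> uploadDirectory(Path root) throws IOException {
        List<String> keys = new ArrayList<>();
        try (Stream<Path> paths = Files.walk(root)) {
            paths.filter(Files::isRegularFile).forEach(file -> {
                // Object stores have no real directories; the relative
                // path becomes the object key, so no mkdir is needed.
                String key = root.relativize(file).toString().replace('\\', '/');
                uploadFile(file, key);
                keys.add(key);
            });
        }
        return keys;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("upload-sketch");
        Files.createDirectories(root.resolve("lib"));
        Files.write(root.resolve("lib").resolve("flink-dist.jar"), new byte[]{1});
        Files.write(root.resolve("conf.yaml"), new byte[]{2});
        System.out.println(uploadDirectory(root).size()); // only files counted, not directories
    }
}
```

Directories are skipped entirely, which sidesteps the MD5-of-a-directory problem in the quoted trace.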
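NicoK's preference in the FLINK-4228 review for exclusions over explicit dependency versions can be expressed in a POM roughly as below. This is a hedged illustration only: the coordinates, the property, and the excluded artifact are placeholders chosen for the example, not the actual changes in PR #4939.

```xml
<!-- Illustration: let the AWS SDK pull in its transitive dependencies at
     the versions it expects, excluding only a known-conflicting artifact,
     rather than pinning explicit versions alongside it. A missing class
     then fails loudly as a ClassNotFoundException in test scope, instead
     of a subtle version conflict at runtime. -->
<dependency>
  <groupId>com.amazonaws</groupId>
  <artifactId>aws-java-sdk-s3</artifactId>
  <version>${aws.sdk.version}</version>
  <scope>test</scope>
  <exclusions>
    <exclusion>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```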
[GitHub] flink pull request #5013: [FLINK-7973] disable JNI bridge for relocated hado...
Github user NicoK closed the pull request at: https://github.com/apache/flink/pull/5013 ---
[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems
[ https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256646#comment-16256646 ] ASF GitHub Bot commented on FLINK-7973: --- Github user NicoK commented on the issue: https://github.com/apache/flink/pull/5013 Fixed on master in 0e5fb0b78cd0a3ccb144071a47579eb6c3d0570a e9e7c3372189db7e933ff59114b9ec6245838eda Fixed on release-1.4 in 25a28ab32609c45fb8c40f717148e32fb453d2fc 9f68212603e3601e2f7a67ff93be9b15844c14da > Fix service shading relocation for S3 file systems > -- > > Key: FLINK-7973 > URL: https://issues.apache.org/jira/browse/FLINK-7973 > Project: Flink > Issue Type: Bug >Reporter: Stephan Ewen >Assignee: Nico Kruber >Priority: Blocker > Fix For: 1.4.0 > > > The shade plugin relocates services incorrectly currently, applying > relocation patterns multiple times. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-8096) Fix time material issue when write to TableSink
[ https://issues.apache.org/jira/browse/FLINK-8096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256636#comment-16256636 ] ASF GitHub Bot commented on FLINK-8096: --- Github user dianfu commented on a diff in the pull request: https://github.com/apache/flink/pull/5025#discussion_r151625442 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala --- @@ -234,11 +234,12 @@ abstract class StreamTableEnvironment( "UpsertStreamTableSink requires that Table has a full primary keys if it is updated.") } val outputType = sink.getOutputType +val resultType = getResultType(table.getRelNode, optimizedPlan) --- End diff -- The `resultType` generated from the optimized plan contains time indicator information. In `StreamTableEnvironment.translate`, it needs this information to transform the time indicator column to `TimeStamp` type. For the type consistent issue, it will be validated during converting CRow to output type: https://github.com/apache/flink/blob/81dc260dc653085b9dbf098e8fd70a72d2d0828e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala#L941 > Fix time material issue when write to TableSink > --- > > Key: FLINK-8096 > URL: https://issues.apache.org/jira/browse/FLINK-8096 > Project: Flink > Issue Type: Bug > Components: Table API & SQL >Reporter: Dian Fu >Assignee: Dian Fu > Fix For: 1.5.0 > > > We will get the following exception for unit test > {{TimeAttributesITCase.testCalcMaterialization3}}. > {noformat} > org.apache.flink.table.api.TableException: Found more than one rowtime field: > [rowtime, _c1, _c2] in the table that should be converted to a DataStream. > Please select the rowtime field that should be used as event-time timestamp > for the DataStream by casting all other fields to TIMESTAMP. 
> at > org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:783) > at > org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:263) > at org.apache.flink.table.api.Table.writeToSink(table.scala:862) > at org.apache.flink.table.api.Table.writeToSink(table.scala:830) > at > org.apache.flink.table.runtime.stream.TimeAttributesITCase.testCalcMaterialization3(TimeAttributesITCase.scala:196) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:498) > at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78) > at > org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57) > at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) > at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) > at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) > at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) > at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) 
> at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48) > at org.junit.rules.RunRules.evaluate(RunRules.java:20) > at org.junit.runners.ParentRunner.run(ParentRunner.java:363) > at org.junit.runner.JUnitCore.run(JUnitCore.java:137) > at > com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) > at > com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51) > at > com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237) > at > com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) > {noformat}
[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail
[ https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256629#comment-16256629 ] Kostas Kloudas commented on FLINK-8057: --- I agree that this check never fails, but if it ever does, then something is wrong. So I would recommend keeping the check, as I consider it good practice to be defensive and have a check for every possible execution path (as long as it is not in the critical path), and just changing the exception message to something that indicates that execution took an unexpected path. What about something along the lines of: kvStateId + " appears registered although it should not." ? > KvStateRegistry#registerKvState may randomly fail > - > > Key: FLINK-8057 > URL: https://issues.apache.org/jira/browse/FLINK-8057 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, > a new KvStateID is generated, and we then check whether a state for this ID > was already registered. Realistically, this is never the case making the > check unnecessary, and on the off-chance that it does happen the job will > fail for no good reason.
{code}
KvStateID kvStateId = new KvStateID();
if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
    KvStateRegistryListener listener = this.listener.get();
    if (listener != null) {
        listener.notifyKvStateRegistered(
            jobId,
            jobVertexId,
            keyGroupRange,
            registrationName,
            kvStateId);
    }
    return kvStateId;
} else {
    throw new IllegalStateException(kvStateId + " is already registered.");
}
{code}
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail
[ https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Kostas Kloudas reassigned FLINK-8057: - Assignee: Kostas Kloudas > KvStateRegistry#registerKvState may randomly fail > - > > Key: FLINK-8057 > URL: https://issues.apache.org/jira/browse/FLINK-8057 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Kostas Kloudas > > The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, > a new KvStateID is generated, and we then check whether a state for this ID > was already registered. Realistically, this is never the case making the > check unnecessary, and on the off-chance that it does happen the job will > fail for no good reason. > {code} > KvStateID kvStateId = new KvStateID(); > if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) { > KvStateRegistryListener listener = this.listener.get(); > if (listener != null) { > listener.notifyKvStateRegistered( > jobId, > jobVertexId, > keyGroupRange, > registrationName, > kvStateId); > } > return kvStateId; > } else { > throw new IllegalStateException(kvStateId + " is already registered."); > } > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)