[jira] [Commented] (FLINK-8102) Formatting issues in Mesos documentation.

2017-11-17 Thread Joerg Schad (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257954#comment-16257954
 ] 

Joerg Schad commented on FLINK-8102:


https://github.com/apache/flink/pull/5033

> Formatting issues in Mesos documentation. 
> --
>
> Key: FLINK-8102
> URL: https://issues.apache.org/jira/browse/FLINK-8102
> Project: Flink
>  Issue Type: Bug
>Reporter: Joerg Schad
>Assignee: Joerg Schad
>Priority: Minor
>
> The Flink documentation renders incorrectly as some characters are not
> properly escaped.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-8102) Formatting issues in Mesos documentation.

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8102?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257953#comment-16257953
 ] 

ASF GitHub Bot commented on FLINK-8102:
---

GitHub user joerg84 opened a pull request:

https://github.com/apache/flink/pull/5033

[FLINK-8102][docs] Fixed formatting issues in Mesos documentation.

This change fixes formatting issues occurring in the markdown rendering.
It was tested by building the documentation locally.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joerg84/flink docu

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5033.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5033


commit bb2826b085da4a4119c13a1e6babdd78fc8de6aa
Author: Joerg Schad 
Date:   2017-11-18T06:23:02Z

[FLINK-8102][docs] Fixed formatting issues in Mesos documentation.




> Formatting issues in Mesos documentation. 
> --
>
> Key: FLINK-8102
> URL: https://issues.apache.org/jira/browse/FLINK-8102
> Project: Flink
>  Issue Type: Bug
>Reporter: Joerg Schad
>Assignee: Joerg Schad
>Priority: Minor
>
> The Flink documentation renders incorrectly as some characters are not
> properly escaped.
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos





[GitHub] flink pull request #5033: [FLINK-8102][docs] Fixed formatting issues in Meso...

2017-11-17 Thread joerg84
GitHub user joerg84 opened a pull request:

https://github.com/apache/flink/pull/5033

[FLINK-8102][docs] Fixed formatting issues in Mesos documentation.

This change fixes formatting issues occurring in the markdown rendering.
It was tested by building the documentation locally.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/joerg84/flink docu

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5033.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5033


commit bb2826b085da4a4119c13a1e6babdd78fc8de6aa
Author: Joerg Schad 
Date:   2017-11-18T06:23:02Z

[FLINK-8102][docs] Fixed formatting issues in Mesos documentation.




---


[jira] [Created] (FLINK-8102) Formatting issues in Mesos documentation.

2017-11-17 Thread Joerg Schad (JIRA)
Joerg Schad created FLINK-8102:
--

 Summary: Formatting issues in Mesos documentation. 
 Key: FLINK-8102
 URL: https://issues.apache.org/jira/browse/FLINK-8102
 Project: Flink
  Issue Type: Bug
Reporter: Joerg Schad
Assignee: Joerg Schad
Priority: Minor


The Flink documentation renders incorrectly as some characters are not properly
escaped.

https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/mesos.html
https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/config.html#mesos





[jira] [Created] (FLINK-8101) Elasticsearch 6.x support

2017-11-17 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-8101:
-

 Summary: Elasticsearch 6.x support
 Key: FLINK-8101
 URL: https://issues.apache.org/jira/browse/FLINK-8101
 Project: Flink
  Issue Type: New Feature
  Components: ElasticSearch Connector
Affects Versions: 1.4.0
Reporter: Hai Zhou UTC+8
 Fix For: 1.5.0


Recently, elasticsearch 6.0.0 was released: 
https://www.elastic.co/blog/elasticsearch-6-0-0-released  
The minimum version of the Elasticsearch Java client compatible with ES 6 is 5.6.0.






[jira] [Updated] (FLINK-6222) YARN: setting environment variables in an easier fashion

2017-11-17 Thread Craig Foster (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6222?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Craig Foster updated FLINK-6222:

Attachment: patch0-add-yarn-hadoop-conf.diff

> YARN: setting environment variables in an easier fashion
> 
>
> Key: FLINK-6222
> URL: https://issues.apache.org/jira/browse/FLINK-6222
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
> Environment: YARN, EMR
>Reporter: Craig Foster
> Attachments: patch0-add-yarn-hadoop-conf.diff
>
>
> Right now we require end-users to set YARN_CONF_DIR or HADOOP_CONF_DIR and 
> sometimes FLINK_CONF_DIR.
> For example, in [1], it is stated: 
> “Please note that the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR 
> environment variable to be set to read the YARN and HDFS configuration.” 
> In BigTop, we set this with /etc/flink/default and then a wrapper is created 
> to source that. However, this is slightly cumbersome and we don't have a 
> central place within the Flink project itself to source environment 
> variables. config.sh could do this but it doesn't have information about 
> FLINK_CONF_DIR. For YARN and Hadoop variables, I already have a solution that 
> would add "env.yarn.confdir" and "env.hadoop.confdir" variables to the 
> flink-conf.yaml file and then we just symlink /etc/lib/flink/conf/ and 
> /etc/flink/conf. 
> But we could also add a flink-env.sh file to set these variables and decouple 
> them from config.sh entirely. 
> I'd like to know the opinion/preference of others and what would be more 
> amenable. 
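The first proposal above could look like the following flink-conf.yaml fragment. Note that `env.yarn.confdir` and `env.hadoop.confdir` are the keys proposed in this issue, not existing Flink options, and the paths are illustrative:

```yaml
# Proposed (hypothetical) keys for flink-conf.yaml -- not existing Flink options.
env.yarn.confdir: /etc/hadoop/conf
env.hadoop.confdir: /etc/hadoop/conf
```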





[jira] [Commented] (FLINK-6222) YARN: setting environment variables in an easier fashion

2017-11-17 Thread Craig Foster (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257610#comment-16257610
 ] 

Craig Foster commented on FLINK-6222:
-

I'm submitting a patch here to see if we can move this along. 

> YARN: setting environment variables in an easier fashion
> 
>
> Key: FLINK-6222
> URL: https://issues.apache.org/jira/browse/FLINK-6222
> Project: Flink
>  Issue Type: Improvement
>  Components: Startup Shell Scripts
>Affects Versions: 1.2.0
> Environment: YARN, EMR
>Reporter: Craig Foster
>
> Right now we require end-users to set YARN_CONF_DIR or HADOOP_CONF_DIR and 
> sometimes FLINK_CONF_DIR.
> For example, in [1], it is stated: 
> “Please note that the Client requires the YARN_CONF_DIR or HADOOP_CONF_DIR 
> environment variable to be set to read the YARN and HDFS configuration.” 
> In BigTop, we set this with /etc/flink/default and then a wrapper is created 
> to source that. However, this is slightly cumbersome and we don't have a 
> central place within the Flink project itself to source environment 
> variables. config.sh could do this but it doesn't have information about 
> FLINK_CONF_DIR. For YARN and Hadoop variables, I already have a solution that 
> would add "env.yarn.confdir" and "env.hadoop.confdir" variables to the 
> flink-conf.yaml file and then we just symlink /etc/lib/flink/conf/ and 
> /etc/flink/conf. 
> But we could also add a flink-env.sh file to set these variables and decouple 
> them from config.sh entirely. 
> I'd like to know the opinion/preference of others and what would be more 
> amenable. 





[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

2017-11-17 Thread shashank734
Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
Yeah, but I think 1.4 will take a couple more weeks. Since I am unable to
restore the state, this change is necessary for me.


---


[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257372#comment-16257372
 ] 

ASF GitHub Bot commented on FLINK-7835:
---

Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
Yeah, but I think 1.4 will take a couple more weeks. Since I am unable to
restore the state, this change is necessary for me.


> Fix duplicate() method in NFASerializer
> ---
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0, 1.3.1, 1.3.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>






[jira] [Created] (FLINK-8100) Consider introducing log4j-extras

2017-11-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8100:
-

 Summary: Consider introducing log4j-extras 
 Key: FLINK-8100
 URL: https://issues.apache.org/jira/browse/FLINK-8100
 Project: Flink
  Issue Type: Improvement
Reporter: Ted Yu


log4j-extras allows log rotation as well as compression.

https://logging.apache.org/log4j/extras/download.html

We should consider using log4j-extras.
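For illustration, a log4j.properties fragment using the extras companion might look as follows; the appender name and file pattern are illustrative, and a `FileNamePattern` ending in `.gz` causes rotated files to be compressed:

```properties
# Sketch: time-based rolling with compression via log4j-extras (paths illustrative).
log4j.appender.file=org.apache.log4j.rolling.RollingFileAppender
log4j.appender.file.RollingPolicy=org.apache.log4j.rolling.TimeBasedRollingPolicy
log4j.appender.file.RollingPolicy.FileNamePattern=log/flink.%d{yyyy-MM-dd}.log.gz
log4j.appender.file.layout=org.apache.log4j.PatternLayout
log4j.appender.file.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```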





[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257329#comment-16257329
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5032
  
Can you please add a unit test for this?


> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The
> error message should be something along the lines of "Duplicate state name".
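A clearer check could look roughly like the following self-contained sketch; the class and method names are invented for this example and are not Flink's actual `DefaultKeyedStateStore` API:

```java
// Illustrative sketch of the suggested check; names invented for this example.
import java.util.HashMap;
import java.util.Map;

public class DuplicateStateNameCheck {

    // Remembers which state class each name was first registered with.
    private final Map<String, Class<?>> registered = new HashMap<>();

    public <S> S getState(String name, Class<S> stateType) {
        Class<?> existing = registered.putIfAbsent(name, stateType);
        if (existing != null && !existing.equals(stateType)) {
            // Fail with a descriptive message instead of a ClassCastException.
            throw new IllegalStateException("Duplicate state name '" + name
                    + "': already registered as " + existing.getSimpleName()
                    + ", requested as " + stateType.getSimpleName());
        }
        return null; // a real store would create or return the state instance here
    }

    public static void main(String[] args) {
        DuplicateStateNameCheck store = new DuplicateStateNameCheck();
        store.getState("timon-one", Map.class);
        try {
            store.getState("timon-one", java.util.List.class);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```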





[GitHub] flink issue #5032: [FLINK-8090] [DataStream] Improve the error message for d...

2017-11-17 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5032
  
Can you please add a unit test for this?


---


[jira] [Commented] (FLINK-6936) Add multiple targets support for custom partitioner

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6936?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257198#comment-16257198
 ] 

ASF GitHub Bot commented on FLINK-6936:
---

Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/4297


> Add multiple targets support for custom partitioner
> ---
>
> Key: FLINK-6936
> URL: https://issues.apache.org/jira/browse/FLINK-6936
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Reporter: Xingcan Cui
>Assignee: Xingcan Cui
>Priority: Minor
>
> The current user-facing Partitioner only allows returning one target.
> {code:java}
> @Public
> public interface Partitioner<K> extends java.io.Serializable, Function {
>   /**
>* Computes the partition for the given key.
>*
>* @param key The key.
>* @param numPartitions The number of partitions to partition into.
>* @return The partition index.
>*/
>   int partition(K key, int numPartitions);
> }
> {code}
> Actually, this function should be able to return multiple partitions; the
> single-target restriction may be a historical legacy.
> There could be at least three approaches to solve this.
> # Make the `protected DataStream<T> setConnectionType(StreamPartitioner<T>
> partitioner)` method in DataStream public, which would allow users to directly
> define a StreamPartitioner.
> # Change the `partition` method in the Partitioner interface to return an int 
> array instead of a single int value.
> # Add a new `multicast` method to DataStream and provide a MultiPartitioner 
> interface which returns an int array.
> Considering the consistency of the API, the 3rd approach seems to be an
> acceptable choice. [~aljoscha], what do you think?
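The third approach sketched above could look like this; `MultiPartitioner` is a hypothetical interface for illustration, not an existing Flink API:

```java
// Hedged sketch of the proposed approach 3 (hypothetical interface, not Flink API).
import java.io.Serializable;
import java.util.Arrays;

public class MulticastExample {

    public interface MultiPartitioner<K> extends Serializable {
        /** Returns all partition indices the record with the given key is sent to. */
        int[] partition(K key, int numPartitions);
    }

    // Example: send every key to its own partition and the next one.
    public static final MultiPartitioner<Integer> PAIRWISE =
            (key, numPartitions) -> new int[] {key % numPartitions, (key + 1) % numPartitions};

    public static void main(String[] args) {
        System.out.println(Arrays.toString(PAIRWISE.partition(3, 4))); // prints [3, 0]
    }
}
```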





[GitHub] flink pull request #4297: [FLINK-6936] [streaming] Add multiple targets supp...

2017-11-17 Thread xccui
Github user xccui closed the pull request at:

https://github.com/apache/flink/pull/4297


---


[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257193#comment-16257193
 ] 

ASF GitHub Bot commented on FLINK-8090:
---

GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5032

[FLINK-8090] [DataStream] Improve the error message for duplicate state name

## What is the purpose of the change

This PR improves the error message shown when users try to access two states
of different types but with an identical name. However, it cannot detect two
states of the same type and name that are registered via two separate descriptors.

## Brief change log

Refactor `DefaultKeyedStateStore.java` to raise an error message when the 
required state type does not match the real state type.

## Verifying this change

The change can be verified by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8090

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5032


commit 78acc38ff0a1527eeb8204a74b65765bc673a6b3
Author: Xingcan Cui 
Date:   2017-11-17T12:13:49Z

[FLINK-8090] [DataStream] Improve error message when registering different 
states under the same name




> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor =
>     new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor =
>     new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> 

[GitHub] flink pull request #5032: [FLINK-8090] [DataStream] Improve the error messag...

2017-11-17 Thread xccui
GitHub user xccui opened a pull request:

https://github.com/apache/flink/pull/5032

[FLINK-8090] [DataStream] Improve the error message for duplicate state name

## What is the purpose of the change

This PR improves the error message shown when users try to access two states
of different types but with an identical name. However, it cannot detect two
states of the same type and name that are registered via two separate descriptors.

## Brief change log

Refactor `DefaultKeyedStateStore.java` to raise an error message when the 
required state type does not match the real state type.

## Verifying this change

The change can be verified by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/xccui/flink FLINK-8090

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5032.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5032


commit 78acc38ff0a1527eeb8204a74b65765bc673a6b3
Author: Xingcan Cui 
Date:   2017-11-17T12:13:49Z

[FLINK-8090] [DataStream] Improve error message when registering different 
states under the same name




---


[jira] [Commented] (FLINK-8083) FileSystem class not binary compatible with 1.3

2017-11-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257180#comment-16257180
 ] 

Aljoscha Krettek commented on FLINK-8083:
-

[~Zentol] I think you can now merge that change to the japicmp settings.

> FileSystem class not binary compatible with 1.3
> ---
>
> Key: FLINK-8083
> URL: https://issues.apache.org/jira/browse/FLINK-8083
> Project: Flink
>  Issue Type: Bug
>  Components: Core, FileSystem
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The {{FileSystem}} class is not binary compatible with 1.3 due to a removed
> method.
> {code}
> REMOVED (!)   public abstract org.apache.flink.core.fs.FileSystemKind 
> getKind() 
> {code}





[jira] [Closed] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7266.
---
Resolution: Fixed

Fixed on release-1.4 in
666b1b2e62463ae4985d237535d56c9e0ab9dba9

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.





[jira] [Closed] (FLINK-8083) FileSystem class not binary compatible with 1.3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-8083.
---
Resolution: Fixed

This should be fixed now that FLINK-7265 is on master and release-1.4.

> FileSystem class not binary compatible with 1.3
> ---
>
> Key: FLINK-8083
> URL: https://issues.apache.org/jira/browse/FLINK-8083
> Project: Flink
>  Issue Type: Bug
>  Components: Core, FileSystem
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The {{FileSystem}} class is not binary compatible with 1.3 due to a removed
> method.
> {code}
> REMOVED (!)   public abstract org.apache.flink.core.fs.FileSystemKind 
> getKind() 
> {code}





[jira] [Closed] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek closed FLINK-7265.
---
Resolution: Fixed

Fixed on release-1.4 in
a0dbe182fa677a87f601cbedc4115e63fff9fe4f

> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory
> on S3, because there are no such things as directories, only hierarchical
> naming in the keys (file names).
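The idea can be sketched as follows. `FileSystemKind` does exist in Flink (as `org.apache.flink.core.fs.FileSystemKind`); the enum is redeclared here only to keep the example self-contained, and `shouldCleanupParent` is an illustrative helper, not Flink code:

```java
// Minimal sketch: skip parent-directory cleanup on object stores.
public class ParentCleanup {

    public enum FileSystemKind { FILE_SYSTEM, OBJECT_STORE }

    /** Parent-directory cleanup only makes sense on real file systems. */
    public static boolean shouldCleanupParent(FileSystemKind kind) {
        return kind == FileSystemKind.FILE_SYSTEM;
    }

    public static void main(String[] args) {
        System.out.println(shouldCleanupParent(FileSystemKind.OBJECT_STORE)); // prints false
    }
}
```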





[jira] [Commented] (FLINK-8099) Reduce default restart delay to 1 second

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8099?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257176#comment-16257176
 ] 

ASF GitHub Bot commented on FLINK-8099:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5031

[FLINK-8099] Reduce default restart delay to 1 second

R: @tillrohrmann 

We could also introduce an extra entry in `ConfigConstants` for the `"1 s"` 
default but I opted against that. What do you think?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8099-1s-delay-restart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5031


commit 6d1d9b1caabfcd731ec2b2ba952fc5a0b27e98e7
Author: Aljoscha Krettek 
Date:   2017-11-17T16:19:51Z

[FLINK-8099] Reduce default restart delay to 1 second




> Reduce default restart delay to 1 second
> 
>
> Key: FLINK-8099
> URL: https://issues.apache.org/jira/browse/FLINK-8099
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, when a job fails, Flink waits for 10 seconds before restarting
> the job. Even zero delay would be a reasonable setting, but it would result in
> "flooding" the logs and quickly increasing the restart counter, because at zero
> delay you will always see failures when no standby resources are available.
> Reducing this to 1 second should make for a nicer out-of-box experience and 
> not flood too much.





[GitHub] flink pull request #5031: [FLINK-8099] Reduce default restart delay to 1 sec...

2017-11-17 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5031

[FLINK-8099] Reduce default restart delay to 1 second

R: @tillrohrmann 

We could also introduce an extra entry in `ConfigConstants` for the `"1 s"` 
default but I opted against that. What do you think?

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8099-1s-delay-restart

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5031.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5031


commit 6d1d9b1caabfcd731ec2b2ba952fc5a0b27e98e7
Author: Aljoscha Krettek 
Date:   2017-11-17T16:19:51Z

[FLINK-8099] Reduce default restart delay to 1 second




---


[jira] [Updated] (FLINK-8099) Reduce default restart delay to 1 second

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8099?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-8099:

Issue Type: Improvement  (was: Bug)

> Reduce default restart delay to 1 second
> 
>
> Key: FLINK-8099
> URL: https://issues.apache.org/jira/browse/FLINK-8099
> Project: Flink
>  Issue Type: Improvement
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Currently, when a job fails, Flink waits for 10 seconds before restarting
> the job. Even zero delay would be a reasonable setting, but it would result in
> "flooding" the logs and quickly increasing the restart counter, because at zero
> delay you will always see failures when no standby resources are available.
> Reducing this to 1 second should make for a nicer out-of-box experience and 
> not flood too much.





[jira] [Created] (FLINK-8099) Reduce default restart delay to 1 second

2017-11-17 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-8099:
---

 Summary: Reduce default restart delay to 1 second
 Key: FLINK-8099
 URL: https://issues.apache.org/jira/browse/FLINK-8099
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Reporter: Aljoscha Krettek
Assignee: Aljoscha Krettek
Priority: Blocker
 Fix For: 1.4.0


Currently, when a job fails, Flink waits for 10 seconds before restarting the
job. Even zero delay would be a reasonable setting, but it would result in
"flooding" the logs and quickly increasing the restart counter, because at zero
delay you will always see failures when no standby resources are available.

Reducing this to 1 second should make for a nicer out-of-box experience and not 
flood too much.
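For reference, the delay being discussed can also be set explicitly via the fixed-delay restart strategy in flink-conf.yaml (values illustrative):

```yaml
# Fixed-delay restart strategy configuration (values illustrative).
restart-strategy: fixed-delay
restart-strategy.fixed-delay.attempts: 3
restart-strategy.fixed-delay.delay: 1 s
```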





[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257147#comment-16257147
 ] 

ASF GitHub Bot commented on FLINK-7265:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4397


> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory 
> on S3, because there are no such things as directories, only hierarchical 
> naming in the keys (file names).





[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257146#comment-16257146
 ] 

Aljoscha Krettek commented on FLINK-7266:
-

Fixed on master (to be 1.5) in
b00f1b326c1ab4221a555200a4d5798e1565b821

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.





[GitHub] flink pull request #4397: [FLINK-7265] [FLINK-7266] Introduce FileSystemKind...

2017-11-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4397


---


[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257144#comment-16257144
 ] 

ASF GitHub Bot commented on FLINK-7265:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4397
  
@StephanEwen Could you please close this PR now that it's also merged for 
master/1.4?


> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory 
> on S3, because there are no such things as directories, only hierarchical 
> naming in the keys (file names).





[jira] [Commented] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257145#comment-16257145
 ] 

Aljoscha Krettek commented on FLINK-7265:
-

Added on master (to be 1.5) in
f29f80575dac1c7e59dd7074118953b8be26520f

> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory 
> on S3, because there are no such things as directories, only hierarchical 
> naming in the keys (file names).





[GitHub] flink issue #4397: [FLINK-7265] [FLINK-7266] Introduce FileSystemKind and Co...

2017-11-17 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/4397
  
@StephanEwen Could you please close this PR now that it's also merged for 
master/1.4?


---


[jira] [Assigned] (FLINK-7841) Add docs for Flink's S3 support

2017-11-17 Thread Nico Kruber (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nico Kruber reassigned FLINK-7841:
--

Assignee: Nico Kruber  (was: Stephan Ewen)

> Add docs for Flink's S3 support
> ---
>
> Key: FLINK-7841
> URL: https://issues.apache.org/jira/browse/FLINK-7841
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7988) HadoopS3FileSystemITCase leaves test directories behind in S3

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7988?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257102#comment-16257102
 ] 

ASF GitHub Bot commented on FLINK-7988:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4950
  
Changes look good to me. Thanks for your contribution @NicoK. Merging this 
PR.


> HadoopS3FileSystemITCase leaves test directories behind in S3
> -
>
> Key: FLINK-7988
> URL: https://issues.apache.org/jira/browse/FLINK-7988
> Project: Flink
>  Issue Type: Bug
>  Components: filesystem-connector, Tests
>Affects Versions: 1.4.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> {{HadoopS3FileSystemITCase}} creates a random test (base) directory in S3 
> which is not cleaned up. Please note, that the individual tests create 
> sub-directories and also always delete those at least.
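The missing cleanup can be sketched standalone: a test base directory created up front and removed recursively afterwards (local files stand in for S3 objects here; this is not the actual test class):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

// Standalone sketch of the cleanup the ticket asks for: delete the test
// base directory recursively, not only the per-test sub-directories.
public class TestDirCleanupSketch {

    static Path createBaseDir() throws IOException {
        Path base = Files.createTempDirectory("s3-itcase-");
        Files.createDirectories(base.resolve("sub").resolve("dir")); // per-test sub-directories
        return base;
    }

    // Recursive delete: walk the tree and remove children before parents.
    static void deleteRecursively(Path base) throws IOException {
        try (Stream<Path> paths = Files.walk(base)) {
            paths.sorted(Comparator.reverseOrder())
                 .forEach(p -> p.toFile().delete());
        }
    }

    public static void main(String[] args) throws IOException {
        Path base = createBaseDir();
        deleteRecursively(base);
        System.out.println(Files.exists(base)); // prints false
    }
}
```

In a JUnit suite this would typically live in an after-class hook so it runs even when individual tests fail.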





[GitHub] flink issue #4950: [FLINK-7988][s3] fix HadoopS3FileSystemITCase leaving tes...

2017-11-17 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4950
  
Changes look good to me. Thanks for your contribution @NicoK. Merging this 
PR.


---


[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257100#comment-16257100
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4939
  
Alright, then we'll do it as you've proposed. I'll merge this PR. Thanks 
for your work @NicoK and the review @zentol and @StephanEwen.


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as your configured 
> FileSystem. In that case, the Flink session will fail while staging itself, because it 
> tries to copy the flink/lib directory to S3 and the S3aFileSystem does not 
> support recursive copy.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.<init>(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
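The suggested workaround can be sketched standalone: instead of one recursive copy (unsupported by the S3A target described above), create the folders explicitly and copy regular files one by one. Local paths stand in for s3a:// URIs; names here are illustrative, not Flink's actual code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.stream.Stream;

// Standalone sketch of a manual "recursive" copy: directories are created
// explicitly and only regular files are copied, one at a time.
public class ManualRecursiveCopySketch {

    static void copyTree(Path src, Path dst) throws IOException {
        try (Stream<Path> paths = Files.walk(src)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                Path target = dst.resolve(src.relativize(p).toString());
                if (Files.isDirectory(p)) {
                    Files.createDirectories(target); // create folders manually
                } else {
                    Files.copy(p, target, StandardCopyOption.REPLACE_EXISTING);
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempDirectory("src-");
        Files.write(src.resolve("a.txt"), "x".getBytes());
        Path dst = Files.createTempDirectory("dst-").resolve("copy");
        copyTree(src, dst);
        System.out.println(Files.exists(dst.resolve("a.txt"))); // prints true
    }
}
```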





[GitHub] flink issue #4939: [FLINK-4228][yarn/s3a] fix yarn resource upload s3a defau...

2017-11-17 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4939
  
Alright, then we'll do it as you've proposed. I'll merge this PR. Thanks 
for your work @NicoK and the review @zentol and @StephanEwen.


---


[jira] [Closed] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8061.
-
Resolution: Fixed

Merged with 3edbb7bce5b30386a67b1b01ef1591a681601219.

> Remove trailing asterisk in QueryableStateClient javadocs
> -
>
> Key: FLINK-8061
> URL: https://issues.apache.org/jira/browse/FLINK-8061
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeHint   A {@link TypeHint} used 
> to extract the type of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeInfo   The {@link 
> TypeInformation} of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}





[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16257042#comment-16257042
 ] 

ASF GitHub Bot commented on FLINK-8061:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5008
  
@vetriselvan1187 Merged. Can you close this PR?


> Remove trailing asterisk in QueryableStateClient javadocs
> -
>
> Key: FLINK-8061
> URL: https://issues.apache.org/jira/browse/FLINK-8061
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeHint   A {@link TypeHint} used 
> to extract the type of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeInfo   The {@link 
> TypeInformation} of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}





[GitHub] flink issue #5008: [FLINK-8061] [Queryable State] removed trailing asterisk ...

2017-11-17 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5008
  
@vetriselvan1187 Merged. Can you close this PR?


---


[GitHub] flink issue #5023: [hotfix][docs] Review of concepts docs for grammar, forma...

2017-11-17 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5023
  
@ChrisChinchilla I think you want to `git rebase origin/master` (or 
whatever you have named upstream). Did you do a merge instead?


---


[jira] [Updated] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7266:

Fix Version/s: (was: 1.5.0)

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.





[jira] [Updated] (FLINK-7265) FileSystems should describe their kind and consistency level

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7265?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7265:

Fix Version/s: (was: 1.5.0)
   1.4.0

> FileSystems should describe their kind and consistency level
> 
>
> Key: FLINK-7265
> URL: https://issues.apache.org/jira/browse/FLINK-7265
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0, 1.3.3
>
>
> Currently, all {{FileSystem}} types look the same to Flink.
> However, certain operations should only be executed on certain kinds of file 
> systems.
> For example, it makes no sense to attempt to delete an empty parent directory 
> on S3, because there are no such things as directories, only hierarchical 
> naming in the keys (file names).





[jira] [Assigned] (FLINK-8083) FileSystem class not binary compatible with 1.3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8083?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-8083:
---

Assignee: Aljoscha Krettek

> FileSystem class not binary compatible with 1.3
> ---
>
> Key: FLINK-8083
> URL: https://issues.apache.org/jira/browse/FLINK-8083
> Project: Flink
>  Issue Type: Bug
>  Components: Core, FileSystem
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The {{FileSystem}} class it not binary compatible with 1.3 due to a removed 
> method.
> {code}
> REMOVED (!)   public abstract org.apache.flink.core.fs.FileSystemKind 
> getKind() 
> {code}
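The compatibility idea can be sketched standalone: re-adding the removed method as a concrete method with a default result lets subclasses compiled against the old API (which never override it) keep linking. This is a toy model, not Flink's real class hierarchy:

```java
// Standalone sketch: a formerly-abstract method re-added with a concrete
// default, so pre-existing subclasses without an override still load and run.
public class BinaryCompatSketch {

    enum FileSystemKind { FILE_SYSTEM, OBJECT_STORE }

    abstract static class BaseFileSystem {
        // Concrete default instead of an abstract method: subclasses that
        // do not override it remain binary compatible.
        public FileSystemKind getKind() {
            return FileSystemKind.FILE_SYSTEM;
        }
    }

    // Models a subclass compiled against the old API (no override).
    static class LegacyFileSystem extends BaseFileSystem { }

    public static void main(String[] args) {
        System.out.println(new LegacyFileSystem().getKind()); // prints FILE_SYSTEM
    }
}
```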





[jira] [Commented] (FLINK-8086) FlinkKafkaProducer011 can permanently fail in recovery through ProducerFencedException

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8086?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256977#comment-16256977
 ] 

ASF GitHub Bot commented on FLINK-8086:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5030

[FLINK-8086][kafka] Ignore ProducerFencedException

ProducerFencedException can happen if we restore twice from the same 
checkpoint
or if we restore from an old savepoint. In both cases transactional.ids 
that we
want to recoverAndCommit have already been committed and reused. 
Reusing means
that they will be known by Kafka's brokers under newer 
producerId/epochId,
which will result in ProducerFencedException if we try to commit again 
some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of 
failing
we might have a semi-silent (warning-only) data loss.
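The recovery behaviour described above can be sketched standalone: a fencing error during recover-and-commit is logged and swallowed instead of failing the job. `FencedException` stands in for Kafka's `ProducerFencedException`; this is not the real connector code:

```java
// Standalone sketch: ignore a fencing error during recover-and-commit,
// since it means the transaction was already committed earlier.
public class IgnoreFencedCommitSketch {

    static class FencedException extends RuntimeException { }

    interface Transaction { void commit() throws FencedException; }

    // Returns true if the transaction was committed now, false if it had
    // already been committed by an earlier attempt (fenced).
    static boolean recoverAndCommit(Transaction txn) {
        try {
            txn.commit();
            return true;
        } catch (FencedException e) {
            // transactional.id already committed and reused: warn, don't fail.
            System.err.println("WARN: transaction already committed, ignoring fencing error");
            return false;
        }
    }

    public static void main(String[] args) {
        boolean[] committed = { false };
        Transaction txn = () -> {
            if (committed[0]) throw new FencedException();
            committed[0] = true;
        };
        System.out.println(recoverAndCommit(txn)); // first restore: true
        System.out.println(recoverAndCommit(txn)); // second restore from same checkpoint: false
    }
}
```

As the message notes, the trade-off is that a genuine bug producing the same exception would surface only as a warning.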

## Verifying this change

This change adds a `testFailAndRecoverSameCheckpointTwice` test for a scenario 
that was previously untested (and was failing).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8086

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5030


commit d4542339e39adeff69cf18a647c8a5ac0d97969e
Author: Piotr Nowojski 
Date:   2017-11-17T13:31:07Z

[hotfix][kafka] Improve logging in FlinkKafkaProducer011

commit 642ba6d0c3e204824f1ff69412d70918be9e3ac7
Author: Piotr Nowojski 
Date:   2017-11-17T13:40:30Z

[FLINK-8086][kafka] Ignore ProducerFencedException during recovery

ProducerFencedException can happen if we restore twice from the same 
checkpoint
or if we restore from an old savepoint. In both cases transactional.ids 
that we
want to recoverAndCommit have already been committed and reused. Reusing 
means
that they will be known by Kafka's brokers under newer producerId/epochId,
which will result in ProducerFencedException if we try to commit again some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of 
failing
we might have a semi-silent (warning-only) data loss.




> FlinkKafkaProducer011 can permanently fail in recovery through 
> ProducerFencedException
> --
>
> Key: FLINK-8086
> URL: https://issues.apache.org/jira/browse/FLINK-8086
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Stefan Richter
>Assignee: Piotr Nowojski
>Priority: Blocker
> Fix For: 1.4.0
>
>
> Chaos monkey test in a cluster environment can permanently bring down our 
> FlinkKafkaProducer011.
> Typically, after a small number of randomly killed TMs, the data generator 
> job is no longer able to recover from a checkpoint because of the following 
> problem:
> org.apache.kafka.common.errors.ProducerFencedException: Producer attempted an 
> operation with an old epoch. Either there is a newer producer with the same 
> transactionalId, or the producer's transaction has been expired by the broker.
> The problem is reproduceable and happened for me in every run after the chaos 
> monkey killed a couple of TMs.





[GitHub] flink pull request #5030: [FLINK-8086][kafka] Ignore ProducerFencedException

2017-11-17 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5030

[FLINK-8086][kafka] Ignore ProducerFencedException

ProducerFencedException can happen if we restore twice from the same 
checkpoint
or if we restore from an old savepoint. In both cases transactional.ids 
that we
want to recoverAndCommit have already been committed and reused. 
Reusing means
that they will be known by Kafka's brokers under newer 
producerId/epochId,
which will result in ProducerFencedException if we try to commit again 
some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of 
failing
we might have a semi-silent (warning-only) data loss.

## Verifying this change

This change adds a `testFailAndRecoverSameCheckpointTwice` test for a scenario 
that was previously untested (and was failing).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8086

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5030.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5030


commit d4542339e39adeff69cf18a647c8a5ac0d97969e
Author: Piotr Nowojski 
Date:   2017-11-17T13:31:07Z

[hotfix][kafka] Improve logging in FlinkKafkaProducer011

commit 642ba6d0c3e204824f1ff69412d70918be9e3ac7
Author: Piotr Nowojski 
Date:   2017-11-17T13:40:30Z

[FLINK-8086][kafka] Ignore ProducerFencedException during recovery

ProducerFencedException can happen if we restore twice from the same 
checkpoint
or if we restore from an old savepoint. In both cases transactional.ids 
that we
want to recoverAndCommit have already been committed and reused. Reusing 
means
that they will be known by Kafka's brokers under newer producerId/epochId,
which will result in ProducerFencedException if we try to commit again some
old (and already committed) transaction.

Ignoring this exception might hide some bugs/issues, because instead of 
failing
we might have a semi-silent (warning-only) data loss.




---


[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256958#comment-16256958
 ] 

ASF GitHub Bot commented on FLINK-8061:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5008
  
Thanks for the work @vetriselvan1187. I will merge this.


> Remove trailing asterisk in QueryableStateClient javadocs
> -
>
> Key: FLINK-8061
> URL: https://issues.apache.org/jira/browse/FLINK-8061
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeHint   A {@link TypeHint} used 
> to extract the type of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}
> {code}
> /**
>* Returns a future holding the request result.  *
>* @param jobId JobID of the job the queryable 
> state belongs to.
>* @param queryableStateNameName under which the state is 
> queryable.
>* @param key   The key we are interested 
> in.
>* @param keyTypeInfo   The {@link 
> TypeInformation} of the key.
>* @param stateDescriptor   The {@link 
> StateDescriptor} of the state we want to query.
>* @return Future holding the immutable {@link State} object containing 
> the result.
>*/
> {code}





[GitHub] flink issue #5008: [FLINK-8061] [Queryable State] removed trailing asterisk ...

2017-11-17 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/5008
  
Thanks for the work @vetriselvan1187. I will merge this.


---


[jira] [Commented] (FLINK-7841) Add docs for Flink's S3 support

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7841?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256951#comment-16256951
 ] 

ASF GitHub Bot commented on FLINK-7841:
---

GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5029

[FLINK-7841][docs] update AWS docs with respect to S3 file system changes

## What is the purpose of the change

This updates the S3 file system configuration section in the AWS 
documentation, which has become a lot simpler now that we offer the shaded 
`flink-s3-fs-hadoop` and `flink-s3-fs-presto` wrappers.
(please also merge to the 1.4 branch since this should be in the docs 
accompanying it)

## Brief change log

- update and extend the S3 documentation

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **yes**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **docs**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7841

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5029.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5029


commit 33da387f8f4c1be75941b6e24eccf9f1a709f2ba
Author: Nico Kruber 
Date:   2017-11-17T13:13:30Z

[FLINK-7841][docs] update AWS docs with respect to S3 file system changes




> Add docs for Flink's S3 support
> ---
>
> Key: FLINK-7841
> URL: https://issues.apache.org/jira/browse/FLINK-7841
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256950#comment-16256950
 ] 

ASF GitHub Bot commented on FLINK-7835:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4821
  
Yes, I think applying it manually is the safest bet. 
Either way, 1.4 will come soon and this fix will be included.


> Fix duplicate() method in NFASerializer
> ---
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0, 1.3.1, 1.3.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>






[GitHub] flink pull request #5029: [FLINK-7841][docs] update AWS docs with respect to...

2017-11-17 Thread NicoK
GitHub user NicoK opened a pull request:

https://github.com/apache/flink/pull/5029

[FLINK-7841][docs] update AWS docs with respect to S3 file system changes

## What is the purpose of the change

This updates the S3 file system configuration section in the AWS 
documentation, which has become a lot simpler now that we offer the shaded 
`flink-s3-fs-hadoop` and `flink-s3-fs-presto` wrappers.
(please also merge to the 1.4 branch since this should be in the docs 
accompanying it)

## Brief change log

- update and extend the S3 documentation

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **yes**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **docs**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/NicoK/flink flink-7841

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5029.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5029


commit 33da387f8f4c1be75941b6e24eccf9f1a709f2ba
Author: Nico Kruber 
Date:   2017-11-17T13:13:30Z

[FLINK-7841][docs] update AWS docs with respect to S3 file system changes




---


[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

2017-11-17 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4821
  
Yes, I think manually is the safest bet. 
Either way, 1.4 will come out soon and this fix will be included.


---


[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

2017-11-17 Thread shashank734
Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
While applying the patch, I'm getting a lot of conflicts, I think due to 
changes made in between; that's why I had doubts. I'll apply these changes manually. 


---


[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256934#comment-16256934
 ] 

ASF GitHub Bot commented on FLINK-7835:
---

Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
While applying the patch, I'm getting a lot of conflicts, I think due to 
changes made in between; that's why I had doubts. I'll apply these changes manually. 


> Fix duplicate() method in NFASerializer
> ---
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0, 1.3.1, 1.3.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>






[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256927#comment-16256927
 ] 

ASF GitHub Bot commented on FLINK-7835:
---

Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4821
  
I think you can, but you will have to recompile Flink.


> Fix duplicate() method in NFASerializer
> ---
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0, 1.3.1, 1.3.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>






[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

2017-11-17 Thread kl0u
Github user kl0u commented on the issue:

https://github.com/apache/flink/pull/4821
  
I think you can, but you will have to recompile Flink.


---


[jira] [Commented] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256920#comment-16256920
 ] 

Aljoscha Krettek commented on FLINK-7266:
-

We actually have to fix this for 1.4 because otherwise we regress from 1.3. 
The operation that deletes parent directories is too expensive and will 
basically DDoS S3, making Flink unusable for bigger installations with S3.

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2, 1.5.0
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.
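The guard described above (clean up parent directories only on real file systems, never on object stores) can be sketched roughly as follows. This is an illustrative sketch with hypothetical names, not Flink's actual `FileState` code:

```java
import java.net.URI;
import java.util.Set;

public class ParentDirCleanup {
    // Object-store schemes where "directories" are only a naming convention,
    // so checking for and deleting empty parents is both pointless and costly.
    private static final Set<String> OBJECT_STORE_SCHEMES =
            Set.of("s3", "s3a", "s3n", "gs", "oss", "wasb");

    /**
     * Returns true only for real file systems, where empty parent
     * directories actually exist and may be worth deleting on state release.
     */
    public static boolean shouldCleanupParent(URI path) {
        String scheme = path.getScheme() == null ? "file" : path.getScheme();
        return !OBJECT_STORE_SCHEMES.contains(scheme);
    }
}
```

With such a check, every S3 state release skips the extra list-and-delete round trips that were throttling the JobManager during checkpoint cleanup.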





[jira] [Assigned] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek reassigned FLINK-7266:
---

Assignee: Aljoscha Krettek  (was: Stephan Ewen)

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2, 1.5.0
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.





[jira] [Updated] (FLINK-7266) Don't attempt to delete parent directory on S3

2017-11-17 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7266?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7266:

Fix Version/s: 1.4.0

> Don't attempt to delete parent directory on S3
> --
>
> Key: FLINK-7266
> URL: https://issues.apache.org/jira/browse/FLINK-7266
> Project: Flink
>  Issue Type: Bug
>  Components: Core
>Affects Versions: 1.3.1
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.4.0, 1.3.2, 1.5.0
>
>
> Currently, every attempted release of an S3 state object also checks if the 
> "parent directory" is empty and then tries to delete it.
> Not only is that unnecessary on S3, but it is prohibitively expensive and for 
> example causes S3 to throttle calls by the JobManager on checkpoint cleanup.
> The {{FileState}} must only attempt parent directory cleanup when operating 
> against real file systems, not when operating against object stores.





[jira] [Commented] (FLINK-7835) Fix duplicate() method in NFASerializer

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7835?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256910#comment-16256910
 ] 

ASF GitHub Bot commented on FLINK-7835:
---

Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
Can I directly use these changes on the release-1.3.2 tag, or are there 
dependencies in between?


> Fix duplicate() method in NFASerializer
> ---
>
> Key: FLINK-7835
> URL: https://issues.apache.org/jira/browse/FLINK-7835
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP
>Affects Versions: 1.3.0, 1.3.1, 1.3.2
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>






[GitHub] flink issue #4821: [FLINK-7835][cep] Fix duplicate() in NFASerializer.

2017-11-17 Thread shashank734
Github user shashank734 commented on the issue:

https://github.com/apache/flink/pull/4821
  
Can I directly use these changes on the release-1.3.2 tag, or are there 
dependencies in between?


---


[jira] [Updated] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-17 Thread Shashank Agarwal (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Shashank Agarwal updated FLINK-8098:

Fix Version/s: 1.4.0

> LeaseExpiredException when using FsStateBackend for checkpointing due to 
> multiple mappers tries to access the same file.
> 
>
> Key: FLINK-8098
> URL: https://issues.apache.org/jira/browse/FLINK-8098
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
> Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
>Reporter: Shashank Agarwal
> Fix For: 1.4.0
>
>
> I am running a streaming application with parallelism 6 and have enabled 
> checkpointing (1000). But the application crashes after 1-2 days. After 
> analysing the logs, I found the following trace. 
> {code}
> 2017-11-17 11:19:06,696 WARN  
> org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
> properly clean up the async checkpoint runnable.
> java.lang.Exception: Could not properly cancel managed keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
>   at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
>   at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
>   at 
> org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
>   at 
> org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.util.concurrent.ExecutionException: java.io.IOException: 
> Could not flush and close the file system output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
>   ... 8 more
> Caused by: java.io.IOException: Could not flush and close the file system 
> output stream to 
> hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  in order to obtain the stream state handle
>   at 
> org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
>   at 
> org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
>   at 
> org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
>   at 
> org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   ... 1 more
> Caused by: 
> org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
>  No lease 
> flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
>  (inode 812148671): File does not exist. [Lease.  Holder: 
> DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
>   at 
> org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
>   at 
> 

[jira] [Created] (FLINK-8098) LeaseExpiredException when using FsStateBackend for checkpointing due to multiple mappers tries to access the same file.

2017-11-17 Thread Shashank Agarwal (JIRA)
Shashank Agarwal created FLINK-8098:
---

 Summary: LeaseExpiredException when using FsStateBackend for 
checkpointing due to multiple mappers tries to access the same file.
 Key: FLINK-8098
 URL: https://issues.apache.org/jira/browse/FLINK-8098
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.3.2
 Environment: Yarn, HDFS 2.7.3, Kafka, scala streaming API, CEP
Reporter: Shashank Agarwal


I am running a streaming application with parallelism 6 and have enabled 
checkpointing (1000). But the application crashes after 1-2 days. After 
analysing the logs, I found the following trace. 

{code}
2017-11-17 11:19:06,696 WARN  
org.apache.flink.streaming.runtime.tasks.StreamTask   - Could not 
properly clean up the async checkpoint runnable.
java.lang.Exception: Could not properly cancel managed keyed state future.
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.close(StreamTask.java:983)
at org.apache.flink.util.IOUtils.closeQuietly(IOUtils.java:262)
at org.apache.flink.util.IOUtils.closeAllQuietly(IOUtils.java:251)
at 
org.apache.flink.util.AbstractCloseableRegistry.close(AbstractCloseableRegistry.java:97)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.cancel(StreamTask.java:355)
at 
org.apache.flink.runtime.taskmanager.Task$TaskCanceler.run(Task.java:1463)
at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.ExecutionException: java.io.IOException: Could 
not flush and close the file system output stream to 
hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
 in order to obtain the stream state handle
at java.util.concurrent.FutureTask.report(FutureTask.java:122)
at java.util.concurrent.FutureTask.get(FutureTask.java:192)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
at 
org.apache.flink.runtime.state.StateUtil.discardStateFuture(StateUtil.java:85)
at 
org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:88)
... 8 more
Caused by: java.io.IOException: Could not flush and close the file system 
output stream to 
hdfs://xyz.com:8020/flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
 in order to obtain the stream state handle
at 
org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory$FsCheckpointStateOutputStream.closeAndGetHandle(FsCheckpointStreamFactory.java:336)
at 
org.apache.flink.runtime.checkpoint.AbstractAsyncSnapshotIOCallable.closeStreamAndGetStateHandle(AbstractAsyncSnapshotIOCallable.java:100)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:351)
at 
org.apache.flink.runtime.state.heap.HeapKeyedStateBackend$1.performOperation(HeapKeyedStateBackend.java:329)
at 
org.apache.flink.runtime.io.async.AbstractAsyncIOCallable.call(AbstractAsyncIOCallable.java:72)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:40)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
at 
java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
... 1 more
Caused by: 
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hdfs.server.namenode.LeaseExpiredException):
 No lease 
flink/sux/54944cea1f566ee801656e06cdeeabbc/chk-40191/cf145018-0599-4281-b254-96600a4e4965
 (inode 812148671): File does not exist. [Lease.  Holder: 
DFSClient_NONMAPREDUCE_1721510813_94, pendingcreates: 161]
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.checkLease(FSNamesystem.java:3659)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFileInternal(FSNamesystem.java:3749)
at 
org.apache.hadoop.hdfs.server.namenode.FSNamesystem.completeFile(FSNamesystem.java:3716)
at 
org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.complete(NameNodeRpcServer.java:911)
at 
org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.complete(ClientNamenodeProtocolServerSideTranslatorPB.java:547)
   

[jira] [Commented] (FLINK-8061) Remove trailing asterisk in QueryableStateClient javadocs

2017-11-17 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256846#comment-16256846
 ] 

Kostas Kloudas commented on FLINK-8061:
---

Would you like to merge this [~Zentol]? It would be nice to clean up JIRA a bit.

> Remove trailing asterisk in QueryableStateClient javadocs
> -
>
> Key: FLINK-8061
> URL: https://issues.apache.org/jira/browse/FLINK-8061
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> {code}
> /**
>  * Returns a future holding the request result.  *
>  * @param jobId              JobID of the job the queryable state belongs to.
>  * @param queryableStateName Name under which the state is queryable.
>  * @param key                The key we are interested in.
>  * @param keyTypeHint        A {@link TypeHint} used to extract the type of the key.
>  * @param stateDescriptor    The {@link StateDescriptor} of the state we want to query.
>  * @return Future holding the immutable {@link State} object containing the result.
>  */
> {code}
> {code}
> /**
>  * Returns a future holding the request result.  *
>  * @param jobId              JobID of the job the queryable state belongs to.
>  * @param queryableStateName Name under which the state is queryable.
>  * @param key                The key we are interested in.
>  * @param keyTypeInfo        The {@link TypeInformation} of the key.
>  * @param stateDescriptor    The {@link StateDescriptor} of the state we want to query.
>  * @return Future holding the immutable {@link State} object containing the result.
>  */
> {code}





[jira] [Commented] (FLINK-8064) Extend dependency section to list flink-core

2017-11-17 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8064?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256845#comment-16256845
 ] 

Kostas Kloudas commented on FLINK-8064:
---

Actually, we tried it, and if you want to avoid transitively adding a dependency on 
{{runtime}} in the client, there is not much that you can put in the {{common}} 
package, and it is not worth the complexity. 

In any case, we can introduce further separation of the packages in the future 
if we want, and as long as {{runtime}} and {{client}} stay there, it will 
make no difference to the user.

> Extend dependency section to list flink-core
> 
>
> Key: FLINK-8064
> URL: https://issues.apache.org/jira/browse/FLINK-8064
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>
> The dependency section of the Queryable State documentation should also list 
> flink-core as it is inherently required when working with the client. (Which 
> has flink-core set to provided)





[jira] [Commented] (FLINK-8090) Improve error message when registering different states under the same name.

2017-11-17 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8090?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256826#comment-16256826
 ] 

Xingcan Cui commented on FLINK-8090:


Hi [~kkl0u], thanks for raising this. I found that a {{State}} is identified 
solely by the name in its {{StateDescriptor}}. In other words, if we create two 
descriptors with an identical name and type, they will return the same state 
object (though the wrapper may be different). On the other hand, for two 
descriptors with an identical name but different types, the mismatch only 
surfaces as a {{ClassCastException}} at runtime, due to type erasure in Java. 
I'll try to refactor the {{DefaultKeyedStateStore}} to provide a better error 
message for the latter case.

Best, Xingcan
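The "better error message" direction can be sketched as a small name registry that fails fast when a name is reused for a different state type. This is an illustrative sketch with hypothetical class names, not the actual DefaultKeyedStateStore code:

```java
import java.util.HashMap;
import java.util.Map;

public class StateRegistrySketch {
    // name -> state interface first registered under that name
    private final Map<String, Class<?>> registered = new HashMap<>();

    /**
     * Registers a state name and fails with a descriptive message when the
     * same name is reused for a different state type, instead of letting a
     * raw ClassCastException escape later.
     */
    public void register(String name, Class<?> stateType) {
        Class<?> existing = registered.putIfAbsent(name, stateType);
        if (existing != null && !existing.equals(stateType)) {
            throw new IllegalStateException("Duplicate state name '" + name
                    + "': already registered as " + existing.getSimpleName()
                    + ", now requested as " + stateType.getSimpleName());
        }
    }
}
```

Registering `"timon-one"` first as a map state and then as a list state would then report the duplicate name directly, which is exactly the failure mode in the issue below.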

> Improve error message when registering different states under the same name.
> 
>
> Key: FLINK-8090
> URL: https://issues.apache.org/jira/browse/FLINK-8090
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Xingcan Cui
>
> Currently a {{ProcessFunction}} like this:
> {code}
> final MapStateDescriptor<Integer, Tuple2<Integer, Long>> firstMapStateDescriptor = new MapStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO,
>         source.getType());
>
> final ListStateDescriptor<Integer> secondListStateDescriptor = new ListStateDescriptor<>(
>         "timon-one",
>         BasicTypeInfo.INT_TYPE_INFO);
>
> new ProcessFunction<Tuple2<Integer, Long>, Object>() {
>     private static final long serialVersionUID = -805125545438296619L;
>
>     private transient MapState<Integer, Tuple2<Integer, Long>> firstMapState;
>     private transient ListState<Integer> secondListState;
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         firstMapState = getRuntimeContext().getMapState(firstMapStateDescriptor);
>         secondListState = getRuntimeContext().getListState(secondListStateDescriptor);
>     }
>
>     @Override
>     public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<Object> out) throws Exception {
>         Tuple2<Integer, Long> v = firstMapState.get(value.f0);
>         if (v == null) {
>             v = new Tuple2<>(value.f0, 0L);
>         }
>         firstMapState.put(value.f0, new Tuple2<>(v.f0, v.f1 + value.f1));
>     }
> }
> {code}
> fails with:
> {code}
> java.lang.RuntimeException: Error while getting state
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:74)
>   at 
> org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getListState(StreamingRuntimeContext.java:127)
>   at 
> org.apache.flink.queryablestate.itcases.AbstractQueryableStateTestBase$2.open(AbstractQueryableStateTestBase.java:327)
>   at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>   at 
> org.apache.flink.streaming.api.operators.KeyedProcessOperator.open(KeyedProcessOperator.java:58)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:381)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
>   at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.ClassCastException: 
> org.apache.flink.runtime.state.heap.HeapMapState cannot be cast to 
> org.apache.flink.api.common.state.ListState
>   at 
> org.apache.flink.runtime.state.DefaultKeyedStateStore.getListState(DefaultKeyedStateStore.java:71)
>   ... 9 more
> {code}
> Which is cryptic, as it does not explain the reason for the problem. The 
> error message should be something along the line of "Duplicate state name".





[jira] [Closed] (FLINK-7432) Unclosed HighAvailabilityServices instance in QueryableStateClient

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-7432.
-
Resolution: Fixed

This is outdated given the recent changes in Queryable State for Flink 1.4

> Unclosed HighAvailabilityServices instance in QueryableStateClient
> --
>
> Key: FLINK-7432
> URL: https://issues.apache.org/jira/browse/FLINK-7432
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Ted Yu
>Assignee: Kostas Kloudas
>Priority: Minor
>
> {code}
>   public QueryableStateClient(Configuration config) throws Exception {
> this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
> config, Executors.directExecutor(), 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
> {code}
> The HighAvailabilityServices instance is only used for calling 
> getJobManagerLeaderRetriever().
> The instance should be closed upon leaving QueryableStateClient ctor.





[jira] [Assigned] (FLINK-7432) Unclosed HighAvailabilityServices instance in QueryableStateClient

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7432?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-7432:
-

Assignee: Kostas Kloudas

> Unclosed HighAvailabilityServices instance in QueryableStateClient
> --
>
> Key: FLINK-7432
> URL: https://issues.apache.org/jira/browse/FLINK-7432
> Project: Flink
>  Issue Type: Bug
>  Components: Queryable State
>Reporter: Ted Yu
>Assignee: Kostas Kloudas
>Priority: Minor
>
> {code}
>   public QueryableStateClient(Configuration config) throws Exception {
> this(config, HighAvailabilityServicesUtils.createHighAvailabilityServices(
> config, Executors.directExecutor(), 
> HighAvailabilityServicesUtils.AddressResolution.TRY_ADDRESS_RESOLUTION));
> {code}
> The HighAvailabilityServices instance is only used for calling 
> getJobManagerLeaderRetriever().
> The instance should be closed upon leaving QueryableStateClient ctor.





[jira] [Closed] (FLINK-6676) API Migration guide: add QueryableStateClient changes

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-6676?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-6676.
-
Resolution: Fixed
  Assignee: Kostas Kloudas

This is now outdated.

> API Migration guide: add QueryableStateClient changes
> -
>
> Key: FLINK-6676
> URL: https://issues.apache.org/jira/browse/FLINK-6676
> Project: Flink
>  Issue Type: Sub-task
>  Components: Documentation, Queryable State
>Affects Versions: 1.3.0
>Reporter: Nico Kruber
>Assignee: Kostas Kloudas
> Fix For: 1.3.0
>
>
> The migration guide at {{docs/dev/migration.md}} needs to be extended with 
> some notes about the API changes:
> * changes in the constructor
> * more?





[jira] [Closed] (FLINK-5526) QueryableState: notify upon receiving a query but having queryable state disabled

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5526?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-5526.
-
Resolution: Invalid

With the current changes in Flink 1.4, this issue is outdated.

> QueryableState: notify upon receiving a query but having queryable state 
> disabled
> -
>
> Key: FLINK-5526
> URL: https://issues.apache.org/jira/browse/FLINK-5526
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.2.0
>Reporter: Nico Kruber
>Priority: Minor
>
> When querying state but having it disabled in the config, a warning should be 
> presented to the user that a query was received but the component is 
> disabled. This is in addition to the query itself failing with a rather 
> generic exception that does not point to this fact.





[jira] [Closed] (FLINK-8055) Deduplicate logging messages about QS start

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8055?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8055.
-
Resolution: Fixed

Merged with 5e059e968633c4292734ebed209fa1b3c30529a1

> Deduplicate logging messages about QS start
> ---
>
> Key: FLINK-8055
> URL: https://issues.apache.org/jira/browse/FLINK-8055
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> When starting the queryable state servers, the following is printed in the 
> logs: both the NetworkEnvironment and the AbstractServerBase log events 
> about it.
> I suggest removing the logging message from the NetworkEnvironment. 
> Furthermore, I think it would be good to use a non-static logger in 
> AbstractServerBase to better differentiate between classes.
> {code}
> 2017-11-13 13:51:32,042 INFO  
> org.apache.flink.queryablestate.network.AbstractServerBase- Started the 
> Queryable State Server @ /127.0.0.1:9067.
> 2017-11-13 13:51:32,042 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Started the 
> Queryable State Data Server @ /127.0.0.1:9067
> 2017-11-13 13:51:32,049 INFO  
> org.apache.flink.queryablestate.network.AbstractServerBase- Started the 
> Queryable State Proxy Server @ /127.0.0.1:9069.
> 2017-11-13 13:51:32,049 INFO  
> org.apache.flink.runtime.io.network.NetworkEnvironment- Started the 
> Queryable State Client Proxy @ /127.0.0.1:9069
> {code}





[jira] [Closed] (FLINK-8059) Querying the state of a non-existing job should throw JobNotFoundException

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8059?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8059.
-
Resolution: Fixed

Merged with 2fe078f3927595cbc3c5de6635a710494e0f34b4

> Querying the state of a non-existing job should throw JobNotFoundException
> ---
>
> Key: FLINK-8059
> URL: https://issues.apache.org/jira/browse/FLINK-8059
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> When querying the state for a non-existing job, you currently get an 
> IllegalStateException. Given that this isn't an illegal state, we should 
> return a JobNotFoundException instead.
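Translating the missing-job case into a dedicated exception can be sketched roughly as below; the class and lookup map are hypothetical stand-ins, not the actual server-side code:

```java
import java.util.Map;
import java.util.Optional;

public class JobLookupSketch {
    /** Hypothetical unchecked stand-in for the proposed JobNotFoundException. */
    public static class JobNotFoundException extends RuntimeException {
        public JobNotFoundException(String jobId) {
            super("Job " + jobId + " not found.");
        }
    }

    /**
     * Resolves a job by id, signalling a missing job with a dedicated
     * exception rather than a generic IllegalStateException.
     */
    public static String lookup(Map<String, String> jobs, String jobId) {
        return Optional.ofNullable(jobs.get(jobId))
                .orElseThrow(() -> new JobNotFoundException(jobId));
    }
}
```

The client can then distinguish "job does not exist" from genuine server-side state errors and react accordingly (e.g. retry once the job is submitted).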





[jira] [Closed] (FLINK-8065) Extend exception message if QS client is already shut down

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8065?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8065.
-
Resolution: Fixed

Merged with 75c14541fdc52d5446b179e8e660b8a4fd90310c

> Extend exception message if QS client is already shut down
> --
>
> Key: FLINK-8065
> URL: https://issues.apache.org/jira/browse/FLINK-8065
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Minor
>
> Calling {{QueryableStateClient#getKvState}} after shutting it down leads to 
> this rather short exception message:
> {code}
> Caused by: java.lang.IllegalStateException: Shut down
>   at 
> org.apache.flink.queryablestate.network.Client.sendRequest(Client.java:136)
>   at 
> org.apache.flink.queryablestate.client.QueryableStateClient.getKvState(QueryableStateClient.java:251)
> {code}
> We can extend this just a little bit.
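A sketch of the extended check (class and field names are illustrative, not Flink's actual `Client` internals): instead of the bare "Shut down", say what is shut down and why the call cannot proceed.

```java
// Hypothetical sketch of a more informative shutdown error message.
public class ShutdownMessageSketch {
    private final String clientName;
    private volatile boolean shutDown;

    ShutdownMessageSketch(String clientName) {
        this.clientName = clientName;
    }

    void shutdown() {
        shutDown = true;
    }

    void sendRequest() {
        if (shutDown) {
            // Name the component and state why further calls are rejected,
            // instead of throwing IllegalStateException("Shut down").
            throw new IllegalStateException(clientName
                + " is already shut down. No further requests can be sent.");
        }
    }

    public static void main(String[] args) {
        ShutdownMessageSketch client =
            new ShutdownMessageSketch("Queryable State Client");
        client.shutdown();
        try {
            client.sendRequest();
        } catch (IllegalStateException e) {
            // prints: Queryable State Client is already shut down. No further requests can be sent.
            System.out.println(e.getMessage());
        }
    }
}
```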





[jira] [Closed] (FLINK-8062) QueryableStateClient#getKvState(...N Namespace,...) not documented

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8062?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8062.
-
Resolution: Fixed

Merged with ff7e3cf6749a6b6bc898fde871c36661c8629c23

> QueryableStateClient#getKvState(...N Namespace,...) not documented
> --
>
> Key: FLINK-8062
> URL: https://issues.apache.org/jira/browse/FLINK-8062
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{QueryableStateClient}} has this method:
> {code}
> public  CompletableFuture getKvState(
>   final JobID jobId,
>   final String queryableStateName,
>   final K key,
>   final N namespace,
>   final TypeInformation keyTypeInfo,
>   final TypeInformation namespaceTypeInfo,
>   final StateDescriptor stateDescriptor) {
> {code}
> There is no documentation on how to use this method or what namespaces are.





[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256774#comment-16256774
 ] 

ASF GitHub Bot commented on FLINK-8057:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5028


> KvStateRegistry#registerKvState may randomly fail
> -
>
> Key: FLINK-8057
> URL: https://issues.apache.org/jira/browse/FLINK-8057
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, 
> a new KvStateID is generated, and we then check whether a state for this ID 
> was already registered. Realistically, this is never the case, making the 
> check unnecessary, and on the off chance that it does happen, the job will 
> fail for no good reason.
> {code}
> KvStateID kvStateId = new KvStateID();
> if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
>   KvStateRegistryListener listener = this.listener.get();
>   if (listener != null) {
>   listener.notifyKvStateRegistered(
>   jobId,
>   jobVertexId,
>   keyGroupRange,
>   registrationName,
>   kvStateId);
>   }
>   return kvStateId;
> } else {
>   throw new IllegalStateException(kvStateId + " is already registered.");
> }
> {code}
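A simplified sketch of the simplification the description implies (class names are illustrative stand-ins, not Flink's actual registry): since each KvStateID is freshly generated, a plain put suffices and the "already registered" failure branch is unnecessary.

```java
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical, simplified registry: every call generates a fresh,
// effectively unique ID, so no collision branch is needed.
public class KvStateRegistrySketch {
    private final ConcurrentMap<UUID, String> registeredKvStates =
        new ConcurrentHashMap<>();

    public UUID registerKvState(String kvState) {
        UUID kvStateId = UUID.randomUUID(); // fresh ID on each call
        registeredKvStates.put(kvStateId, kvState);
        // A listener notification would go here, as in the original snippet.
        return kvStateId;
    }

    public int size() {
        return registeredKvStates.size();
    }

    public static void main(String[] args) {
        KvStateRegistrySketch registry = new KvStateRegistrySketch();
        UUID first = registry.registerKvState("window-contents");
        UUID second = registry.registerKvState("window-contents");
        System.out.println(first.equals(second)); // false: fresh ID each call
        System.out.println(registry.size());      // 2
    }
}
```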





[jira] [Closed] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8057.
-
Resolution: Fixed

Merged at a4d86975967942054d1bd466641e9c835fb014ac

> KvStateRegistry#registerKvState may randomly fail
> -
>
> Key: FLINK-8057
> URL: https://issues.apache.org/jira/browse/FLINK-8057
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, 
> a new KvStateID is generated, and we then check whether a state for this ID 
> was already registered. Realistically, this is never the case, making the 
> check unnecessary, and on the off chance that it does happen, the job will 
> fail for no good reason.
> {code}
> KvStateID kvStateId = new KvStateID();
> if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
>   KvStateRegistryListener listener = this.listener.get();
>   if (listener != null) {
>   listener.notifyKvStateRegistered(
>   jobId,
>   jobVertexId,
>   keyGroupRange,
>   registrationName,
>   kvStateId);
>   }
>   return kvStateId;
> } else {
>   throw new IllegalStateException(kvStateId + " is already registered.");
> }
> {code}






[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256747#comment-16256747
 ] 

ASF GitHub Bot commented on FLINK-8057:
---

GitHub user kl0u opened a pull request:

https://github.com/apache/flink/pull/5028

[FLINK-8057][FLINK-8059][FLINK-8055][FLINK-8065][FLINK-8062]

This PR fixes multiple minor issues, such as unclear error messages.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/kl0u/flink qs-small-stuff

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5028.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5028


commit ff7e3cf6749a6b6bc898fde871c36661c8629c23
Author: kkloudas 
Date:   2017-11-15T14:32:42Z

[FLINK-8062][QS] Make getKvState() with namespace private.

commit 75c14541fdc52d5446b179e8e660b8a4fd90310c
Author: kkloudas 
Date:   2017-11-15T14:38:36Z

[FLINK-8065][QS] Improve error message when client already shut down.

commit 5e059e968633c4292734ebed209fa1b3c30529a1
Author: kkloudas 
Date:   2017-11-16T16:02:16Z

[FLINK-8055][QS] Deduplicate logging messages about QS start.

commit 2fe078f3927595cbc3c5de6635a710494e0f34b4
Author: kkloudas 
Date:   2017-11-16T16:45:49Z

[FLINK-8059][QS] QS client throws FlinkJobNotFoundException for queries 
with unknown jobIds

commit a4d86975967942054d1bd466641e9c835fb014ac
Author: kkloudas 
Date:   2017-11-17T08:26:10Z

[FLINK-8057][QS] Change error message in KvStateRegistry.registerKvState()




> KvStateRegistry#registerKvState may randomly fail
> -
>
> Key: FLINK-8057
> URL: https://issues.apache.org/jira/browse/FLINK-8057
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, 
> a new KvStateID is generated, and we then check whether a state for this ID 
> was already registered. Realistically, this is never the case, making the 
> check unnecessary, and on the off chance that it does happen, the job will 
> fail for no good reason.
> {code}
> KvStateID kvStateId = new KvStateID();
> if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
>   KvStateRegistryListener listener = this.listener.get();
>   if (listener != null) {
>   listener.notifyKvStateRegistered(
>   jobId,
>   jobVertexId,
>   keyGroupRange,
>   registrationName,
>   kvStateId);
>   }
>   return kvStateId;
> } else {
>   throw new IllegalStateException(kvStateId + " is already registered.");
> }
> {code}






[jira] [Commented] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256732#comment-16256732
 ] 

ASF GitHub Bot commented on FLINK-8063:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5021


> Client blocks indefinitely when querying a non-existing state
> -
>
> Key: FLINK-8063
> URL: https://issues.apache.org/jira/browse/FLINK-8063
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.4.0
>
>
> When querying for a non-existing state (as in, no state was registered under 
> queryableStateName) the client blocks indefinitely.
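A common remedy for this class of bug (a sketch, not Flink's actual fix) is to bound the wait on the client side, so a query against an unknown queryableStateName surfaces a TimeoutException instead of blocking forever:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class QueryTimeoutSketch {
    public static void main(String[] args) throws Exception {
        // A future that never completes stands in for a query whose
        // queryableStateName was never registered anywhere.
        CompletableFuture<String> pendingQuery = new CompletableFuture<>();

        try {
            // Bound the wait instead of blocking indefinitely.
            pendingQuery.get(100, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            System.out.println("query timed out"); // prints: query timed out
        }
    }
}
```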





[jira] [Closed] (FLINK-8063) Client blocks indefinitely when querying a non-existing state

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas closed FLINK-8063.
-
Resolution: Fixed

Merged with a0838de79ff73b0322f3ce255df54f5f33b2bf3b

> Client blocks indefinitely when querying a non-existing state
> -
>
> Key: FLINK-8063
> URL: https://issues.apache.org/jira/browse/FLINK-8063
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>Priority: Critical
> Fix For: 1.4.0
>
>
> When querying for a non-existing state (as in, no state was registered under 
> queryableStateName) the client blocks indefinitely.







[jira] [Commented] (FLINK-8096) Fix time material issue when write to TableSink

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256711#comment-16256711
 ] 

ASF GitHub Bot commented on FLINK-8096:
---

Github user xccui commented on a diff in the pull request:

https://github.com/apache/flink/pull/5025#discussion_r151637017
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
 "UpsertStreamTableSink requires that Table has a full primary 
keys if it is updated.")
 }
 val outputType = sink.getOutputType
+val resultType = getResultType(table.getRelNode, optimizedPlan)
--- End diff --

Yes, you are right. Thanks for the explanation @dianfu.


> Fix time material issue when write to TableSink
> ---
>
> Key: FLINK-8096
> URL: https://issues.apache.org/jira/browse/FLINK-8096
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> We will get the following exception for unit test 
> {{TimeAttributesITCase.testCalcMaterialization3}}.
> {noformat}
> org.apache.flink.table.api.TableException: Found more than one rowtime field: 
> [rowtime, _c1, _c2] in the table that should be converted to a DataStream.
> Please select the rowtime field that should be used as event-time timestamp 
> for the DataStream by casting all other fields to TIMESTAMP.
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:783)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:263)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>   at 
> org.apache.flink.table.runtime.stream.TimeAttributesITCase.testCalcMaterialization3(TimeAttributesITCase.scala:196)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {noformat}







[jira] [Commented] (FLINK-4228) YARN artifact upload does not work with S3AFileSystem

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-4228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256656#comment-16256656
 ] 

ASF GitHub Bot commented on FLINK-4228:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/4939
  
I'm actually with Chesnay on the part of using exclusions (and letting AWS 
pull them in with the right version) rather than adding explicit dependencies, 
because it will be easier to debug than an error due to a version conflict.
We can immediately spot those `ClassNotFoundException` instances in the 
failing unit tests (recall that those dependencies are all in `test` scope!), 
whereas a version conflict may show up more subtly and be less easy to spot.
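The exclusion approach described above can be sketched in a pom.xml as follows (the artifact coordinates are illustrative, not the actual change under review): exclude the conflicting transitive artifact so the AWS SDK resolves the version it was built against.

```xml
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-aws</artifactId>
  <version>${hadoop.version}</version>
  <scope>test</scope>
  <exclusions>
    <!-- Illustrative: drop the transitively pinned AWS artifact and let the
         aws-java-sdk dependency pull in its own compatible version. -->
    <exclusion>
      <groupId>com.amazonaws</groupId>
      <artifactId>aws-java-sdk-s3</artifactId>
    </exclusion>
  </exclusions>
</dependency>
```

With exclusions, a missing class fails fast as a `ClassNotFoundException` in tests; with explicit version overrides, a mismatch only surfaces as subtle runtime behavior.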


> YARN artifact upload does not work with S3AFileSystem
> -
>
> Key: FLINK-4228
> URL: https://issues.apache.org/jira/browse/FLINK-4228
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Ufuk Celebi
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The issue now is exclusive to running on YARN with s3a:// as the configured 
> FileSystem. In that case, the Flink session fails while staging itself, 
> because it tries to copy the flink/lib directory to S3 and the S3AFileSystem 
> does not support recursive copies.
> h2. Old Issue
> Using the {{RocksDBStateBackend}} with semi-async snapshots (current default) 
> leads to an Exception when uploading the snapshot to S3 when using the 
> {{S3AFileSystem}}.
> {code}
> AsynchronousException{com.amazonaws.AmazonClientException: Unable to 
> calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointThread.run(StreamTask.java:870)
> Caused by: com.amazonaws.AmazonClientException: Unable to calculate MD5 hash: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1298)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.uploadInOneChunk(UploadCallable.java:108)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadCallable.call(UploadCallable.java:100)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.upload(UploadMonitor.java:192)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:150)
>   at 
> com.amazonaws.services.s3.transfer.internal.UploadMonitor.call(UploadMonitor.java:50)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.io.FileNotFoundException: 
> /var/folders/_c/5tc5q5q55qjcjtqwlwvwd1m0gn/T/flink-io-5640e9f1-3ea4-4a0f-b4d9-3ce9fbd98d8a/7c6e745df2dddc6eb70def1240779e44/StreamFlatMap_3_0/dummy_state/47daaf2a-150c-4208-aa4b-409927e9e5b7/local-chk-2886
>  (Is a directory)
>   at java.io.FileInputStream.open0(Native Method)
>   at java.io.FileInputStream.open(FileInputStream.java:195)
>   at java.io.FileInputStream.(FileInputStream.java:138)
>   at 
> com.amazonaws.services.s3.AmazonS3Client.putObject(AmazonS3Client.java:1294)
>   ... 9 more
> {code}
> Running with S3NFileSystem, the error does not occur. The problem might be 
> due to {{HDFSCopyToLocal}} assuming that sub-folders are going to be created 
> automatically. We might need to manually create folders and copy only actual 
> files for {{S3AFileSystem}}. More investigation is required.
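The suggested workaround, creating folders explicitly and copying only actual files, can be sketched with plain java.nio as a local-to-local stand-in (the real code would target a Hadoop FileSystem; names here are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.stream.Stream;

public class RecursiveCopySketch {
    // Copy file-by-file so a target that cannot copy directories
    // (S3A-style) only ever sees single-file uploads.
    static void copyTree(Path source, Path target) throws IOException {
        try (Stream<Path> paths = Files.walk(source)) {
            for (Path p : (Iterable<Path>) paths::iterator) {
                Path dest = target.resolve(source.relativize(p).toString());
                if (Files.isDirectory(p)) {
                    Files.createDirectories(dest);  // create folders manually
                } else {
                    Files.createDirectories(dest.getParent());
                    Files.copy(p, dest);            // copy only actual files
                }
            }
        }
    }

    public static void main(String[] args) throws IOException {
        Path src = Files.createTempDirectory("copy-src");
        Files.createDirectories(src.resolve("lib"));
        Files.write(src.resolve("lib").resolve("flink-dist.jar"),
            new byte[] {1, 2, 3});
        Path dst = Files.createTempDirectory("copy-dst");
        copyTree(src, dst);
        // prints: true
        System.out.println(
            Files.exists(dst.resolve("lib").resolve("flink-dist.jar")));
    }
}
```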







[GitHub] flink pull request #5013: [FLINK-7973] disable JNI bridge for relocated hado...

2017-11-17 Thread NicoK
Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/5013


---


[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256646#comment-16256646
 ] 

ASF GitHub Bot commented on FLINK-7973:
---

Github user NicoK commented on the issue:

https://github.com/apache/flink/pull/5013
  
Fixed on master in
0e5fb0b78cd0a3ccb144071a47579eb6c3d0570a
e9e7c3372189db7e933ff59114b9ec6245838eda

Fixed on release-1.4 in
25a28ab32609c45fb8c40f717148e32fb453d2fc
9f68212603e3601e2f7a67ff93be9b15844c14da


> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin relocates services incorrectly currently, applying 
> relocation patterns multiple times.





[jira] [Commented] (FLINK-7973) Fix service shading relocation for S3 file systems

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256647#comment-16256647
 ] 

ASF GitHub Bot commented on FLINK-7973:
---

Github user NicoK closed the pull request at:

https://github.com/apache/flink/pull/5013


> Fix service shading relocation for S3 file systems
> --
>
> Key: FLINK-7973
> URL: https://issues.apache.org/jira/browse/FLINK-7973
> Project: Flink
>  Issue Type: Bug
>Reporter: Stephan Ewen
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.4.0
>
>
> The shade plugin relocates services incorrectly currently, applying 
> relocation patterns multiple times.







[jira] [Commented] (FLINK-8096) Fix time material issue when write to TableSink

2017-11-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8096?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256636#comment-16256636
 ] 

ASF GitHub Bot commented on FLINK-8096:
---

Github user dianfu commented on a diff in the pull request:

https://github.com/apache/flink/pull/5025#discussion_r151625442
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala
 ---
@@ -234,11 +234,12 @@ abstract class StreamTableEnvironment(
 "UpsertStreamTableSink requires that Table has a full primary 
keys if it is updated.")
 }
 val outputType = sink.getOutputType
+val resultType = getResultType(table.getRelNode, optimizedPlan)
--- End diff --

The `resultType` generated from the optimized plan contains time indicator 
information. `StreamTableEnvironment.translate` needs this information to 
transform the time indicator columns to the `Timestamp` type. As for the 
type-consistency issue, it is validated when converting CRow to the output type:

https://github.com/apache/flink/blob/81dc260dc653085b9dbf098e8fd70a72d2d0828e/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/TableEnvironment.scala#L941


> Fix time material issue when write to TableSink
> ---
>
> Key: FLINK-8096
> URL: https://issues.apache.org/jira/browse/FLINK-8096
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Reporter: Dian Fu
>Assignee: Dian Fu
> Fix For: 1.5.0
>
>
> We will get the following exception for unit test 
> {{TimeAttributesITCase.testCalcMaterialization3}}.
> {noformat}
> org.apache.flink.table.api.TableException: Found more than one rowtime field: 
> [rowtime, _c1, _c2] in the table that should be converted to a DataStream.
> Please select the rowtime field that should be used as event-time timestamp 
> for the DataStream by casting all other fields to TIMESTAMP.
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:783)
>   at 
> org.apache.flink.table.api.StreamTableEnvironment.writeToSink(StreamTableEnvironment.scala:263)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:862)
>   at org.apache.flink.table.api.Table.writeToSink(table.scala:830)
>   at 
> org.apache.flink.table.runtime.stream.TimeAttributesITCase.testCalcMaterialization3(TimeAttributesITCase.scala:196)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at org.junit.rules.TestWatcher$1.evaluate(TestWatcher.java:55)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
>   at org.junit.rules.RunRules.evaluate(RunRules.java:20)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
>   at 
> com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
>   at 
> com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:51)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:237)
>   at 
> com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
> {noformat}





[jira] [Commented] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail

2017-11-17 Thread Kostas Kloudas (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16256629#comment-16256629
 ] 

Kostas Kloudas commented on FLINK-8057:
---

I agree that this check never fails, but if it ever does, then something is 
wrong. 
So I would recommend keeping the check, as I consider it good practice to be 
defensive and check every possible execution path (as long as it is not in the 
critical path), and just changing the exception message to something that 
indicates that execution took an unexpected path. 

What about something along the lines of: kvStateId + " appears registered 
although it should not."?

> KvStateRegistry#registerKvState may randomly fail
> -
>
> Key: FLINK-8057
> URL: https://issues.apache.org/jira/browse/FLINK-8057
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, 
> a new KvStateID is generated, and we then check whether a state for this ID 
> was already registered. Realistically, this is never the case, making the 
> check unnecessary, and on the off chance that it does happen, the job will 
> fail for no good reason.
> {code}
> KvStateID kvStateId = new KvStateID();
> if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
>   KvStateRegistryListener listener = this.listener.get();
>   if (listener != null) {
>   listener.notifyKvStateRegistered(
>   jobId,
>   jobVertexId,
>   keyGroupRange,
>   registrationName,
>   kvStateId);
>   }
>   return kvStateId;
> } else {
>   throw new IllegalStateException(kvStateId + " is already registered.");
> }
> {code}





[jira] [Assigned] (FLINK-8057) KvStateRegistry#registerKvState may randomly fail

2017-11-17 Thread Kostas Kloudas (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8057?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kostas Kloudas reassigned FLINK-8057:
-

Assignee: Kostas Kloudas

> KvStateRegistry#registerKvState may randomly fail
> -
>
> Key: FLINK-8057
> URL: https://issues.apache.org/jira/browse/FLINK-8057
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Kostas Kloudas
>
> The {{KvStateRegistry#registerKvState}} method is a bit weird. On each call, 
> a new KvStateID is generated, and we then check whether a state for this ID 
> was already registered. Realistically, this is never the case, making the 
> check unnecessary, and on the off chance that it does happen, the job will 
> fail for no good reason.
> {code}
> KvStateID kvStateId = new KvStateID();
> if (registeredKvStates.putIfAbsent(kvStateId, kvState) == null) {
>   KvStateRegistryListener listener = this.listener.get();
>   if (listener != null) {
>   listener.notifyKvStateRegistered(
>   jobId,
>   jobVertexId,
>   keyGroupRange,
>   registrationName,
>   kvStateId);
>   }
>   return kvStateId;
> } else {
>   throw new IllegalStateException(kvStateId + " is already registered.");
> }
> {code}


