[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407454#comment-16407454
 ] 

ASF GitHub Bot commented on FLINK-9034:
---

Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r175983599
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
@@ -77,18 +80,22 @@
 
	/** The serializer for the type. May be eagerly initialized in the constructor,
	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
+	@Nullable
	protected TypeSerializer<T> serializer;
 
+	/** The type information describing the value type. Only used to lazily create the serializer
+	 * and dropped during serialization */
+	@Nullable
--- End diff --

nit: Type information will not be dropped during serialization now; it is 
dropped in `initializeSerializerUnlessSet()`.
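
For context, a minimal sketch of the lazy-initialization pattern the nit refers 
to (field names and details are assumptions for illustration, not the actual 
Flink source; only `TypeInformation#createSerializer(ExecutionConfig)` is the 
real API):

```java
import javax.annotation.Nullable;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;

// hypothetical, simplified StateDescriptor-like class
public abstract class StateDescriptorSketch<T> {

    @Nullable
    protected TypeSerializer<T> serializer;

    @Nullable
    protected TypeInformation<T> typeInfo;

    public void initializeSerializerUnlessSet(ExecutionConfig executionConfig) {
        if (serializer == null) {
            // create the serializer from the retained type information ...
            serializer = typeInfo.createSerializer(executionConfig);
            // ... and drop the type info here, not during serialization
            typeInfo = null;
        }
    }
}
```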


> State Descriptors drop TypeInformation on serialization
> ---
>
> Key: FLINK-9034
> URL: https://issues.apache.org/jira/browse/FLINK-9034
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction<MyType, MyType> {
>     private final ValueStateDescriptor<MyType> descr =
>             new ValueStateDescriptor<>("state name", MyType.class);
>     private ValueState<MyType> state;
>     @Override
>     public void open(Configuration cfg) {
>         state = getRuntimeContext().getValueState(descr);
>     }
> }
> {code}
> The problem is that the state descriptor drops the type information and 
> creates a serializer before serialization, as part of shipping the function to 
> the cluster. To do that, it initializes the serializer with an empty 
> execution config, making serialization inconsistent.
> This is mainly an artifact from the days when dropping the type information 
> before shipping was necessary, because the type info was not serializable. It 
> now is, and we can fix that bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-20 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5732#discussion_r175983599
  
--- Diff: flink-core/src/main/java/org/apache/flink/api/common/state/StateDescriptor.java ---
@@ -77,18 +80,22 @@
 
	/** The serializer for the type. May be eagerly initialized in the constructor,
	 * or lazily once the type is serialized or an ExecutionConfig is provided. */
+	@Nullable
	protected TypeSerializer<T> serializer;
 
+	/** The type information describing the value type. Only used to lazily create the serializer
+	 * and dropped during serialization */
+	@Nullable
--- End diff --

nit: Type information will not be dropped during serialization now; it is 
dropped in `initializeSerializerUnlessSet()`.


---


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407407#comment-16407407
 ] 

ASF GitHub Bot commented on FLINK-6924:
---

Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@walterddr What else should I do for this PR?


> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-20 Thread buptljy
Github user buptljy commented on the issue:

https://github.com/apache/flink/pull/5638
  
@walterddr What else should I do for this PR?


---


[jira] [Commented] (FLINK-9026) Unregister finished tasks from TaskManagerMetricGroup and close it

2018-03-20 Thread Sihua Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9026?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407403#comment-16407403
 ] 

Sihua Zhou commented on FLINK-9026:
---

Hi [~till.rohrmann] I'm a bit confused about "We should unregister Tasks from 
the TaskManagerMetricGroup when they have reached a final state". I found that 
the {{Task}} already closes the {{TaskMetricGroup}} it holds when it reaches a 
final state, and {{TaskMetricGroup.close()}} calls 
{{parent.removeTaskMetricGroup(executionId);}}, so the {{TaskMetricGroup}} is 
already unregistered from the {{TaskManagerMetricGroup}} currently.
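
For reference, a minimal sketch of the call chain described above (simplified 
and hypothetical, not the actual Flink source):

{code}
// simplified sketch: a Task closes its TaskMetricGroup on reaching a final state
class TaskMetricGroup {

    private final TaskManagerMetricGroup parent;
    private final AbstractID executionId;

    TaskMetricGroup(TaskManagerMetricGroup parent, AbstractID executionId) {
        this.parent = parent;
        this.executionId = executionId;
    }

    void close() {
        // closing the group unregisters it from the TaskManagerMetricGroup
        parent.removeTaskMetricGroup(executionId);
    }
}
{code}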

> Unregister finished tasks from TaskManagerMetricGroup and close it
> --
>
> Key: FLINK-9026
> URL: https://issues.apache.org/jira/browse/FLINK-9026
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Affects Versions: 1.5.0, 1.6.0
>Reporter: Till Rohrmann
>Assignee: Sihua Zhou
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> We should unregister {{Tasks}} from the {{TaskManagerMetricGroup}} when they 
> have reached a final state. Moreover, we should close the 
> {{TaskManagerMetricGroup}} either in the {{TaskExecutor#postStop}} method or 
> let the caller do this.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9022) fix resource close in `StreamTaskStateInitializerImpl.streamOperatorStateContext()`

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9022?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407387#comment-16407387
 ] 

ASF GitHub Bot commented on FLINK-9022:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5716
  
Strange... the Travis build failed and the failure is related to 
`YarnClusterDescriptorTest`, but it seems unrelated to this PR...


> fix resource close in 
> `StreamTaskStateInitializerImpl.streamOperatorStateContext()`
> ---
>
> Key: FLINK-9022
> URL: https://issues.apache.org/jira/browse/FLINK-9022
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We have the following code in 
> {{StreamTaskStateInitializerImpl.streamOperatorStateContext()}} which is 
> incorrect:
> {code}
> } catch (Exception ex) {
> 	// cleanup if something went wrong before results got published.
> 	if (streamTaskCloseableRegistry.unregisterCloseable(keyedStatedBackend)) {
> 		IOUtils.closeQuietly(keyedStatedBackend);
> 	}
> 	if (streamTaskCloseableRegistry.unregisterCloseable(operatorStateBackend)) {
> 		IOUtils.closeQuietly(keyedStatedBackend); // this should close operatorStateBackend
> 	}
> 	if (streamTaskCloseableRegistry.unregisterCloseable(rawKeyedStateInputs)) {
> 		IOUtils.closeQuietly(rawKeyedStateInputs);
> 	}
> 	if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
> 		IOUtils.closeQuietly(rawOperatorStateInputs);
> 	}
> 	if (streamTaskCloseableRegistry.unregisterCloseable(rawOperatorStateInputs)) {
> 		IOUtils.closeQuietly(rawOperatorStateInputs);
> 	}
> 	throw new Exception("Exception while creating StreamOperatorStateContext.", ex);
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5716: [FLINK-9022][state] fix resource release in StreamTaskSta...

2018-03-20 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5716
  
Strange... the Travis build failed and the failure is related to 
`YarnClusterDescriptorTest`, but it seems unrelated to this PR...


---


[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407315#comment-16407315
 ] 

ASF GitHub Bot commented on FLINK-9011:
---

Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r175969652
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext(
 	require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
 	final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-	log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
 	final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-	log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
 	final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-	log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
 	final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-	log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+	if (log.isDebugEnabled()) {
--- End diff --

@yew1eb, in a way I agree with you. If there were just one 
`log.debug('xxx')` and no string concatenation, the `if (log.isDebugEnabled())` 
would not be necessary. But in this case there are four `log.debug()` calls, 
and each of them would perform the same check inside the `debug` method. So 
wrapping them in an outer check is done for performance reasons.
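
For illustration, a sketch of the guarded form under discussion (assuming an 
SLF4J-style logger; with parameterized messages there is no string 
concatenation either way, so the guard only saves the repeated level checks):

```java
// one level check up front instead of one check inside each debug() call
if (log.isDebugEnabled()) {
    log.debug("TM:remote keytab path obtained {}", remoteKeytabPath);
    log.debug("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
    log.debug("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
    log.debug("TM:remote krb5 path obtained {}", remoteKrb5Path);
}
```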


> YarnResourceManager spamming log file at INFO level
> ---
>
> Key: FLINK-9011
> URL: https://issues.apache.org/jira/browse/FLINK-9011
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For every requested resource, the {{YarnResourceManager}} spams the log with 
> log-level INFO and the following messages:
> {code}
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
> 2018-03-16 03:41:20,190 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> {code}



--
This message was sent by Atlassian JIRA

[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...

2018-03-20 Thread yanghua
Github user yanghua commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r175969652
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext(
 	require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
 	final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-	log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
 	final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-	log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
 	final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-	log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
 	final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-	log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+	if (log.isDebugEnabled()) {
--- End diff --

@yew1eb, in a way I agree with you. If there were just one 
`log.debug('xxx')` and no string concatenation, the `if (log.isDebugEnabled())` 
would not be necessary. But in this case there are four `log.debug()` calls, 
and each of them would perform the same check inside the `debug` method. So 
wrapping them in an outer check is done for performance reasons.


---


[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407123#comment-16407123
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
Thanks for the feedback everybody!
Will merge the PR tomorrow.


> Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in 
> Group Windows
> ---
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
>  Issue Type: Bug
>  Components: Table API  SQL
>Affects Versions: 1.3.2, 1.5.0, 1.4.2
>Reporter: lilizhao
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: QQ图片20180312180143.jpg, TableAndSQLTest.java
>
>
> The built-in aggregation functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP 
> are translated into regular AVG functions if they are applied in the context 
> of a Group Window aggregation ({{GROUP BY TUMBLE/HOP/SESSION}}).
> The reason is that these functions are internally represented as 
> {{SqlAvgAggFunction}} but with different {{SqlKind}}. When translating 
> Calcite aggregation functions to Flink Table agg functions, we only look at 
> the type of the class, not at the value of the {{kind}} field. We did not 
> notice that before, because in all other cases (regular {{GROUP BY}} without 
> windows or {{OVER}} windows), we have a translation rule 
> {{AggregateReduceFunctionsRule}} that decomposes the more complex functions 
> into expressions of {{COUNT}} and {{SUM}} functions such that we never 
> execute an {{AVG}} Flink function. That rule can only be applied on 
> {{LogicalAggregate}}, however, we represent group windows as 
> {{LogicalWindowAggregate}}, so the rule does not match.
> We should fix this by:
> 1. restrict the translation to Flink avg functions in {{AggregateUtil}} to 
> {{SqlKind.AVG}}. 
> 2. implement a rule (hopefully based on {{AggregateReduceFunctionsRule}}) 
> that decomposes the complex agg functions into the {{SUM}} and {{COUNT}}.
> Step 1. is easy and a quick fix but we would get an exception "Unsupported 
> Function" if {{VAR_POP}} is used in a {{GROUP BY}} window.
> Step 2. might be more involved, depending on how difficult it is to port the 
> rule.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV_SAMP, ...

2018-03-20 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5706
  
Thanks for the feedback everybody!
Will merge the PR tomorrow.


---


[jira] [Commented] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16407020#comment-16407020
 ] 

ASF GitHub Bot commented on FLINK-9034:
---

GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5732

[FLINK-9034] [FLINK-9035] [core] Fix state descriptors

## What is the purpose of the change

Fixes two issues with the `StateDescriptors` that are used to obtain state 
access in transformation functions: 

### Broken Equals and hashCode

`equals()` and `hashCode()` depend on fields that are not always set and 
that may change during the life of a state descriptor. That is especially 
problematic, because the state descriptors are keys in a map, and if the 
meaning of `equals()` and `hashCode()` changes after insertion, the objects 
become keys that can no longer be referenced / matched.

This pull request changes `equals()` and `hashCode()` to only take the state 
name and the descriptor type (by class) into account for hashCode and equality, 
both of which are always constant and do not change as part of serializer 
initialization.
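
A minimal sketch of what such name-and-class-based methods could look like (an 
assumption for illustration, not necessarily the actual patch):

```java
// hypothetical sketch; 'name' is the descriptor's state name field
@Override
public int hashCode() {
    return name.hashCode() + 31 * getClass().hashCode();
}

@Override
public boolean equals(Object o) {
    if (o == this) {
        return true;
    }
    if (o == null || o.getClass() != getClass()) {
        return false;
    }
    // only the state name and the concrete descriptor class matter,
    // both of which are fixed at construction time
    return ((StateDescriptor<?, ?>) o).name.equals(name);
}
```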

**Illustration of the problem:**

The following code fails with a `NullPointerException`, because the 
`hashCode()` method tries to access the serializer field, which may be 
uninitialized at that point.

```java
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
descr.hashCode(); // exception
```

The equals() method is equally broken (no pun intended):
```java
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b) // exception
b.equals(a) // exception
a.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b) // false
b.equals(a) // exception
b.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b) // true
b.equals(a) // true
```

### Type Information dropped prematurely

The following code is currently problematic:
```java
public class MyFunction extends RichMapFunction<MyType, MyType> {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }
}
```

The problem is that the state descriptor drops the type information and 
creates a serializer before serialization, as part of shipping the function to 
the cluster. To do that, it initializes the serializer with an empty execution 
config, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information 
before shipping was necessary, because the type info was not serializable. It 
now is, and we can fix that bug.

## Verifying this change

**This change is sensitive, because it touches the structures that all 
users use to obtain access to persistent state.**

This change adds a series of Unit tests to validate the fixed behavior.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no) - **All changes should preserve full API 
compatibility.**
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): 
**Touches the structures that give access to checkpointed state.**
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5732


commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen 
Date:   2018-03-20T14:15:08Z

[hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen 
Date:   

[GitHub] flink pull request #5732: [FLINK-9034] [FLINK-9035] [core] Fix state descrip...

2018-03-20 Thread StephanEwen
GitHub user StephanEwen opened a pull request:

https://github.com/apache/flink/pull/5732

[FLINK-9034] [FLINK-9035] [core] Fix state descriptors

## What is the purpose of the change

Fixes two issues with the `StateDescriptors` that are used to obtain state 
access in transformation functions: 

### Broken Equals and hashCode

`equals()` and `hashCode()` depend on fields that are not always set and 
that may change during the life of a state descriptor. That is especially 
problematic, because the state descriptors are keys in a map, and if the 
meaning of `equals()` and `hashCode()` changes after insertion, the objects 
become keys that can no longer be referenced / matched.

This pull request changes `equals()` and `hashCode()` to only take the state 
name and the descriptor type (by class) into account for hashCode and equality, 
both of which are always constant and do not change as part of serializer 
initialization.

**Illustration of the problem:**

The following code fails with a `NullPointerException`, because the 
`hashCode()` method tries to access the serializer field, which may be 
uninitialized at that point.

```java
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
descr.hashCode(); // exception
```

The equals() method is equally broken (no pun intended):
```java
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b) // exception
b.equals(a) // exception
a.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b) // false
b.equals(a) // exception
b.initializeSerializerUnlessSet(new ExecutionConfig());
a.equals(b) // true
b.equals(a) // true
```

### Type Information dropped prematurely

The following code is currently problematic:
```java
public class MyFunction extends RichMapFunction<MyType, MyType> {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }
}
```

The problem is that the state descriptor drops the type information and 
creates a serializer before serialization, as part of shipping the function to 
the cluster. To do that, it initializes the serializer with an empty execution 
config, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information 
before shipping was necessary, because the type info was not serializable. It 
now is, and we can fix that bug.

## Verifying this change

**This change is sensitive, because it touches the structures that all 
users use to obtain access to persistent state.**

This change adds a series of Unit tests to validate the fixed behavior.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no) - **All changes should preserve full API 
compatibility.**
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know): 
**Touches the structures that give access to checkpointed state.**
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/StephanEwen/incubator-flink fix_state_descriptors

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5732.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5732


commit 8ff1284d28a056b91d91607584fab2a55fbcc86c
Author: Stephan Ewen 
Date:   2018-03-20T14:15:08Z

[hotfix] [core] Fix checkstyle in 'org.apache.flink.api.common.state'

commit dc0df85e064ee45bcb0f83d21b00a1abc9359723
Author: Stephan Ewen 
Date:   2018-03-20T14:29:12Z

[hotfix] [core] Add missing serialVersionUID to MapStateDescriptor

commit c62e84414ff4715399bce35ac74fb1d94256c3ed
Author: Stephan Ewen 
Date:   2018-03-20T14:43:33Z

[hotfix] [core] Add @FunctionalInterface to 

[jira] [Commented] (FLINK-8402) HadoopS3FileSystemITCase.testDirectoryListing fails on Travis

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406975#comment-16406975
 ] 

ASF GitHub Bot commented on FLINK-8402:
---

Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
Don't we control the calls on `FileSystem` that we make as part of the YARN 
upload? Can't we simply avoid all operations that are not strongly consistent? 
For example, if we strictly only use `create()` and `open()`, we should be 
consistent.
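
For illustration, a sketch under the assumption that Flink's 
`FileSystem#create(Path, WriteMode)` and `FileSystem#open(Path)` are the only 
operations used; the surrounding method is hypothetical, not the actual YARN 
upload code:

```java
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;

// write a file and read it back using only create() and open(),
// avoiding listing and renaming operations
static void writeAndReadBack(Path path, byte[] payload) throws Exception {
    FileSystem fs = path.getFileSystem();
    try (FSDataOutputStream out = fs.create(path, FileSystem.WriteMode.OVERWRITE)) {
        out.write(payload);
    }
    try (FSDataInputStream in = fs.open(path)) {
        byte[] buf = new byte[payload.length];
        int bytesRead = in.read(buf); // plain read-after-create of a new object
    }
}
```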


> HadoopS3FileSystemITCase.testDirectoryListing fails on Travis
> -
>
> Key: FLINK-8402
> URL: https://issues.apache.org/jira/browse/FLINK-8402
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Nico Kruber
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.3
>
>
> The test {{HadoopS3FileSystemITCase.testDirectoryListing}} fails on Travis.
> https://travis-ci.org/tillrohrmann/flink/jobs/327021175



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5624: [FLINK-8402][s3][tests] fix hadoop/presto S3 IT cases for...

2018-03-20 Thread StephanEwen
Github user StephanEwen commented on the issue:

https://github.com/apache/flink/pull/5624
  
Don't we control the calls on `FileSystem` that we make as part of the YARN 
upload? Can't we simply avoid all operations that are not strongly consistent? 
For example, if we strictly only use `create()` and `open()`, we should be 
consistent.


---


[jira] [Created] (FLINK-9036) Add default value via suppliers

2018-03-20 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9036:
---

 Summary: Add default value via suppliers
 Key: FLINK-9036
 URL: https://issues.apache.org/jira/browse/FLINK-9036
 Project: Flink
  Issue Type: Improvement
  Components: State Backends, Checkpointing
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.6.0


Earlier versions had a default value in {{ValueState}}. We dropped that, 
because the value would have to be duplicated on each access, to be safe 
against side effects when using mutable types.

For convenience, we should re-add the feature, but using a supplier/factory 
function to create the default value on access.
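
A sketch of how the user-facing side might look (hypothetical API; the supplier 
method and its name are assumptions, nothing here is implemented):

{code}
import java.util.ArrayList;
import java.util.List;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.ListTypeInfo;

// hypothetical sketch: a descriptor whose default value comes from a supplier
ValueStateDescriptor<List<String>> descr = new ValueStateDescriptor<>(
        "names", new ListTypeInfo<>(BasicTypeInfo.STRING_TYPE_INFO));
descr.setDefaultValueSupplier(ArrayList::new); // hypothetical method, not implemented

// on each value() access with nothing set, the supplier would create a fresh
// instance, so mutating the returned list has no side effects on later accesses
{code}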



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9035) State Descriptors have broken hashCode() and equals()

2018-03-20 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9035:
---

 Summary: State Descriptors have broken hashCode() and equals()
 Key: FLINK-9035
 URL: https://issues.apache.org/jira/browse/FLINK-9035
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2, 1.5.0
Reporter: Stephan Ewen
Assignee: Stephan Ewen
 Fix For: 1.6.0


The following code fails with a {{NullPointerException}}:
{code}
ValueStateDescriptor<String> descr = new ValueStateDescriptor<>("name", String.class);
descr.hashCode();
{code}
The {{hashCode()}} function tries to access the {{serializer}} field, which may 
be uninitialized at that point.

The {{equals()}} method is equally broken (no pun intended):

{code}
ValueStateDescriptor<String> a = new ValueStateDescriptor<>("name", String.class);
ValueStateDescriptor<String> b = new ValueStateDescriptor<>("name", String.class);

a.equals(b) // exception
b.equals(a) // exception

a.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b) // false
b.equals(a) // exception

b.initializeSerializerUnlessSet(new ExecutionConfig());

a.equals(b) // true
b.equals(a) // true
{code}





--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-20 Thread Stephan Ewen (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9034?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen reassigned FLINK-9034:
---

Assignee: Stephan Ewen

> State Descriptors drop TypeInformation on serialization
> ---
>
> Key: FLINK-9034
> URL: https://issues.apache.org/jira/browse/FLINK-9034
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Major
> Fix For: 1.6.0
>
>
> The following code currently causes problems
> {code}
> public class MyFunction extends RichMapFunction<MyType, MyType> {
>     private final ValueStateDescriptor<MyType> descr =
>             new ValueStateDescriptor<>("state name", MyType.class);
>     private ValueState<MyType> state;
>     @Override
>     public void open(Configuration cfg) {
>         state = getRuntimeContext().getValueState(descr);
>     }
> }
> {code}
> The problem is that the state descriptor drops the type information and 
> creates a serializer before serialization, as part of shipping the function to 
> the cluster. To do that, it initializes the serializer with an empty 
> execution config, making serialization inconsistent.
> This is mainly an artifact from the days when dropping the type information 
> before shipping was necessary, because the type info was not serializable. It 
> now is, and we can fix that bug.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9034) State Descriptors drop TypeInformation on serialization

2018-03-20 Thread Stephan Ewen (JIRA)
Stephan Ewen created FLINK-9034:
---

 Summary: State Descriptors drop TypeInformation on serialization
 Key: FLINK-9034
 URL: https://issues.apache.org/jira/browse/FLINK-9034
 Project: Flink
  Issue Type: Bug
  Components: State Backends, Checkpointing
Affects Versions: 1.4.2, 1.5.0
Reporter: Stephan Ewen
 Fix For: 1.6.0


The following code currently causes problems

{code}
public class MyFunction extends RichMapFunction<MyType, MyType> {

    private final ValueStateDescriptor<MyType> descr =
            new ValueStateDescriptor<>("state name", MyType.class);

    private ValueState<MyType> state;

    @Override
    public void open(Configuration cfg) {
        state = getRuntimeContext().getValueState(descr);
    }
}
{code}

The problem is that the state descriptor drops the type information and creates 
a serializer before serialization, as part of shipping the function to the 
cluster. To do that, it initializes the serializer with an empty execution 
config, making serialization inconsistent.

This is mainly an artifact from the days when dropping the type information 
before shipping was necessary, because the type info was not serializable. It 
now is, and we can fix that bug.






--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8715) RocksDB does not propagate reconfiguration of serializer to the states

2018-03-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406943#comment-16406943
 ] 

Stephan Ewen commented on FLINK-8715:
-

[~aljoscha] I think you are right; the problem is potentially deeper.

All parts of the code that obtain a serializer from a state descriptor need to 
obey reconfiguration based on the serializer config snapshot. At least all 
parts that touch data serialized by earlier versions. That includes queryable 
state!

I think the best way to fix this (long run) would be to simply never have to 
use the old serializer again. If there is a change between the serializers, 
convert on restore; then the new serializer is safe to use. Everything else 
seems like a deep rabbit hole.

The only worthwhile exception to this may be Kryo, when the tag-to-class 
mapping changes. Then we really want the new serializer to re-configure and not 
convert the entire data set.

If we really want to handle that, then we would indeed have to either
  - reconfigure the state descriptors, or
  - never use the state descriptors internally (for default values, etc.) but 
always use serializers/suppliers/etc. obtained from the state descriptors.


> RocksDB does not propagate reconfiguration of serializer to the states
> --
>
> Key: FLINK-8715
> URL: https://issues.apache.org/jira/browse/FLINK-8715
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.2
>Reporter: Arvid Heise
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Any changes to the serializer done in #ensureCompability are lost during the 
> state creation.
> In particular, 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBValueState.java#L68]
>  always uses a fresh copy of the StateDescriptor.
> An easy fix is to pass the reconfigured serializer as an additional parameter 
> in 
> [https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L1681]
>  , which can be retrieved through the side-output of getColumnFamily
> {code:java}
> kvStateInformation.get(stateDesc.getName()).f1.getStateSerializer()
> {code}
> I encountered it in 1.3.2 but the code in the master seems unchanged (hence 
> the pointer into master). I encountered it in ValueState, but I suspect the 
> same issue can be observed for all kinds of RocksDB states.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8856) Move all interrupt() calls to TaskCanceler

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406938#comment-16406938
 ] 

ASF GitHub Bot commented on FLINK-8856:
---

Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5658


> Move all interrupt() calls to TaskCanceler
> --
>
> Key: FLINK-8856
> URL: https://issues.apache.org/jira/browse/FLINK-8856
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.4.0, 1.4.1, 1.4.2
>Reporter: Stephan Ewen
>Assignee: Stephan Ewen
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> We need this to work around the following JVM bug: 
> https://bugs.java.com/bugdatabase/view_bug.do?bug_id=8138622
> To circumvent this problem, the {{TaskCancelerWatchDog}} must not call 
> {{interrupt()}} at all, but only join on the executing thread (with timeout) 
> and cause a hard exit once cancellation takes to long.
> A user affected by this problem reported this in FLINK-8834
> Personal note: The Thread.join(...) method unfortunately is not 100% reliable 
> as well, because it uses {{System.currentTimeMillis()}} rather than 
> {{System.nanoTime()}}. Because of that, sleeps can take overly long when the 
> clock is adjusted. I wonder why the JDK authors do not follow their own 
> recommendations and use {{System.nanoTime()}} for all relative time 
> measures...
> EDIT: I am not the only one wondering why: 
> https://stackoverflow.com/questions/42544387/why-does-thread-join-use-currenttimemillis



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5658: [FLINK-8856] [TaskManager] Move all cancellation i...

2018-03-20 Thread StephanEwen
Github user StephanEwen closed the pull request at:

https://github.com/apache/flink/pull/5658


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406935#comment-16406935
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
Ok, to unblock this, I would suggest adding the new functionality under a 
different name and supporting only the addition of a single file (and also 
verifying that the path passed in is a single file). @dawidwys would that be 
straightforward to do?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-03-20 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5580
  
Ok, to unblock this, I would suggest adding the new functionality under a 
different name and supporting only the addition of a single file (and also 
verifying that the path passed in is a single file). @dawidwys would that be 
straightforward to do?


---


[jira] [Commented] (FLINK-8707) Excessive amount of files opened by flink task manager

2018-03-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8707?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406934#comment-16406934
 ] 

Stephan Ewen commented on FLINK-8707:
-

Thanks for all the additional data.

My first thoughts are that this could indeed have something to do with the 
classloading, or potentially some auxiliary service (rest / metrics, etc.).

Brief recap on what Flink does classloading-wise:

  - JobManager and TaskManager start with the classes in {{lib}} in the 
classpath.
  - Whenever a task is started in the TaskManager, the TaskManager ensures it 
has a local cached copy of all JAR files belonging to that job, and creates a 
dedicated classloader for that task. When the task is removed (finished, 
canceled, failed), the classloader is closed. For the URLClassLoader that Flink 
uses, closing it is supposed to release the file references (see the sketch 
below).
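
A minimal sketch of that per-task classloader lifecycle (simplified and 
hypothetical; the path and class names are illustrative, not the actual Flink 
code):

{code}
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.file.Paths;

public class TaskClassLoaderSketch {
    public static void main(String[] args) throws Exception {
        // locally cached JAR files of the job (path is illustrative)
        URL[] jobJars = { Paths.get("/tmp/blob-cache/job.jar").toUri().toURL() };

        // dedicated classloader created when the task is started
        URLClassLoader taskClassLoader =
                new URLClassLoader(jobJars, ClassLoader.getSystemClassLoader());

        // ... the task runs with taskClassLoader as its context classloader ...

        // when the task is removed (finished, canceled, failed), closing the
        // loader is supposed to release the open file handles on the JARs
        taskClassLoader.close();
    }
}
{code}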

So, indeed, the jars could be referenced by multiple classloaders. But the 
factor you see is excessive; I cannot really imagine why so many references to 
the files should appear.

To debug this further, could you check the following:

  - When you start a set of Flink processes, do the TaskManagers immediately 
have many FDs?
  - When you start the job, do the TaskManagers immediately go high with the 
FDs?
  - When the job fails/recovers, does each recovery add additional open FDs?
  - When the job does not fail but just keeps running, do FDs grow steadily 
over time?

Thanks for helping with this one!

> Excessive amount of files opened by flink task manager
> --
>
> Key: FLINK-8707
> URL: https://issues.apache.org/jira/browse/FLINK-8707
> Project: Flink
>  Issue Type: Bug
>  Components: TaskManager
>Affects Versions: 1.3.2
> Environment: NAME="Red Hat Enterprise Linux Server"
> VERSION="7.3 (Maipo)"
> Two boxes, each with a Job Manager & Task Manager, using Zookeeper for HA.
> flink.yaml below with some settings (removed exact box names) etc:
> env.log.dir: ...some dir...residing on the same box
> env.pid.dir: some dir...residing on the same box
> metrics.reporter.jmx.class: org.apache.flink.metrics.jmx.JMXReporter
> metrics.reporters: jmx
> state.backend: filesystem
> state.backend.fs.checkpointdir: file:///some_nfs_mount
> state.checkpoints.dir: file:///some_nfs_mount
> state.checkpoints.num-retained: 3
> high-availability.cluster-id: /tst
> high-availability.storageDir: file:///some_nfs_mount/ha
> high-availability: zookeeper
> high-availability.zookeeper.path.root: /flink
> high-availability.zookeeper.quorum: ...list of zookeeper boxes
> env.java.opts.jobmanager: ...some extra jar args
> jobmanager.archive.fs.dir: some dir...residing on the same box
> jobmanager.web.submit.enable: true
> jobmanager.web.tmpdir:  some dir...residing on the same box
> env.java.opts.taskmanager: some extra jar args
> taskmanager.tmp.dirs:  some dir...residing on the same box/var/tmp
> taskmanager.network.memory.min: 1024MB
> taskmanager.network.memory.max: 2048MB
> blob.storage.directory:  some dir...residing on the same box
>Reporter: Alexander Gardner
>Priority: Blocker
> Fix For: 1.5.0
>
> Attachments: AfterRunning-3-jobs-Box2-TM-JCONSOLE.png, 
> AfterRunning-3-jobs-TM-FDs-BOX2.jpg, AfterRunning-3-jobs-lsof-p.box2-TM, 
> AfterRunning-3-jobs-lsof.box2-TM, AterRunning-3-jobs-Box1-TM-JCONSOLE.png, 
> box1-jobmgr-lsof, box1-taskmgr-lsof, box2-jobmgr-lsof, box2-taskmgr-lsof
>
>
> The job manager has less FDs than the task manager.
>  
> Hi
> A support alert indicated that there were a lot of open files for the boxes 
> running Flink.
> There were 4 flink jobs that were dormant but had consumed a number of msgs 
> from Kafka using the FlinkKafkaConsumer010.
> A simple general lsof:
> $ lsof | wc -l  ->  returned 153114 open file descriptors.
> Focusing on the TaskManager process (process ID = 12154):
> $ lsof | grep 12154 | wc -l  ->  returned 129322 open FDs
> $ lsof -p 12154 | wc -l  ->  returned 531 FDs
> There were 228 threads running for the task manager.
>  
> Drilling down a bit further, looking at a_inode and FIFO entries: 
> $ lsof -p 12154 | grep a_inode | wc -l = 100 FDs
> $ lsof -p 12154 | grep FIFO | wc -l  = 200 FDs
> $ /proc/12154/maps = 920 entries.
> Apart from lsof identifying lots of JARs and SOs being referenced there were 
> also 244 child processes for the task manager process.
> We noticed a creep of file descriptors in each environment... are the above 
> figures deemed excessive for the number of FDs in use? I know Flink uses 
> Netty - is it using a separate Selector for reads & writes?
> Additionally, Flink uses memory-mapped files or direct byte buffers - are 
> these skewing the numbers of FDs shown?
> Example of one child process ID 6633:
> java 12154 6633 dfdev 387u a_inode 0,9 0 5869 [eventpoll]
>  java 12154 

[jira] [Commented] (FLINK-9029) Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2

2018-03-20 Thread Stephan Ewen (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9029?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406901#comment-16406901
 ] 

Stephan Ewen commented on FLINK-9029:
-

Hmm, interesting. And security is OFF as you mentioned?

Can you check whether it works if you completely remove the Hadoop uber jar 
from the lib folder and instead export the hadoop classpath as described in 
https://ci.apache.org/projects/flink/flink-docs-master/ops/deployment/hadoop.html

There was one fix to the Kerberos functionality between 1.4.0 and 1.4.2, which 
is 
https://github.com/apache/flink/commit/20146652baff31c3824f7ae31506b8c481cdf56c

The puzzling thing is that this particular commit does not modify the Hadoop 
Uber Jar.

[~tzulitai] Do you have any thoughts why that could happen?

> Getting write permission from HDFS after updating flink-1.40 to flink-1.4.2
> ---
>
> Key: FLINK-9029
> URL: https://issues.apache.org/jira/browse/FLINK-9029
> Project: Flink
>  Issue Type: Bug
>Affects Versions: 1.4.1, 1.4.2
> Environment: * Flink-1.4.2 (Flink-1.4.1)
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
>Reporter: Mohammad Abareghi
>Priority: Major
>
> *Environment*
>  * Flink-1.4.2
>  * Hadoop 2.6.0-cdh5.13.0 with 4 nodes in service and {{Security is off.}}
>  * Ubuntu 16.04.3 LTS
>  * Java 8
> *Description*
> I have a Java job in flink-1.4.0 which writes to HDFS in a specific path. 
> After updating to flink-1.4.2 I'm getting the following error from Hadoop 
> complaining that the user doesn't have write permission to the given path:
> {code:java}
> WARN org.apache.hadoop.security.UserGroupInformation: 
> PriviledgedActionException as:xng (auth:SIMPLE) 
> cause:org.apache.hadoop.security.AccessControlException: Permission denied: 
> user=user1, access=WRITE, inode="/user":hdfs:hadoop:drwxr-xr-x
> {code}
> *NOTE*:
> * If I run the same job on flink-1.4.0, the error disappears regardless of 
> which version of the flink dependencies (1.4.0 or 1.4.2) I have for the job.
>  * Also, if I run the job's main method from my IDE and pass the same 
> parameters, I don't get the above error.
> *NOTE*:
> It seems the problem somehow is in 
> {{flink-1.4.2/lib/flink-shaded-hadoop2-uber-1.4.2.jar}}. If I replace that 
> with {{flink-1.4.0/lib/flink-shaded-hadoop2-uber-1.4.0.jar}}, restart the 
> cluster and run my job (flink topology) then the error doesn't appear.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8701) Migrate SavepointMigrationTestBase to MiniClusterResource

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-8701:


Assignee: Chesnay Schepler

> Migrate SavepointMigrationTestBase to MiniClusterResource
> -
>
> Key: FLINK-8701
> URL: https://issues.apache.org/jira/browse/FLINK-8701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This requires adding support for {{ClusterClient.getAccumulators()}} to the 
> new {{RestClusterClient}} and requires migrating the custom cluster 
> communication that the test does.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8701) Migrate SavepointMigrationTestBase to MiniClusterResource

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8701.
--
Resolution: Fixed

Fixed via
master: 0231460259b410f42dd4933cb3ae993450bf1694
1.5.0: 60e05c05f3fbf20c1276bc2f1006eb4224eda43f

> Migrate SavepointMigrationTestBase to MiniClusterResource
> -
>
> Key: FLINK-8701
> URL: https://issues.apache.org/jira/browse/FLINK-8701
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> This requires adding support for {{ClusterClient.getAccumulators()}} to the 
> new {{RestClusterClient}} and requires migrating the custom cluster 
> communication that the test does.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8881) Accumulators not updated for running jobs

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8881?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8881.
--
Resolution: Fixed

Fixed via
master: 95d4c0170339585cb4876c4bafc1c2a42c44be3c
1.5.0: 6657aa2090b6b0f56f329152b3aaa1c147e73380

> Accumulators not updated for running jobs
> -
>
> Key: FLINK-8881
> URL: https://issues.apache.org/jira/browse/FLINK-8881
> Project: Flink
>  Issue Type: Bug
>  Components: DataSet API, DataStream API, Distributed Coordination, 
> JobManager, TaskManager
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The FLIP-6 {{TaskExecutor}} is never sending the current state of 
> accumulators to the JobMaster. They are only updated if the job is finished.
> The legacy TaskManager did this regularly as part of the heartbeat to the 
> JobManager.
> This is a regression and blocks the porting of some tests (like the 
> {{SavepointMigrationTestBase}}) that makes use of accumulators to determine 
> when the job shutdown condition is fulfilled.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8703) Migrate tests from LocalFlinkMiniCluster to MiniClusterResource

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406792#comment-16406792
 ] 

ASF GitHub Bot commented on FLINK-8703:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5701


> Migrate tests from LocalFlinkMiniCluster to MiniClusterResource
> ---
>
> Key: FLINK-8703
> URL: https://issues.apache.org/jira/browse/FLINK-8703
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Reporter: Aljoscha Krettek
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5701: [FLINK-8703][tests] Port SavepointMigrationTestBas...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5701


---


[GitHub] flink pull request #5712: [FLINK-9011] YarnResourceManager spamming log file...

2018-03-20 Thread yew1eb
Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r175842877
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext(
 	require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
 	final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-	log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
 	final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-	log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
 	final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-	log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
 	final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-	log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+	if (log.isDebugEnabled()) {
--- End diff --

The {{isDebugEnabled}} check is typically used to avoid unnecessary String 
concatenation, but checking whether debug logging is enabled is not necessary 
here.


---


[jira] [Commented] (FLINK-9011) YarnResourceManager spamming log file at INFO level

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406679#comment-16406679
 ] 

ASF GitHub Bot commented on FLINK-9011:
---

Github user yew1eb commented on a diff in the pull request:

https://github.com/apache/flink/pull/5712#discussion_r175842877
  
--- Diff: flink-yarn/src/main/java/org/apache/flink/yarn/Utils.java ---
@@ -351,16 +351,16 @@ static ContainerLaunchContext createTaskExecutorContext(
 	require(yarnClientUsername != null, "Environment variable %s not set", YarnConfigKeys.ENV_HADOOP_USER_NAME);
 
 	final String remoteKeytabPath = env.get(YarnConfigKeys.KEYTAB_PATH);
-	log.info("TM:remote keytab path obtained {}", remoteKeytabPath);
-
 	final String remoteKeytabPrincipal = env.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
-	log.info("TM:remote keytab principal obtained {}", remoteKeytabPrincipal);
-
 	final String remoteYarnConfPath = env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
-	log.info("TM:remote yarn conf path obtained {}", remoteYarnConfPath);
-
 	final String remoteKrb5Path = env.get(YarnConfigKeys.ENV_KRB5_PATH);
-	log.info("TM:remote krb5 path obtained {}", remoteKrb5Path);
+
+	if (log.isDebugEnabled()) {
--- End diff --

The {{isDebugEnabled}} check is typically used to avoid unnecessary String 
concatenation, but checking whether debug logging is enabled is not necessary 
here.


> YarnResourceManager spamming log file at INFO level
> ---
>
> Key: FLINK-9011
> URL: https://issues.apache.org/jira/browse/FLINK-9011
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> For every requested resource, the {{YarnResourceManager}} spams the log with 
> log-level INFO and the following messages:
> {code}
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Received new container: 
> container_1521038088305_0257_01_000102 - Remaining pending container 
> requests: 301
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TaskExecutor container_1521038088305_0257_01_000102 will be 
> started with container size 8192 MB, JVM heap size 5120 MB, JVM direct memory 
> limit 3072 MB
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote keytab principal obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote yarn conf path obtained null
> 2018-03-16 03:41:20,180 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - TM:remote krb5 path obtained null
> 2018-03-16 03:41:20,181 INFO  org.apache.flink.yarn.Utils 
>   - Copying from 
> file:/mnt/yarn/usercache/hadoop/appcache/application_1521038088305_0257/container_1521038088305_0257_01_01/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
>  to 
> hdfs://ip-172-31-1-91.eu-west-1.compute.internal:8020/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml
> 2018-03-16 03:41:20,190 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Prepared local resource for modified yaml: resource { scheme: 
> "hdfs" host: "ip-172-31-1-91.eu-west-1.compute.internal" port: 8020 file: 
> "/user/hadoop/.flink/application_1521038088305_0257/6766be70-82f7-4999-a371-11c27527fb6e-taskmanager-conf.yaml"
>  } size: 595 timestamp: 1521171680190 type: FILE visibility: APPLICATION
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Creating container launch context for TaskManagers
> 2018-03-16 03:41:20,194 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Starting TaskManagers with command: $JAVA_HOME/bin/java 
> -Xms5120m -Xmx5120m -XX:MaxDirectMemorySize=3072m  
> -Dlog.file=/taskmanager.log 
> -Dlogback.configurationFile=file:./logback.xml 
> -Dlog4j.configuration=file:./log4j.properties 
> org.apache.flink.yarn.YarnTaskExecutorRunner --configDir . 1> 
> /taskmanager.out 2> /taskmanager.err
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8887) ClusterClient.getJobStatus can throw FencingTokenException

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8887?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-8887:


Assignee: Till Rohrmann  (was: vinoyang)

> ClusterClient.getJobStatus can throw FencingTokenException
> --
>
> Key: FLINK-8887
> URL: https://issues.apache.org/jira/browse/FLINK-8887
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> *Description*
> Calling {{RestClusterClient.getJobStatus}} or 
> {{MiniClusterClient.getJobStatus}} can result in a {{FencingTokenException}}. 
> *Analysis*
> {{Dispatcher.requestJobStatus}} first looks up the {{JobManagerRunner}} by 
> job id. If a reference is found, {{requestJobStatus}} is called on the 
> respective instance; if not, the {{ArchivedExecutionGraphStore}} is queried. 
> However, between the lookup and the method call, the {{JobMaster}} of the 
> respective job may already have lost leadership (because the job finished) 
> and set its fencing token to {{null}}. A minimal sketch of this race follows 
> the stack traces below.
> *Stacktrace*
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token mismatch: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token null did not match the expected fencing token b8423c75bc6838244b8c93c8bd4a4f51.
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:73)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
> {noformat}
> Caused by: org.apache.flink.runtime.rpc.exceptions.FencingTokenException: Fencing token not set: Ignoring message LocalFencedMessage(null, LocalRpcInvocation(requestJobStatus(Time))) because the fencing token is null.
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:56)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> {noformat}
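
To make the race concrete, here is a minimal self-contained sketch (simplified, hypothetical names; the real code lives in {{Dispatcher}}, {{JobMaster}} and the Akka RPC layer):

{code}
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch of the race described in the analysis above. */
public class DispatcherRaceSketch {

	interface JobManagerRunner {
		/** Fails with a FencingTokenException once leadership is lost. */
		CompletableFuture<String> requestJobStatus();
	}

	private final Map<String, JobManagerRunner> runners = new ConcurrentHashMap<>();
	private final Map<String, String> archivedExecutionGraphStore = new ConcurrentHashMap<>();

	CompletableFuture<String> requestJobStatus(String jobId) {
		JobManagerRunner runner = runners.get(jobId);
		if (runner != null) {
			// Race window: between the lookup above and this call the job may
			// finish, the JobMaster nulls its fencing token, and the RPC is
			// rejected with the FencingTokenException from the stack traces.
			return runner.requestJobStatus();
		}
		return CompletableFuture.completedFuture(
			Optional.ofNullable(archivedExecutionGraphStore.get(jobId)).orElse("UNKNOWN"));
	}
}
{code}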



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-9019) Unclosed closeableRegistry in StreamTaskStateInitializerImpl#rawOperatorStateInputs

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-9019.
--
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed via
master: 6eb91a1006590e6806ec0e6c381fca411d0e23d7
1.5.0: e2a62b3e257a0625392209be0a93880988c0c6f7

> Unclosed closeableRegistry in 
> StreamTaskStateInitializerImpl#rawOperatorStateInputs
> ---
>
> Key: FLINK-9019
> URL: https://issues.apache.org/jira/browse/FLINK-9019
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
> Fix For: 1.5.0
>
>
> {code}
>  final CloseableRegistry closeableRegistry = new CloseableRegistry();
> ...
>  if (rawOperatorState != null) {
> ...
>   }
> }
> return CloseableIterable.empty();
> {code}
> If rawOperatorState is null, closeableRegistry would be left unclosed.
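
For illustration, a minimal sketch of one way to plug the leak (stand-in types, not the actual fix in {{StreamTaskStateInitializerImpl}}): close the registry on the early-return path as well.

{code}
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;

public class CloseOnEarlyReturnSketch {

	/** Stand-in for Flink's CloseableRegistry. */
	static final class Registry implements Closeable {
		@Override
		public void close() throws IOException {
			// release all tracked resources
		}
	}

	static Iterable<byte[]> rawOperatorStateInputs(Object rawOperatorState) throws IOException {
		final Registry closeableRegistry = new Registry();
		if (rawOperatorState != null) {
			// The returned iterable takes ownership of the registry and
			// closes it once the caller is done with it.
			return buildInputs(rawOperatorState, closeableRegistry);
		}
		// Without this close(), the registry created above leaks, which is
		// exactly the bug described in the ticket.
		closeableRegistry.close();
		return Collections.emptyList();
	}

	private static Iterable<byte[]> buildInputs(Object state, Registry registry) {
		return Collections.emptyList(); // stand-in
	}
}
{code}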



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8942) Pass target ResourceID to HeartbeatListener#retrievePayload

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8942.
--
Resolution: Fixed

Fixed via
master: f9fbbc3a137276cab4b8abf272199f1cd4633d29
1.5.0: f3389638eed8d48db013dadf72c1af9748491968

> Pass target ResourceID to HeartbeatListener#retrievePayload
> ---
>
> Key: FLINK-8942
> URL: https://issues.apache.org/jira/browse/FLINK-8942
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> For FLINK-8881 we need a way to determine to which JobManager we are sending 
> the heartbeats, otherwise we would have to send all accumulators, that is for 
> all jobs, to each connected JobManager.
> I suggest passing the target {{ResourceID}} to 
> {{HeartbeatListener#retrievePayload}}, which generates the payload.
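
A hedged sketch of the proposed signature change (stand-in types; the real {{HeartbeatListener}} in flink-runtime carries more methods and type parameters):

{code}
import java.util.Collections;
import java.util.Map;

public class HeartbeatPayloadSketch {

	/** Stand-in for Flink's ResourceID. */
	static final class ResourceID {
		final String id;
		ResourceID(String id) { this.id = id; }
	}

	/** With the target passed in, the payload can be scoped per JobManager. */
	interface HeartbeatListener<O> {
		O retrievePayload(ResourceID target);
	}

	/** Example: ship only the accumulators belonging to the target JobManager. */
	static HeartbeatListener<Map<String, Long>> accumulatorReporter(
			final Map<ResourceID, Map<String, Long>> accumulatorsPerJobManager) {
		return target -> accumulatorsPerJobManager.getOrDefault(
			target, Collections.<String, Long>emptyMap());
	}
}
{code}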



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2018-03-20 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-8801.
--
Resolution: Fixed

Fixed via
master: c90a757b29f168144b1bae99df532911ae682e63
1.5.0: 071dedcb769d1e3e899134e9566c69808ba99c53
1.4.3: 8b1376b1808d302574073ce180dc561244adc6f5

> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, ResourceManager, YARN
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get file size and modification timestamp. Since we have access to the 
> local resource at that point, we can use its metadata instead and circumvent 
> the problem.
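
A minimal sketch of that idea, under the assumption that the metadata needed for the YARN local resource (size and modification timestamp) can be taken from the local file before the upload; names are hypothetical:

{code}
import java.io.File;

/**
 * Hedged sketch: derive resource metadata from the local file instead of
 * issuing a HEAD/GET against the freshly written S3 object, sidestepping
 * S3's eventual read-after-write consistency.
 */
public class LocalResourceMetadataSketch {

	static final class ResourceMeta {
		final long size;
		final long modificationTime;

		ResourceMeta(long size, long modificationTime) {
			this.size = size;
			this.modificationTime = modificationTime;
		}
	}

	static ResourceMeta metadataForUpload(File localFile) {
		// Problematic variant: calling getFileStatus() on the remote path right
		// after the upload may observe stale (eventually consistent) metadata.
		// Consistent variant: the local file is exactly what was uploaded, so
		// its length matches the remote object's size by construction.
		return new ResourceMeta(localFile.length(), localFile.lastModified());
	}
}
{code}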



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8801) S3's eventual consistent read-after-write may fail yarn deployment of resources to S3

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8801?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406648#comment-16406648
 ] 

ASF GitHub Bot commented on FLINK-8801:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5602


> S3's eventual consistent read-after-write may fail yarn deployment of 
> resources to S3
> -
>
> Key: FLINK-8801
> URL: https://issues.apache.org/jira/browse/FLINK-8801
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, ResourceManager, YARN
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>Priority: Blocker
> Fix For: 1.5.0, 1.4.3
>
>
> According to 
> https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html#ConsistencyModel:
> {quote}
> Amazon S3 provides read-after-write consistency for PUTS of new objects in 
> your S3 bucket in all regions with one caveat. The caveat is that if you make 
> a HEAD or GET request to the key name (to find if the object exists) before 
> creating the object, Amazon S3 provides eventual consistency for 
> read-after-write.
> {quote}
> Some S3 file system implementations may actually execute such a request for 
> the about-to-write object and thus the read-after-write is only eventually 
> consistent. {{org.apache.flink.yarn.Utils#setupLocalResource()}} currently 
> relies on a consistent read-after-write since it accesses the remote resource 
> to get file size and modification timestamp. Since we have access to the 
> local resource at that point, we can use its metadata instead and circumvent 
> the problem.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9019) Unclosed closeableRegistry in StreamTaskStateInitializerImpl#rawOperatorStateInputs

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9019?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406647#comment-16406647
 ] 

ASF GitHub Bot commented on FLINK-9019:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5723


> Unclosed closeableRegistry in 
> StreamTaskStateInitializerImpl#rawOperatorStateInputs
> ---
>
> Key: FLINK-9019
> URL: https://issues.apache.org/jira/browse/FLINK-9019
> Project: Flink
>  Issue Type: Bug
>Reporter: Ted Yu
>Assignee: vinoyang
>Priority: Minor
>
> {code}
>  final CloseableRegistry closeableRegistry = new CloseableRegistry();
> ...
>  if (rawOperatorState != null) {
> ...
>   }
> }
> return CloseableIterable.empty();
> {code}
> If rawOperatorState is null, closeableRegistry would be left unclosed.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8942) Pass target ResourceID to HeartbeatListener#retrievePayload

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8942?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406649#comment-16406649
 ] 

ASF GitHub Bot commented on FLINK-8942:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5699


> Pass target ResourceID to HeartbeatListener#retrievePayload
> ---
>
> Key: FLINK-8942
> URL: https://issues.apache.org/jira/browse/FLINK-8942
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>
> For FLINK-8881 we need a way to determine to which JobManager we are sending 
> the heartbeats, otherwise we would have to send all accumulators, that is for 
> all jobs, to each connected JobManager.
> I suggest passing the target {{ResourceID}} to 
> {{HeartbeatListener#retrievePayload}}, which generates the payload.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5602: [FLINK-8801][yarn/s3] fix Utils#setupLocalResource...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5602


---


[GitHub] flink pull request #5723: [FLINK-9019] Unclosed closeableRegistry in StreamT...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5723


---


[GitHub] flink pull request #5699: [FLINK-8942][runtime] Pass heartbeat target Resour...

2018-03-20 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5699


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828711
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825995
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
--- End diff --

Let's quickly document all the different configuration settings in the JavaDoc 
of `main`. I think this makes it easier for future users of this class.
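
For instance, a sketch of the requested documentation, reconstructed only from the parameters visible in the quoted snippet (defaults as they appear there; not the final wording):

{code}
public class StickyAllocationAndLocalRecoveryTestJobDocSketch {

	/**
	 * Automatic end-to-end test job for local recovery (including sticky allocation).
	 *
	 * <p>Program parameters:
	 * <ul>
	 *   <li>parallelism: job parallelism (default: 1)</li>
	 *   <li>maxParallelism: maximum parallelism (default: parallelism)</li>
	 *   <li>checkpointInterval: checkpoint interval in milliseconds (default: 1000)</li>
	 *   <li>restartDelay: fixed restart delay in milliseconds (default: 0)</li>
	 *   <li>externalizedCheckpoints: retain checkpoints on cancellation (default: false)</li>
	 *   <li>stateBackend: "file" or "rocks" (default: "file")</li>
	 *   <li>checkpointDir: checkpoint directory (required)</li>
	 *   <li>asyncCheckpoints: asynchronous snapshots for the file backend (default: false)</li>
	 *   <li>killJvmOnFail: kill the JVM on an induced failure (default: false)</li>
	 * </ul>
	 */
	public static void main(String[] args) throws Exception {
		// job setup as in the pull request
	}
}
{code}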


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825436
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
--- End diff --

We can leave it here but in general I think abbreviations should be avoided.


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826508
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825728
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
--- End diff --

wouldn't `pt.getBoolean("externalizedCheckpoints", false)` be the same?
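
For what it's worth, a small sketch of why the two forms should agree, assuming {{ParameterTool.getBoolean(key, defaultValue)}} returns the default when the key is absent:

{code}
import org.apache.flink.api.java.utils.ParameterTool;

public class GetBooleanEquivalenceSketch {

	public static void main(String[] args) {
		ParameterTool pt = ParameterTool.fromArgs(args);

		// Key absent:  has(...) short-circuits to false, and getBoolean(..., false)
		//              falls back to false, so both expressions are false.
		// Key present: has(...) is true, and both expressions reduce to the parsed value.
		boolean longForm = pt.has("externalizedCheckpoints")
			&& pt.getBoolean("externalizedCheckpoints", false);
		boolean shortForm = pt.getBoolean("externalizedCheckpoints", false);

		System.out.println(longForm == shortForm);  // prints true either way
	}
}
{code}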


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406610#comment-16406610
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825995
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
--- End diff --

Let's quickly document all the different configuration settings in the JavaDoc 
of `main`. I think this makes it easier for future users of this class.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175831034
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828150
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175827727
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175832543
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

I'm also not sure whether we want to expose location information to the 
`Task`. Given scheduling transparency, this feels a bit wrong. Isn't there a 
different way to check whether a task was recovered locally?


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828193
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
[quoted diff truncated in the archive; the review comment for this message was not preserved. The same file is quoted in the surrounding review comments on this pull request.]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175820558
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
--- End diff --

Shall we put the job in its own module to simplify the jar building? This 
will also help to keep the dependencies of several jobs separated.


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406602#comment-16406602
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826404
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826610
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406606#comment-16406606
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826508
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406612#comment-16406612
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175827727
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406609#comment-16406609
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826610
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406608#comment-16406608
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825436
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the excerpt at the top of this section, trimmed to the quoted lines the comment targets]
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
--- End diff --

We can leave it here, but in general I think abbreviations should be avoided.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406615#comment-16406615
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175832543
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

I'm also not sure whether we want to expose location information to the 
`Task`. Given scheduling transparency, this feels a bit wrong. Isn't there a 
different way to check whether a task was recovered locally?


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406601#comment-16406601
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175817764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/NopTaskLocalStateStoreImpl.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongPredicate;
+
+/**
+ * This class implements a {@link TaskLocalStateStore} with no 
functionality and is used when local recovery is
+ * disabled.
+ */
+public final class NopTaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
--- End diff --

is `Nop` a typo? Should it be `NoOpTaskLocalStateStoreImpl`?


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406611#comment-16406611
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828193
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406604#comment-16406604
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175811121
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

If we really want to store the `AllocationID`, then we should store it not 
as a `String` but instead move `AllocationID` to `flink-core`. If this is not 
needed, then I would suggest renaming it to something else.
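
A side note for readers of this thread: below is a minimal sketch of the typed alternative the review suggests, assuming `AllocationID` were moved into `flink-core`. The class bodies are illustrative stand-ins (`TaskInfoSketch` and the `AllocationID` fields shown here are not Flink's actual implementation):

{code}
// Illustrative stand-in for the runtime AllocationID, hypothetically
// relocated to flink-core so that TaskInfo can reference it.
public final class AllocationID implements java.io.Serializable {

    private final long lowerPart;
    private final long upperPart;

    public AllocationID(long lowerPart, long upperPart) {
        this.lowerPart = lowerPart;
        this.upperPart = upperPart;
    }

    @Override
    public String toString() {
        return Long.toHexString(upperPart) + Long.toHexString(lowerPart);
    }
}

// With the class available, the task metadata can hold the id strongly
// typed instead of as a raw String, so it cannot be mixed up with other
// string-valued identifiers.
class TaskInfoSketch {

    private final AllocationID allocationId;

    TaskInfoSketch(AllocationID allocationId) {
        this.allocationId = allocationId;
    }

    public AllocationID getAllocationId() {
        return allocationId;
    }
}
{code}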


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406607#comment-16406607
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175820558
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
--- End diff --

Shall we put the job in its own module to simplify the jar building? This 
will also help to keep the dependencies of several jobs separated.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406605#comment-16406605
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175825728
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the excerpt at the top of this section, trimmed to the quoted lines the comment targets]
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
--- End diff --

wouldn't `pt.getBoolean("externalizedCheckpoints", false)` be the same?
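
For readers following along: `ParameterTool#getBoolean(String, boolean)` returns the default when the key is absent, so with a default of `false` the `has()` guard adds nothing. A minimal sketch of the equivalence (the argument values are made up for illustration):

{code}
import org.apache.flink.api.java.utils.ParameterTool;

public class GetBooleanEquivalence {

    public static void main(String[] args) {
        // Key absent: getBoolean falls back to the default, so both forms are false.
        ParameterTool absent = ParameterTool.fromArgs(new String[] {});
        boolean guarded = absent.has("externalizedCheckpoints")
            && absent.getBoolean("externalizedCheckpoints", false);
        boolean direct = absent.getBoolean("externalizedCheckpoints", false);
        System.out.println(guarded == direct); // true

        // Key present: has() is true, so both forms reduce to the parsed value.
        ParameterTool present = ParameterTool.fromArgs(
            new String[] {"--externalizedCheckpoints", "true"});
        System.out.println(present.getBoolean("externalizedCheckpoints", false)); // true
    }
}
{code}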


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan 

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406614#comment-16406614
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828150
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406616#comment-16406616
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175828711
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
[quoted context omitted: verbatim repeat of the StickyAllocationAndLocalRecoveryTestJob.java excerpt at the top of this section; the archived message breaks off before the review comment]

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406600#comment-16406600
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175817052
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+function checkLogs {
+  parallelism=$1
+  attempts=$2
+  (( expectedCount=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  failedLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  attemptLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failedLocalRecovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failedLocalRecovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attemptLocalRecovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+function cleanupAfterTest {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdogPid} 2> /dev/null
+  wait ${watchdogPid} 2> /dev/null
+  #
+  cleanup
+}
+
+function cleanupAfterTestAndExitFail {
+  cleanupAfterTest
+  exit 1
+}
+
+## This function executes one run for a certain configuration
+function runLocalRecoveryTest {
+  parallelism=$1
+  maxAttempts=$2
+  backend=$3
+  incremental=$4
+  killJVM=$5
+
+  echo "Running local recovery test on ${backend} backend: incremental 
checkpoints = ${incremental}, kill JVM = ${killJVM}."
+  
TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/target/flink-end-to-end-tests.jar
+
+  # Enable debug logging
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' 
"$FLINK_DIR/conf/log4j.properties"
+
+  # Enable local recovery
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
+
+  rm $FLINK_DIR/log/* 2> /dev/null
+
+  start_cluster
+
+  tm_watchdog ${parallelism} &
+  watchdogPid=$!
+
+  echo "Started TM watchdog with PID ${watchdogPid}."
+
+  $FLINK_DIR/bin/flink run -c 
org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+  -p ${parallelism} $TEST_PROGRAM_JAR --resolve-order parent-first \
--- End diff --

why do we start the job with `--resolve-order parent-first`?


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run 

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406613#comment-16406613
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175831034
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+  

[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406603#comment-16406603
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175810393
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -31,12 +31,35 @@
 
private final String taskName;
private final String taskNameWithSubtasks;
+   private final String allocationID;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
 
-   public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int 
indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber) {
--- End diff --

Indentation is off by one level.
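
For reference, a minimal sketch of the requested style (hypothetical class; assumes Flink's convention of indenting wrapped constructor parameters one level deeper than the constructor body):

{code}
public class IndentationExample {

	private final String taskName;
	private final int attemptNumber;

	// Wrapped parameters get one extra indentation level, so they stand
	// out from the constructor body below them.
	public IndentationExample(
			String taskName,
			int attemptNumber) {
		this.taskName = taskName;
		this.attemptNumber = attemptNumber;
	}
}
{code}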


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175817052
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+function checkLogs {
+  parallelism=$1
+  attempts=$2
+  (( expectedCount=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  failedLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  attemptLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failedLocalRecovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failedLocalRecovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attemptLocalRecovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+function cleanupAfterTest {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdogPid} 2> /dev/null
+  wait ${watchdogPid} 2> /dev/null
+  #
+  cleanup
+}
+
+function cleanupAfterTestAndExitFail {
+  cleanupAfterTest
+  exit 1
+}
+
+## This function executes one run for a certain configuration
+function runLocalRecoveryTest {
+  parallelism=$1
+  maxAttempts=$2
+  backend=$3
+  incremental=$4
+  killJVM=$5
+
+  echo "Running local recovery test on ${backend} backend: incremental 
checkpoints = ${incremental}, kill JVM = ${killJVM}."
+  
TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-end-to-end-tests/target/flink-end-to-end-tests.jar
+
+  # Enable debug logging
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=DEBUG, file/' 
"$FLINK_DIR/conf/log4j.properties"
+
+  # Enable local recovery
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  echo "state.backend.local-recovery: ENABLE_FILE_BASED" >> 
"$FLINK_DIR/conf/flink-conf.yaml"
+
+  rm $FLINK_DIR/log/* 2> /dev/null
+
+  start_cluster
+
+  tm_watchdog ${parallelism} &
+  watchdogPid=$!
+
+  echo "Started TM watchdog with PID ${watchdogPid}."
+
+  $FLINK_DIR/bin/flink run -c 
org.apache.flink.streaming.tests.StickyAllocationAndLocalRecoveryTestJob \
+  -p ${parallelism} $TEST_PROGRAM_JAR --resolve-order parent-first \
--- End diff --

why do we start the job with `--resolve-order parent-first`?


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406597#comment-16406597
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812511
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
+count=$((expectedTm-runningTm))
+for (( c=0; c<count; c++ ))
+do
+  $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+done
+sleep 5;
+  done
+}
+
+function jm_kill_all {
+  kill_all StandaloneSessionClusterEntrypoint
+}
+
+function tm_kill_all {
+  kill_all TaskManagerRunner
--- End diff --

Same here


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406599#comment-16406599
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812472
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
+count=$((expectedTm-runningTm))
+for (( c=0; c<count; c++ ))
+do
+  $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+done
+sleep 5;
+  done
+}
+
+function jm_kill_all {
+  kill_all StandaloneSessionClusterEntrypoint
--- End diff --

Would be good to make it work with the legacy code as well.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812472
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
+count=$((expectedTm-runningTm))
+for (( c=0; c<count; c++ ))
+do
+  $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+done
+sleep 5;
+  done
+}
+
+function jm_kill_all {
+  kill_all StandaloneSessionClusterEntrypoint
--- End diff --

Would be good to make it work with the legacy code as well.


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406598#comment-16406598
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812319
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
--- End diff --

Let's make the grep statement configurable because in non-Flip-6 mode the 
system does not start a `TaskManagerRunner` but a `TaskManager`.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175810393
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -31,12 +31,35 @@
 
private final String taskName;
private final String taskNameWithSubtasks;
+   private final String allocationID;
private final int maxNumberOfParallelSubtasks;
private final int indexOfSubtask;
private final int numberOfParallelSubtasks;
private final int attemptNumber;
 
-   public TaskInfo(String taskName, int maxNumberOfParallelSubtasks, int 
indexOfSubtask, int numberOfParallelSubtasks, int attemptNumber) {
+   public TaskInfo(
+   String taskName,
+   int maxNumberOfParallelSubtasks,
+   int indexOfSubtask,
+   int numberOfParallelSubtasks,
+   int attemptNumber) {
--- End diff --

Indentation is off by one level.


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175817764
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/NopTaskLocalStateStoreImpl.java
 ---
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.LongPredicate;
+
+/**
+ * This class implements a {@link TaskLocalStateStore} with no 
functionality and is used when local recovery is
+ * disabled.
+ */
+public final class NopTaskLocalStateStoreImpl implements 
OwnedTaskLocalStateStore {
--- End diff --

is `Nop` a typo, and should it be `NoOpTaskLocalStateStoreImpl`?
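
For context, the class under review is a null object: a do-nothing implementation swapped in when local recovery is disabled, so callers never branch on a feature flag or a null reference. A generic sketch of the pattern, with illustrative types that are not Flink's actual interfaces:

{code}
interface LocalStore {
	void storeSnapshot(long checkpointId, String snapshot);
}

// Null-object implementation: installed when the feature is disabled.
final class NoOpLocalStore implements LocalStore {
	@Override
	public void storeSnapshot(long checkpointId, String snapshot) {
		// Intentionally a no-op: local recovery is disabled.
	}
}
{code}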


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406596#comment-16406596
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812117
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
--- End diff --

Please add a comment explaining what this function does. Especially in bash 
scripts, I think they can be very helpful.


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175826404
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+   } else if ("rocks".equals(stateBackend)) {
+   boolean incrementalCheckpoints = 

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175811121
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/api/common/TaskInfo.java ---
@@ -107,4 +131,12 @@ public int getAttemptNumber() {
public String getTaskNameWithSubtasks() {
return this.taskNameWithSubtasks;
}
+
+   /**
+* Returns the allocation id for where this task is executed.
+* @return the allocation id for where this task is executed.
+*/
+   public String getAllocationID() {
--- End diff --

If we really want to store the `AllocationID`, then we should not store it 
as a `String` but instead move `AllocationID` to `flink-core`. If this is not 
needed, then I would suggest renaming it to something else.
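
For illustration, a sketch of the typed variant (hypothetical class name; assumes `AllocationID` were moved so that `flink-core` could reference it):

{code}
// AllocationID currently lives in flink-runtime
// (org.apache.flink.runtime.clusterframework.types.AllocationID), so this
// sketch only compiles once the class is moved as suggested above.
public class TaskInfoSketch {

	private final AllocationID allocationID;

	public TaskInfoSketch(AllocationID allocationID) {
		this.allocationID = allocationID;
	}

	public AllocationID getAllocationID() {
		return allocationID;
	}
}
{code}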


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812511
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
+count=$((expectedTm-runningTm))
+for (( c=0; c<count; c++ ))
+do
+  $FLINK_DIR/bin/taskmanager.sh start > /dev/null
+done
+sleep 5;
+  done
+}
+
+function jm_kill_all {
+  kill_all StandaloneSessionClusterEntrypoint
+}
+
+function tm_kill_all {
+  kill_all TaskManagerRunner
--- End diff --

Same here


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812319
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
+  expectedTm=$1
+  while true;
+  do
+runningTm=`jps | grep -o 'TaskManagerRunner' | wc -l`;
--- End diff --

Let's make the grep statement configurable because in non-Flip-6 mode the 
system does not start a `TaskManagerRunner` but a `TaskManager`.


---


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r175812117
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -176,10 +176,40 @@ function s3_delete {
 https://${bucket}.s3.amazonaws.com/${s3_file}
 }
 
+function tm_watchdog {
--- End diff --

Please add a comment explaining what this function does. Especially in bash 
scripts, I think they can be very helpful.


---


[jira] [Commented] (FLINK-6924) ADD LOG(X) supported in TableAPI

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6924?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406584#comment-16406584
 ] 

ASF GitHub Bot commented on FLINK-6924:
---

Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5638
  
Thanks for the explanation @buptljy. Yeah, if that's the case the only way 
to validate is via an ITCase; it might be overkill in this situation though. 
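
For reference, a minimal sketch of what such an ITCase could exercise, using the SQL form of the function for illustration (hypothetical table and column names; assumes LOG(x) is the natural logarithm and LOG(b, x) takes an explicit base):

{code}
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class LogItCaseSketch {

	// Assumes a table "MyTable" with a numeric column `a` has been
	// registered on the given TableEnvironment.
	public static Table logQuery(TableEnvironment tEnv) {
		// LOG(a) is the natural logarithm; LOG(2, a) uses base 2.
		return tEnv.sqlQuery("SELECT LOG(a), LOG(2, a) FROM MyTable");
	}
}
{code}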


> ADD LOG(X) supported in TableAPI
> 
>
> Key: FLINK-6924
> URL: https://issues.apache.org/jira/browse/FLINK-6924
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: zjuwangg
>Priority: Major
>  Labels: starter
>
> See FLINK-6891 for detail.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5638: [FLINK-6924][table]ADD LOG(X) supported in TableAPI

2018-03-20 Thread walterddr
Github user walterddr commented on the issue:

https://github.com/apache/flink/pull/5638
  
Thanks for the explanation @buptljy. Yeah, if that's the case the only way 
to validate is via an ITCase; it might be overkill in this situation though. 


---


[jira] [Commented] (FLINK-9033) Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9033?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406577#comment-16406577
 ] 

ASF GitHub Bot commented on FLINK-9033:
---

GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5731

[FLINK-9033][config] Replace usages of deprecated 
TASK_MANAGER_NUM_TASK_SLOTS

## What is the purpose of the change

The deprecated ConfigConstants#TASK_MANAGER_NUM_TASK_SLOTS is still used a 
lot.
We should replace these usages with TaskManagerOptions#NUM_TASK_SLOTS.
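
For example, a representative call-site migration (sketch; `config` stands for any `Configuration` instance):

{code}
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

public class NumTaskSlotsMigration {

	public static int numTaskSlots(Configuration config) {
		// Before (deprecated String key; the default must be repeated at
		// every call site):
		//   config.getInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 1);

		// After: a typed ConfigOption that carries its own default value.
		return config.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
	}
}
{code}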

## Verifying this change

- `mvn clean verify`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-9033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5731.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5731






> Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS
> 
>
> Key: FLINK-9033
> URL: https://issues.apache.org/jira/browse/FLINK-9033
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration
>Affects Versions: 1.5.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.5.0
>
>
> The deprecated ConfigConstants#TASK_MANAGER_NUM_TASK_SLOTS is still used a 
> lot.
> We should replace these usages with TaskManagerOptions#NUM_TASK_SLOTS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5731: [FLINK-9033][config] Replace usages of deprecated ...

2018-03-20 Thread yew1eb
GitHub user yew1eb opened a pull request:

https://github.com/apache/flink/pull/5731

[FLINK-9033][config] Replace usages of deprecated 
TASK_MANAGER_NUM_TASK_SLOTS

## What is the purpose of the change

The deprecated ConfigConstants#TASK_MANAGER_NUM_TASK_SLOTS is still used a 
lot.
We should replace these usages with TaskManagerOptions#NUM_TASK_SLOTS.

## Verifying this change

- `mvn clean verify`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/yew1eb/flink FLINK-9033

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5731.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5731






---


[jira] [Commented] (FLINK-8903) Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in Group Windows

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8903?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406559#comment-16406559
 ] 

ASF GitHub Bot commented on FLINK-8903:
---

Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175825522
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+// we do not support these functions natively
+// they have to be converted using the 
WindowAggregateReduceFunctionsRule
+val supported = 
agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+  // we support AVG
+  case SqlKind.AVG => true
+  // but none of the other AVG agg functions
+  case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+  case _ => true
+}
+
+!agg.containsDistinctCall() && supported
--- End diff --

Yes. This was kinda confusing to me; we should clean this up when adding 
DISTINCT support. Thanks for the update @fhueske 


> Built-in agg functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP are broken in 
> Group Windows
> ---
>
> Key: FLINK-8903
> URL: https://issues.apache.org/jira/browse/FLINK-8903
> Project: Flink
>  Issue Type: Bug
>  Components: Table API & SQL
>Affects Versions: 1.3.2, 1.5.0, 1.4.2
>Reporter: lilizhao
>Assignee: Fabian Hueske
>Priority: Critical
> Fix For: 1.5.0
>
> Attachments: QQ图片20180312180143.jpg, TableAndSQLTest.java
>
>
> The built-in aggregation functions VAR_POP, VAR_SAMP, STDEV_POP, STDEV_SAMP 
> are translated into regular AVG functions if they are applied in the context 
> of a Group Window aggregation ({{GROUP BY TUMBLE/HOP/SESSION}}).
> The reason is that these functions are internally represented as 
> {{SqlAvgAggFunction}} but with different {{SqlKind}}. When translating 
> Calcite aggregation functions to Flink Table agg functions, we only look at 
> the type of the class, not at the value of the {{kind}} field. We did not 
> notice that before, because in all other cases (regular {{GROUP BY}} without 
> windows or {{OVER}} windows, we have a translation rule 
> {{AggregateReduceFunctionsRule}} that decomposes the more complex functions 
> into expressions of {{COUNT}} and {{SUM}} functions such that we never 
> execute an {{AVG}} Flink function. That rule can only be applied on 
> {{LogicalAggregate}}, however, we represent group windows as 
> {{LogicalWindowAggregate}}, so the rule does not match.
> We should fix this by:
> 1. restrict the translation to Flink avg functions in {{AggregateUtil}} to 
> {{SqlKind.AVG}}. 
> 2. implement a rule (hopefully based on {{AggregateReduceFunctionsRule}}) 
> that decomposes the complex agg functions into the {{SUM}} and {{COUNT}}.
> Step 1. is easy and a quick fix but we would get an exception "Unsupported 
> Function" if {{VAR_POP}} is used in a {{GROUP BY}} window.
> Step 2. might be more involved, depending on how difficult it is to port the 
> rule.
>  
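
To make step 1 concrete, a minimal sketch of the kind-based check (class and method names are illustrative; {{SqlKind.AVG_AGG_FUNCTIONS}} is Calcite's set of AVG-family kinds):

{code}
import org.apache.calcite.sql.SqlKind;

public class AvgTranslationCheck {

	// Only a call whose kind is exactly AVG maps to Flink's native AVG.
	public static boolean translatesToFlinkAvg(SqlKind kind) {
		return kind == SqlKind.AVG;
	}

	// VAR_POP, VAR_SAMP, STDDEV_POP and STDDEV_SAMP share the AVG function
	// class but must be decomposed into SUM/COUNT expressions by a rule.
	public static boolean needsDecomposition(SqlKind kind) {
		return kind != SqlKind.AVG && SqlKind.AVG_AGG_FUNCTIONS.contains(kind);
	}
}
{code}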



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5706: [FLINK-8903] [table] Fix VAR_SAMP, VAR_POP, STDDEV...

2018-03-20 Thread walterddr
Github user walterddr commented on a diff in the pull request:

https://github.com/apache/flink/pull/5706#discussion_r175825522
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/plan/nodes/logical/FlinkLogicalWindowAggregate.scala
 ---
@@ -103,6 +106,22 @@ class FlinkLogicalWindowAggregateConverter
 FlinkConventions.LOGICAL,
 "FlinkLogicalWindowAggregateConverter") {
 
+  override def matches(call: RelOptRuleCall): Boolean = {
+val agg = call.rel(0).asInstanceOf[LogicalWindowAggregate]
+
+// we do not support these functions natively
+// they have to be converted using the 
WindowAggregateReduceFunctionsRule
+val supported = 
agg.getAggCallList.asScala.map(_.getAggregation.getKind).forall {
+  // we support AVG
+  case SqlKind.AVG => true
+  // but none of the other AVG agg functions
+  case k if SqlKind.AVG_AGG_FUNCTIONS.contains(k) => false
+  case _ => true
+}
+
+!agg.containsDistinctCall() && supported
--- End diff --

Yes. This was kinda confusing to me; we should clean this up when adding 
DISTINCT support. Thanks for the update @fhueske 


---


[jira] [Created] (FLINK-9033) Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS

2018-03-20 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-9033:
---

 Summary: Replace usages of deprecated TASK_MANAGER_NUM_TASK_SLOTS
 Key: FLINK-9033
 URL: https://issues.apache.org/jira/browse/FLINK-9033
 Project: Flink
  Issue Type: Improvement
  Components: Configuration
Affects Versions: 1.5.0
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.5.0


The deprecated ConfigConstants#TASK_MANAGER_NUM_TASK_SLOTS is still used a lot.

We should replace these usages with TaskManagerOptions#NUM_TASK_SLOTS.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-20 Thread yanghua
Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@tillrohrmann thanks for your reply, it was a mistaken operation; I have 
rolled back and refactored the code based on your suggestion. Please check 
the change again.


---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406509#comment-16406509
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user yanghua commented on the issue:

https://github.com/apache/flink/pull/5573
  
@tillrohrmann thanks for your reply, it was a mistaken operation; I have 
rolled back and refactored the code based on your suggestion. Please check 
the change again.


> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9032) Deprecate ConfigConstants#YARN_CONTAINER_START_COMMAND_TEMPLATE

2018-03-20 Thread Hai Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9032?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hai Zhou updated FLINK-9032:

Component/s: Configuration

> Deprecate ConfigConstants#YARN_CONTAINER_START_COMMAND_TEMPLATE
> ---
>
> Key: FLINK-9032
> URL: https://issues.apache.org/jira/browse/FLINK-9032
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Documentation
>Affects Versions: 1.5.0
>Reporter: Hai Zhou
>Assignee: Hai Zhou
>Priority: Major
> Fix For: 1.5.0, 1.6.0
>
>
> this constant ("yarn.container-start-command-template") has disappeared from 
> the [1.5.0-SNAPSHOT 
> docs|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html].
> We should restore it, and I think it should be renamed 
> "containerized.start-command-template".
> [~Zentol], what do you think?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7811) Add support for Scala 2.12

2018-03-20 Thread Andrew Roberts (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7811?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406501#comment-16406501
 ] 

Andrew Roberts commented on FLINK-7811:
---

Any updates on this? Is this still likely to make it into 1.5? We're very 
excited to get off Scala 2.11 and on to 2.12 (or 2.13!)

> Add support for Scala 2.12
> --
>
> Key: FLINK-7811
> URL: https://issues.apache.org/jira/browse/FLINK-7811
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5573: [FLINK-8756][Client] Support ClusterClient.getAccumulator...

2018-03-20 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Not sure what exactly happened, but you can always go back to an earlier
point via the reflog and then a git hard reset.

On Tue, Mar 20, 2018 at 4:11 PM, vinoyang  wrote:

> hi @tillrohrmann it seems I made a wrong git operation: I merged some new
> commits and squashed them into one, then pushed to the PR branch. What
> should I do to fix this problem?
>



---


[jira] [Commented] (FLINK-8756) Support ClusterClient.getAccumulators() in RestClusterClient

2018-03-20 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406499#comment-16406499
 ] 

ASF GitHub Bot commented on FLINK-8756:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5573
  
Not sure what exactly happened, but you can always go back to an earlier
point via the reflog and then a git hard reset.

On Tue, Mar 20, 2018 at 4:11 PM, vinoyang  wrote:

> hi @tillrohrmann it seems I made a wrong git operation: I merged some new
> commits and squashed them into one, then pushed to the PR branch. What
> should I do to fix this problem?
>



> Support ClusterClient.getAccumulators() in RestClusterClient
> 
>
> Key: FLINK-8756
> URL: https://issues.apache.org/jira/browse/FLINK-8756
> Project: Flink
>  Issue Type: Improvement
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Aljoscha Krettek
>Assignee: vinoyang
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9032) Deprecate ConfigConstants#YARN_CONTAINER_START_COMMAND_TEMPLATE

2018-03-20 Thread Hai Zhou (JIRA)
Hai Zhou created FLINK-9032:
---

 Summary: Deprecate 
ConfigConstants#YARN_CONTAINER_START_COMMAND_TEMPLATE
 Key: FLINK-9032
 URL: https://issues.apache.org/jira/browse/FLINK-9032
 Project: Flink
  Issue Type: Improvement
  Components: Documentation
Affects Versions: 1.5.0
Reporter: Hai Zhou
Assignee: Hai Zhou
 Fix For: 1.5.0, 1.6.0


this constant ("yarn.container-start-command-template") has disappeared from the 
[1.5.0-SNAPSHOT 
docs|https://ci.apache.org/projects/flink/flink-docs-master/ops/config.html].

We should restore it, and I think it should be renamed 
"containerized.start-command-template".

[~Zentol], what do you think?
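
A sketch of what the renamed option could look like (assumes the deprecated constant's current default template string; the key name follows the proposal above):

{code}
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

public class ContainerizedOptions {

	// Hypothetical replacement for the deprecated
	// "yarn.container-start-command-template" constant.
	public static final ConfigOption<String> START_COMMAND_TEMPLATE =
		ConfigOptions.key("containerized.start-command-template")
			.defaultValue("%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%");
}
{code}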




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8899) Submitting YARN job with FLIP-6 may lead to ApplicationAttemptNotFoundException

2018-03-20 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8899?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16406493#comment-16406493
 ] 

Till Rohrmann commented on FLINK-8899:
--

Agreed this is not a blocker for 1.5.

> Submitting YARN job with FLIP-6 may lead to 
> ApplicationAttemptNotFoundException
> ---
>
> Key: FLINK-8899
> URL: https://issues.apache.org/jira/browse/FLINK-8899
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, YARN
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Priority: Minor
>  Labels: flip-6
>
> Occasionally, running a simple word count as this
> {code}
> ./bin/flink run -m yarn-cluster -yjm 768 -ytm 3072 -ys 2 -p 20 -c 
> org.apache.flink.streaming.examples.wordcount.WordCount 
> ./examples/streaming/WordCount.jar --input /usr/share/doc/rsync-3.0.6/COPYING
> {code}
> leads to an {{ApplicationAttemptNotFoundException}} in the logs:
> {code}
> 2018-03-08 16:18:08,507 INFO  
> org.apache.flink.runtime.executiongraph.ExecutionGraph- Job Streaming 
> WordCount (df707a3c9817ddf5936efe56d427e2bd) switched from state RUNNING to 
> FINISHED.
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator - Stopping 
> checkpoint coordinator for job df707a3c9817ddf5936efe56d427e2bd
> 2018-03-08 16:18:08,508 INFO  
> org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore  - 
> Shutting down
> 2018-03-08 16:18:08,536 INFO  
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  - Job 
> df707a3c9817ddf5936efe56d427e2bd reached globally terminal state FINISHED.
> 2018-03-08 16:18:08,611 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Stopping the JobMaster for job Streaming 
> WordCount(df707a3c9817ddf5936efe56d427e2bd).
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.runtime.jobmaster.JobMaster
>   - Close ResourceManager connection 
> dcfdc329d61aae0ace2de26292c8916b: JobManager is shutting down..
> 2018-03-08 16:18:08,634 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Disconnect job manager 
> 0...@akka.tcp://fl...@ip-172-31-2-0.eu-west-1.compute.internal:38555/user/jobmanager_0
>  for job df707a3c9817ddf5936efe56d427e2bd from the resource manager.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Suspending 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.slotpool.SlotPool  - Stopping 
> SlotPool.
> 2018-03-08 16:18:08,664 INFO  
> org.apache.flink.runtime.jobmaster.JobManagerRunner   - 
> JobManagerRunner already shutdown.
> 2018-03-08 16:18:09,650 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager adc8090bdb3f7052943ff86bde7d2a7b at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - Replacing old instance of worker for ResourceID 
> container_1519984124671_0090_01_05
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - 
> Unregister TaskManager adc8090bdb3f7052943ff86bde7d2a7b from the SlotManager.
> 2018-03-08 16:18:09,654 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager b975dbd16e0fd59c1168d978490a4b76 at the SlotManager.
> 2018-03-08 16:18:09,654 INFO  org.apache.flink.yarn.YarnResourceManager   
>   - The target with resource ID 
> container_1519984124671_0090_01_05 is already been monitored.
> 2018-03-08 16:18:09,992 INFO  
> org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager  - Register 
> TaskManager 73c258a0dbad236501b8391971c330ba at the SlotManager.
> 2018-03-08 16:18:10,000 INFO  
> org.apache.flink.runtime.entrypoint.ClusterEntrypoint - RECEIVED 
> SIGNAL 15: SIGTERM. Shutting down as requested.
> 2018-03-08 16:18:10,028 ERROR 
> org.apache.hadoop.yarn.client.api.async.impl.AMRMClientAsyncImpl  - Exception 
> on heartbeat
> org.apache.hadoop.yarn.exceptions.ApplicationAttemptNotFoundException: 
> Application attempt appattempt_1519984124671_0090_01 doesn't exist in 
> ApplicationMasterService cache.
>   at 
> org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService.allocate(ApplicationMasterService.java:403)
>   at 
> org.apache.hadoop.yarn.api.impl.pb.service.ApplicationMasterProtocolPBServiceImpl.allocate(ApplicationMasterProtocolPBServiceImpl.java:60)
>   at 
> org.apache.hadoop.yarn.proto.ApplicationMasterProtocol$ApplicationMasterProtocolService$2.callBlockingMethod(ApplicationMasterProtocol.java:99)
>   at 
> 
