[jira] [Commented] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread SHANKAR GANESH (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412386#comment-16412386
 ] 

SHANKAR GANESH commented on FLINK-9068:
---

[~yew1eb] Thank you. I agree, it's not important, but it caught my eye and I 
just wanted to log it before I forget about it :) 

I'll read the contribution guidelines and submit a PR.

> Website documentation issue - html tag visible on screen
> 
>
> Key: FLINK-9068
> URL: https://issues.apache.org/jira/browse/FLINK-9068
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: SHANKAR GANESH
>Priority: Minor
> Attachments: Screen Shot 2018-03-23 at 7.56.48 PM.png
>
>
> In the documentation at the following URL
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning],
> a stray HTML tag is visible in the section that explains the 'Reduce'
> operator (*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412385#comment-16412385
 ] 

Hai Zhou commented on FLINK-9068:
-

It seems I can't find you in the Assignee search box; you may need to apply 
to become a contributor. (Next Monday, you can mail 
*dev*@flink.apache.org, CC [~Zentol], and apply to become an Apache Flink 
contributor.)

 

The above is not particularly important :). You can now submit a PR to solve 
it. (Please read the [Contribute 
code|http://flink.apache.org/contribute-code.html] guide before you start 
working on a code contribution.)

> Website documentation issue - html tag visible on screen
> 
>
> Key: FLINK-9068
> URL: https://issues.apache.org/jira/browse/FLINK-9068
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: SHANKAR GANESH
>Priority: Minor
> Attachments: Screen Shot 2018-03-23 at 7.56.48 PM.png
>
>
> In the documentation at the following URL
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning],
> a stray HTML tag is visible in the section that explains the 'Reduce'
> operator (*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread SHANKAR GANESH (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412381#comment-16412381
 ] 

SHANKAR GANESH commented on FLINK-9068:
---

[~yew1eb] Sure, I'll be glad to fix it.

> Website documentation issue - html tag visible on screen
> 
>
> Key: FLINK-9068
> URL: https://issues.apache.org/jira/browse/FLINK-9068
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: SHANKAR GANESH
>Priority: Minor
> Attachments: Screen Shot 2018-03-23 at 7.56.48 PM.png
>
>
> In the documentation at the following URL
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning],
> a stray HTML tag is visible in the section that explains the 'Reduce'
> operator (*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread Hai Zhou (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412380#comment-16412380
 ] 

Hai Zhou commented on FLINK-9068:
-

Thank you [~shankarganesh1234] for finding this typo issue. Are you interested 
in fixing it?

> Website documentation issue - html tag visible on screen
> 
>
> Key: FLINK-9068
> URL: https://issues.apache.org/jira/browse/FLINK-9068
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: SHANKAR GANESH
>Priority: Minor
> Attachments: Screen Shot 2018-03-23 at 7.56.48 PM.png
>
>
> In the documentation at the following URL
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning],
> a stray HTML tag is visible in the section that explains the 'Reduce'
> operator (*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412326#comment-16412326
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/24/18 3:06 AM:


Jamie,

Yes, we ran into the same issue at Netflix. We did exactly what you said. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there seems to be no way for S3 to partition the bucket to support a 
high request rate. I don't see another way around it. There is obviously a 
downside to such random chars in the S3 path: you can't do prefix listing 
anymore.

 

Steven


was (Author: stevenz3wu):
Jamie,

Yes, we ran into the same issue at Netflix. We did exactly what you said. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there is no way for S3 to partition the bucket to support a high 
request rate. I don't see another way around it. There is obviously a downside 
to such random chars in the S3 path: you can't do prefix listing anymore.

 

Steven
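
For illustration, a minimal Java sketch of the entropy-key substitution 
described above. This is a hypothetical helper, not Flink's actual 
implementation; the path template mirrors the configuration shown in the 
comment, and the class and method names are assumed.

{code}
import java.util.concurrent.ThreadLocalRandom;

/** Hypothetical sketch: replace an entropy marker in the checkpoint path with random chars. */
public final class EntropyInjector {

    private static final String ENTROPY_KEY = "__ENTROPY_KEY__";
    private static final char[] ALPHABET = "0123456789abcdef".toCharArray();

    /** Returns the path with the marker replaced by 4 random hex chars, e.g. s3://bucket/3f7a/flink/checkpoints. */
    public static String resolve(String checkpointDir) {
        if (!checkpointDir.contains(ENTROPY_KEY)) {
            return checkpointDir; // no marker configured, leave the path untouched
        }
        StringBuilder entropy = new StringBuilder(4);
        ThreadLocalRandom rnd = ThreadLocalRandom.current();
        for (int i = 0; i < 4; i++) {
            entropy.append(ALPHABET[rnd.nextInt(ALPHABET.length)]);
        }
        // The random prefix spreads writes across S3 partitions, at the cost of prefix listing.
        return checkpointDir.replace(ENTROPY_KEY, entropy.toString());
    }

    public static void main(String[] args) {
        System.out.println(resolve("s3://bucket/__ENTROPY_KEY__/flink/checkpoints"));
    }
}
{code}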

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread SHANKAR GANESH (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9068?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

SHANKAR GANESH updated FLINK-9068:
--
Attachment: Screen Shot 2018-03-23 at 7.56.48 PM.png

> Website documentation issue - html tag visible on screen
> 
>
> Key: FLINK-9068
> URL: https://issues.apache.org/jira/browse/FLINK-9068
> Project: Flink
>  Issue Type: Bug
>  Components: Project Website
>Reporter: SHANKAR GANESH
>Priority: Minor
> Attachments: Screen Shot 2018-03-23 at 7.56.48 PM.png
>
>
> In the documentation at the following URL
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning],
> a stray HTML tag is visible in the section that explains the 'Reduce'
> operator (*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-9068) Website documentation issue - html tag visible on screen

2018-03-23 Thread SHANKAR GANESH (JIRA)
SHANKAR GANESH created FLINK-9068:
-

 Summary: Website documentation issue - html tag visible on screen
 Key: FLINK-9068
 URL: https://issues.apache.org/jira/browse/FLINK-9068
 Project: Flink
  Issue Type: Bug
  Components: Project Website
Reporter: SHANKAR GANESH


In the documentation at the following URL

[https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/stream/operators/#physical-partitioning]

a stray HTML tag is visible in the section that explains the 'Reduce' operator
(*Reduce*: KeyedStream → DataStream).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412326#comment-16412326
 ] 

Steven Zhen Wu commented on FLINK-9061:
---

Jamie,

Yes, we ran into the same issue at Netflix. We did exactly what you said. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there is no way for S3 to partition the bucket to support a high 
request rate. I don't see another way around it. There is obviously a downside 
to such random chars in the S3 path: you can't do prefix listing anymore.

 

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (FLINK-9061) S3 checkpoint data not partitioned well -- causes errors and poor performance

2018-03-23 Thread Steven Zhen Wu (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9061?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412326#comment-16412326
 ] 

Steven Zhen Wu edited comment on FLINK-9061 at 3/24/18 1:02 AM:


Jamie,

Yes, we ran into the same issue at Netflix. We did exactly what you said. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there is no way for S3 to partition the bucket to support a high 
request rate. I don't see another way around it. There is obviously a downside 
to such random chars in the S3 path: you can't do prefix listing anymore.

 

Steven


was (Author: stevenz3wu):
Jamie,

Yes, we ran into the same issue at Netflix. We did exactly what you said. 
Here is what our config looks like:

state.backend.fs.checkpointdir: s3://bucket/__ENTROPY_KEY__/flink/checkpoints
state.backend.fs.checkpointdir.injectEntropy.enabled: true
state.backend.fs.checkpointdir.injectEntropy.key: __ENTROPY_KEY__

We modified the state backend code to support it. Without the random chars in 
the path, there is no way for S3 to partition the bucket to support a high 
request rate. I don't see another way around it. There is obviously a downside 
to such random chars in the S3 path: you can't do prefix listing anymore.

 

Steven

> S3 checkpoint data not partitioned well -- causes errors and poor performance
> -
>
> Key: FLINK-9061
> URL: https://issues.apache.org/jira/browse/FLINK-9061
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, State Backends, Checkpointing
>Affects Versions: 1.4.2
>Reporter: Jamie Grier
>Priority: Critical
>
> I think we need to modify the way we write checkpoints to S3 for high-scale 
> jobs (those with many total tasks).  The issue is that we are writing all the 
> checkpoint data under a common key prefix.  This is the worst case scenario 
> for S3 performance since the key is used as a partition key.
>  
> In the worst case checkpoints fail with a 500 status code coming back from S3 
> and an internal error type of TooBusyException.
>  
> One possible solution would be to add a hook in the Flink filesystem code 
> that allows me to "rewrite" paths.  For example say I have the checkpoint 
> directory set to:
>  
> s3://bucket/flink/checkpoints
>  
> I would hook that and rewrite that path to:
>  
> s3://bucket/[HASH]/flink/checkpoints, where HASH is the hash of the original 
> path
>  
> This would distribute the checkpoint write load around the S3 cluster evenly.
>  
> For reference: 
> https://aws.amazon.com/premiumsupport/knowledge-center/s3-bucket-performance-improve/
>  
> Any other people hit this issue?  Any other ideas for solutions?  This is a 
> pretty serious problem for people trying to checkpoint to S3.
>  
> -Jamie
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9067) End-to-end test: Stream SQL query with failure and retry

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9067?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412292#comment-16412292
 ] 

ASF GitHub Bot commented on FLINK-9067:
---

GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5759

[FLINK-9067] [e2eTests] Add StreamSQLTestProgram and test run script.

## What is the purpose of the change

* Adds an end-to-end test for a streaming SQL query
* The query fails at the first execution attempt, recovers, and completes 
in the second attempt
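
To make the recovery scenario concrete, here is a hypothetical sketch of the 
kind of failure-injecting function such a test job can use to force exactly 
one restart (illustrative only, not the actual StreamSQLTestProgram code; the 
class name and threshold are assumptions):

{code}
import org.apache.flink.api.common.functions.MapFunction;

/**
 * Hypothetical failure injector: throws once after a threshold, so the job
 * fails on the first attempt and completes after the automatic restart.
 * The static flag survives the task restart within the same local JVM.
 */
public class FailOnceMapper implements MapFunction<Long, Long> {

    private static volatile boolean hasFailed = false;

    @Override
    public Long map(Long value) throws Exception {
        if (!hasFailed && value > 100L) {
            hasFailed = true;
            throw new RuntimeException("injected failure to trigger recovery");
        }
        return value;
    }
}
{code}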

## Brief change log

* Add a Streaming SQL Test Job
* Add a script to set up the Flink cluster, run the job, stop the cluster, 
and verify the result
* Register the script in the nightly test jobs

## Verifying this change

* This PR adds a test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink e2eTestSQLStream

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5759


commit 7a14340827f5a31a11c71e05075732d3c455cda5
Author: Fabian Hueske 
Date:   2018-03-23T20:42:53Z

[FLINK-9067] [e2eTests] Add StreamSQLTestProgram and test run script.




> End-to-end test: Stream SQL query with failure and retry
> 
>
> Key: FLINK-9067
> URL: https://issues.apache.org/jira/browse/FLINK-9067
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL, Tests
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Fabian Hueske
>Priority: Major
>
> Implement a test job that runs a streaming SQL query with a temporary failure 
> and recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5759: [FLINK-9067] [e2eTests] Add StreamSQLTestProgram a...

2018-03-23 Thread fhueske
GitHub user fhueske opened a pull request:

https://github.com/apache/flink/pull/5759

[FLINK-9067] [e2eTests] Add StreamSQLTestProgram and test run script.

## What is the purpose of the change

* Adds an end-to-end test for a streaming SQL query
* The query fails at the first execution attempt, recovers, and completes 
in the second attempt

## Brief change log

* Add a Streaming SQL Test Job
* Add a script to set up the Flink cluster, run the job, stop the cluster, 
and verify the result
* Register the script in the nightly test jobs

## Verifying this change

* This PR adds a test.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): **no**
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **no**
  - The serializers: **no**
  - The runtime per-record code paths (performance sensitive): **no**
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: **no**
  - The S3 file system connector: **no**

## Documentation

  - Does this pull request introduce a new feature? **no**
  - If yes, how is the feature documented? **n/a**


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/fhueske/flink e2eTestSQLStream

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5759.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5759


commit 7a14340827f5a31a11c71e05075732d3c455cda5
Author: Fabian Hueske 
Date:   2018-03-23T20:42:53Z

[FLINK-9067] [e2eTests] Add StreamSQLTestProgram and test run script.




---


[jira] [Commented] (FLINK-8852) SQL Client does not work with new FLIP-6 mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8852?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412270#comment-16412270
 ] 

ASF GitHub Bot commented on FLINK-8852:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5704
  
I skimmed over the changes and did not notice anything suspicious. However, 
I'm not familiar with the FLIP-6 submission code. I think it would be good if 
somebody who's more familiar with that code would have a look as well.

I tried the changes on a local FLIP-6 cluster and that worked quite well.

Best, Fabian


> SQL Client does not work with new FLIP-6 mode
> -
>
> Key: FLINK-8852
> URL: https://issues.apache.org/jira/browse/FLINK-8852
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API  SQL
>Affects Versions: 1.5.0
>Reporter: Fabian Hueske
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> The SQL client does not submit queries to a local Flink cluster that runs in 
> FLIP-6 mode. It doesn't throw an exception either.
> Job submission works if the legacy Flink cluster mode is used (`mode: old`).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5704: [FLINK-8852] [sql-client] Add FLIP-6 support to SQL Clien...

2018-03-23 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5704
  
I skimmed over the changes and did not notice anything suspicious. However, 
I'm not familiar with the FLIP-6 submission code. I think it would be good if 
somebody who's more familiar with that code would have a look as well.

I tried the changes on a local FLIP-6 cluster and that worked quite well.

Best, Fabian


---


[jira] [Created] (FLINK-9067) End-to-end test: Stream SQL query with failure and retry

2018-03-23 Thread Fabian Hueske (JIRA)
Fabian Hueske created FLINK-9067:


 Summary: End-to-end test: Stream SQL query with failure and retry
 Key: FLINK-9067
 URL: https://issues.apache.org/jira/browse/FLINK-9067
 Project: Flink
  Issue Type: Sub-task
  Components: Table API  SQL, Tests
Affects Versions: 1.5.0
Reporter: Fabian Hueske
Assignee: Fabian Hueske


Implement a test job that runs a streaming SQL query with a temporary failure 
and recovery.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9059) Add support for unified table source and sink declaration in environment file

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9059?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16412021#comment-16412021
 ] 

ASF GitHub Bot commented on FLINK-9059:
---

GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5758

[FLINK-9059][Table API & SQL] add table type attribute; replace "sources" 
with "tables" in environm…

## What is the purpose of the change

Add support for unified table source and sink declaration in the environment 
file definition.
This change prepares for FLINK-9049 (Create unified interfaces to configure 
and instantiate TableSink). We want to get this change in before 1.5 so it 
won't break the API in the next Flink release.


## Brief change log

  - Refactor the SQL Client environment file definition to replace "sources" 
with "tables" (a sketch follows below)
  - Add a "type" property to distinguish between table sources and sinks.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( no )
  - The runtime per-record code paths (performance sensitive): (no )
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5758


commit 6da60ae23d83783137d44e282e06e16c947f0eb7
Author: Shuyi Chen 
Date:   2018-03-23T06:00:00Z

add table type attribute; replace "sources" with "tables" in environment 
file




> Add support for unified table source and sink declaration in environment file
> -
>
> Key: FLINK-9059
> URL: https://issues.apache.org/jira/browse/FLINK-9059
> Project: Flink
>  Issue Type: Task
>  Components: Table API  SQL
>Reporter: Shuyi Chen
>Assignee: Shuyi Chen
>Priority: Major
> Fix For: 1.5.0
>
>
> 1) Add a common property called "tableType" with the single value 'source'.
> 2) In the YAML file, replace "sources" with "tables".



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5758: [FLINK-9059][Table API & SQL] add table type attri...

2018-03-23 Thread suez1224
GitHub user suez1224 opened a pull request:

https://github.com/apache/flink/pull/5758

[FLINK-9059][Table API & SQL] add table type attribute; replace "sources" 
with "tables" in environm…

## What is the purpose of the change

Add support for unified table source and sink declaration in the environment 
file definition.
This change prepares for FLINK-9049 (Create unified interfaces to configure 
and instantiate TableSink). We want to get this change in before 1.5 so it 
won't break the API in the next Flink release.


## Brief change log

  - Refactor the SQL Client environment file definition to replace "sources" 
with "tables"
  - Add a "type" property to distinguish between table sources and sinks.

## Verifying this change

This change is already covered by existing tests, such as *(please describe 
tests)*.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): ( no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: ( no )
  - The runtime per-record code paths (performance sensitive): (no )
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/suez1224/flink FLINK-9059

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5758.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5758


commit 6da60ae23d83783137d44e282e06e16c947f0eb7
Author: Shuyi Chen 
Date:   2018-03-23T06:00:00Z

add table type attribute; replace "sources" with "tables" in environment 
file




---


[jira] [Assigned] (FLINK-8985) End-to-end test: CLI

2018-03-23 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-8985:


Assignee: Rong Rong

> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8986) End-to-end test: REST

2018-03-23 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411958#comment-16411958
 ] 

Rong Rong commented on FLINK-8986:
--

Hi [~till.rohrmann], I can help with this e2e test. 

Is this specifically referring to the monitoring APIs discussed in the official 
docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/monitoring/rest_api.html)?
Are there any additional REST APIs we should be focusing on?

> End-to-end test: REST
> -
>
> Key: FLINK-8986
> URL: https://issues.apache.org/jira/browse/FLINK-8986
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Tests
>Reporter: Till Rohrmann
>Priority: Major
>
> We should add an end-to-end test which verifies that we can use the REST 
> interface to obtain information about a running job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (FLINK-8986) End-to-end test: REST

2018-03-23 Thread Rong Rong (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8986?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rong Rong reassigned FLINK-8986:


Assignee: Rong Rong

> End-to-end test: REST
> -
>
> Key: FLINK-8986
> URL: https://issues.apache.org/jira/browse/FLINK-8986
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Tests
>Reporter: Till Rohrmann
>Assignee: Rong Rong
>Priority: Major
>
> We should add an end-to-end test which verifies that we can use the REST 
> interface to obtain information about a running job.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8985) End-to-end test: CLI

2018-03-23 Thread Rong Rong (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8985?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411952#comment-16411952
 ] 

Rong Rong commented on FLINK-8985:
--

Hi [~till.rohrmann]. I can help contribute to the REST & CLI e2e testing. 
Looking at some of the current scripts in the `flink-end-to-end-test` module, 
some CLI/REST commands are already used. Do you think starting with the list of 
commands in the official docs 
(https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/cli.html) is a 
good idea?

> End-to-end test: CLI
> 
>
> Key: FLINK-8985
> URL: https://issues.apache.org/jira/browse/FLINK-8985
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
>
> We should add an end-to-end test which verifies that all client commands are 
> working correctly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8965) Port TimestampITCase to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8965?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8965.
---
Resolution: Fixed

master: 2ac3474c4482b0366a905bce345dc9f90e64ba2f
1.5: 81d809a5f0030b14e0b7128d298cd9904e474ebd

> Port TimestampITCase to flip6
> -
>
> Key: FLINK-8965
> URL: https://issues.apache.org/jira/browse/FLINK-8965
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8964) Port JobSubmissionFailsITCase to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8964.
---
Resolution: Fixed

master: adeff9267ff23ebd14de39533341713241f25dfb
1.5: f0bd7b67aa348597cda9c4d3cf920ffd5a320896

> Port JobSubmissionFailsITCase to flip6
> --
>
> Key: FLINK-8964
> URL: https://issues.apache.org/jira/browse/FLINK-8964
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8957) Port JMXJobManagerMetricTest to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411817#comment-16411817
 ] 

ASF GitHub Bot commented on FLINK-8957:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5720


> Port JMXJobManagerMetricTest to flip6
> -
>
> Key: FLINK-8957
> URL: https://issues.apache.org/jira/browse/FLINK-8957
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8965) Port TimestampITCase to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8965?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411819#comment-16411819
 ] 

ASF GitHub Bot commented on FLINK-8965:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5728


> Port TimestampITCase to flip6
> -
>
> Key: FLINK-8965
> URL: https://issues.apache.org/jira/browse/FLINK-8965
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8959) Port AccumulatorLiveITCase to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411815#comment-16411815
 ] 

ASF GitHub Bot commented on FLINK-8959:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5719


> Port AccumulatorLiveITCase to flip6
> ---
>
> Key: FLINK-8959
> URL: https://issues.apache.org/jira/browse/FLINK-8959
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8957) Port JMXJobManagerMetricTest to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8957?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8957.
---
Resolution: Fixed

master: 0623e24c8814e073426062e8b27bf88e664ee3aa
1.5: ecdeb35cf43b160b3ff86d8acfb9672e758bd2b7

> Port JMXJobManagerMetricTest to flip6
> -
>
> Key: FLINK-8957
> URL: https://issues.apache.org/jira/browse/FLINK-8957
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8964) Port JobSubmissionFailsITCase to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411818#comment-16411818
 ] 

ASF GitHub Bot commented on FLINK-8964:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5727


> Port JobSubmissionFailsITCase to flip6
> --
>
> Key: FLINK-8964
> URL: https://issues.apache.org/jira/browse/FLINK-8964
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8958) Port TaskCancelAsyncProducerConsumerITCase to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8958?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8958.
---
Resolution: Fixed

master: 0c56e1917aa3a563a7425ba98ff33ed9bfcd22c5
1.5: 4ebbbe7dc4bbf1da66449acba0b5af2feb409de6

> Port TaskCancelAsyncProducerConsumerITCase to flip6
> ---
>
> Key: FLINK-8958
> URL: https://issues.apache.org/jira/browse/FLINK-8958
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8958) Port TaskCancelAsyncProducerConsumerITCase to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8958?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411816#comment-16411816
 ] 

ASF GitHub Bot commented on FLINK-8958:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5722


> Port TaskCancelAsyncProducerConsumerITCase to flip6
> ---
>
> Key: FLINK-8958
> URL: https://issues.apache.org/jira/browse/FLINK-8958
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5720: [FLINK-8957][tests] Port JMXJobManagerMetricTest t...

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5720


---


[jira] [Commented] (FLINK-8956) Port RescalingITCase to flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411813#comment-16411813
 ] 

ASF GitHub Bot commented on FLINK-8956:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5715


> Port RescalingITCase to flip6
> -
>
> Key: FLINK-8956
> URL: https://issues.apache.org/jira/browse/FLINK-8956
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8959) Port AccumulatorLiveITCase to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8959.
---
Resolution: Fixed

master: 4f5488c592fe153897042d24f9bd03b50767ba9a
1.5: 765bcc154cc10133b2a9ceb884c62138560370e0

> Port AccumulatorLiveITCase to flip6
> ---
>
> Key: FLINK-8959
> URL: https://issues.apache.org/jira/browse/FLINK-8959
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5722: [FLINK-8958][tests] Port TaskCancelAsyncProducerCo...

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5722


---


[GitHub] flink pull request #5728: [FLINK-8965][tests] Port TimestampITCase to flip6

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5728


---


[GitHub] flink pull request #5715: [FLINK-8956][tests] Port RescalingITCase to flip6

2018-03-23 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/5715


---


[GitHub] flink pull request #5727: [FLINK-8964][tests] Port JobSubmissionFailsITCase ...

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5727


---


[GitHub] flink pull request #5719: [FLINK-8959][tests] Port AccumulatorLiveITCase to ...

2018-03-23 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5719


---


[jira] [Closed] (FLINK-8956) Port RescalingITCase to flip6

2018-03-23 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8956?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8956.
---
Resolution: Fixed

master: edb6f7fef8c5df6af43bbe28f96d8c6bb3332d00
1.5: 0acc1e299fe34f9562c52e062d9759b7afe46dcc

> Port RescalingITCase to flip6
> -
>
> Key: FLINK-8956
> URL: https://issues.apache.org/jira/browse/FLINK-8956
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Blocker
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-9060) Deleting state using KeyedStateBackend.getKeys() throws Exception

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-9060?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411608#comment-16411608
 ] 

ASF GitHub Bot commented on FLINK-9060:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5751
  
Hi @kl0u, I changed the implementation a bit compared to the JIRA description: 
instead of implementing multiple wrapper classes for the different `State` 
types, I introduced a `StateInvocationHandler` which implements 
`InvocationHandler` to delegate the `clear()` method of `State`. Could you 
please have a look at this, and let me know if you have any advice?
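
A minimal, self-contained Java sketch of that dynamic-proxy idea (the `State` 
interface and the intercepted bookkeeping are simplified stand-ins, not 
Flink's actual classes):

{code}
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public final class StateProxyDemo {

    /** Simplified stand-in for a Flink State interface. */
    interface State {
        void clear();
    }

    /** Delegates every call to the real state, intercepting clear() for extra bookkeeping. */
    static final class StateInvocationHandler implements InvocationHandler {
        private final State delegate;

        StateInvocationHandler(State delegate) {
            this.delegate = delegate;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            if ("clear".equals(method.getName())) {
                // Hypothetical hook: e.g. remove the current key safely before clearing.
                System.out.println("intercepted clear()");
            }
            return method.invoke(delegate, args);
        }
    }

    public static void main(String[] args) {
        State real = () -> System.out.println("real clear()");
        State proxied = (State) Proxy.newProxyInstance(
                State.class.getClassLoader(),
                new Class<?>[] {State.class},
                new StateInvocationHandler(real));
        proxied.clear(); // prints "intercepted clear()" then "real clear()"
    }
}
{code}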


> Deleting state using KeyedStateBackend.getKeys() throws Exception
> -
>
> Key: FLINK-9060
> URL: https://issues.apache.org/jira/browse/FLINK-9060
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Aljoscha Krettek
>Assignee: Sihua Zhou
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Adding this test to {{StateBackendTestBase}} showcases the problem:
> {code}
> @Test
> public void testConcurrentModificationWithGetKeys() throws Exception {
>     AbstractKeyedStateBackend<Integer> backend =
>         createKeyedBackend(IntSerializer.INSTANCE);
>     try {
>         ListStateDescriptor<String> listStateDescriptor =
>             new ListStateDescriptor<>("foo", StringSerializer.INSTANCE);
>         backend.setCurrentKey(1);
>         backend
>             .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
>             .add("Hello");
>         backend.setCurrentKey(2);
>         backend
>             .getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, listStateDescriptor)
>             .add("Ciao");
>         Stream<Integer> keys = backend
>             .getKeys(listStateDescriptor.getName(), VoidNamespace.INSTANCE);
>         keys.forEach((key) -> {
>             backend.setCurrentKey(key);
>             try {
>                 backend
>                     .getPartitionedState(
>                         VoidNamespace.INSTANCE,
>                         VoidNamespaceSerializer.INSTANCE,
>                         listStateDescriptor)
>                     .clear();
>             } catch (Exception e) {
>                 e.printStackTrace();
>             }
>         });
>     } finally {
>         IOUtils.closeQuietly(backend);
>         backend.dispose();
>     }
> }
> {code}
> This should work because one of the use cases of {{getKeys()}} and 
> {{applyToAllKeys()}} is to do stuff for every key, which includes deleting 
> them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5751: [FLINK-9060][state] Deleting state using KeyedStateBacken...

2018-03-23 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5751
  
Hi @kl0u, I changed the implementation a bit compared to the JIRA description: 
instead of implementing multiple wrapper classes for the different `State` 
types, I introduced a `StateInvocationHandler` which implements 
`InvocationHandler` to delegate the `clear()` method of `State`. Could you 
please have a look at this, and let me know if you have any advice?


---


[jira] [Created] (FLINK-9066) Add 'HOTSPOTS' function to Flink SQL

2018-03-23 Thread Bowen Li (JIRA)
Bowen Li created FLINK-9066:
---

 Summary: Add 'HOTSPOTS' function to Flink SQL
 Key: FLINK-9066
 URL: https://issues.apache.org/jira/browse/FLINK-9066
 Project: Flink
  Issue Type: Improvement
  Components: Table API  SQL
Reporter: Bowen Li


Inspired by ['HOTSPOTS' function in AWS Kinesis Analytics 
SQL|https://aws.amazon.com/blogs/aws/real-time-hotspot-detection-in-amazon-kinesis-analytics/].

cc [~fhueske] [~fhue...@gmail.com]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-9014) Adapt BackPressureStatsTracker to work with credit-based flow control

2018-03-23 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-9014?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski closed FLINK-9014.
-
Resolution: Not A Problem
  Assignee: Piotr Nowojski

This wasn't an issue. With credit-based flow control, senders under back 
pressure are also blocked on `requestBufferBuilderBlocking`, just as they are 
without credit-based flow control.

Credit-based flow control works one level deeper, in 
netty/PartitionRequestQueue. Once credit-based flow control blocks a channel, 
it prevents netty from reading from the blocked subpartition. This means that 
writers to this subpartition will eventually be blocked waiting for buffers 
(`requestBufferBuilderBlocking`).

> Adapt BackPressureStatsTracker to work with credit-based flow control
> -
>
> Key: FLINK-9014
> URL: https://issues.apache.org/jira/browse/FLINK-9014
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network, Webfrontend
>Affects Versions: 1.5.0
>Reporter: Nico Kruber
>Assignee: Piotr Nowojski
>Priority: Critical
> Fix For: 1.5.0
>
>
> The {{BackPressureStatsTracker}} relies on sampling threads being blocked in 
> {{LocalBufferPool#requestBufferBuilderBlocking}} to indicate backpressure but 
> with credit-based flow control, we are also back-pressured if we did not get 
> any credits (yet).



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411565#comment-16411565
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767248
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -91,6 +93,7 @@ public void after() throws Exception{
}
 
@Test
+   @Category(Flip6.class)
--- End diff --

revert?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411564#comment-16411564
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767271
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
 ---
@@ -40,6 +42,7 @@
 /**
  * Test the distributed cache.
  */
+@Category(Flip6.class)
--- End diff --

revert?


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411567#comment-16411567
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176769129
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -143,30 +160,23 @@ public void shutdown() {
/**
 * If the file doesn't exists locally, it will copy the file to the 
temp directory.
 *
-* @param name  The name under which the file is registered.
 * @param entry The cache entry descriptor (path, executable flag)
 * @param jobID The ID of the job for which the file is copied.
 * @return The handle to the task that copies the file.
 */
-   public Future createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID) {
+   public Future createTmpFile(String name, DistributedCacheEntry 
entry, JobID jobID, ExecutionAttemptID executionId) {
synchronized (lock) {
-   Map> 
jobEntries = entries.get(jobID);
-   if (jobEntries == null) {
-   jobEntries = new HashMap>();
-   entries.put(jobID, jobEntries);
-   }
+   Map jobEntries = 
entries.computeIfAbsent(jobID, k -> new HashMap<>());
+   final Set refHolders = 
jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+   refHolders.add(executionId);
 
// tuple is (ref-count, parent-temp-dir, 
cached-file-path, copy-process)
--- End diff --

outdated


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411568#comment-16411568
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767320
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, 
String name) {
public void registerCachedFile(String filePath, String name, boolean 
executable) {
this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
}
+
--- End diff --

revert


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411566#comment-16411566
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176768205
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

that's an implementation detail though


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767320
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 ---
@@ -1831,4 +1831,5 @@ public void registerCachedFile(String filePath, 
String name) {
public void registerCachedFile(String filePath, String name, boolean 
executable) {
this.cacheFile.add(new Tuple2<>(name, new 
DistributedCache.DistributedCacheEntry(filePath, executable)));
}
+
--- End diff --

revert


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176769129
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -143,30 +160,23 @@ public void shutdown() {
/**
 * If the file doesn't exists locally, it will copy the file to the 
temp directory.
 *
-* @param name  The name under which the file is registered.
 * @param entry The cache entry descriptor (path, executable flag)
 * @param jobID The ID of the job for which the file is copied.
 * @return The handle to the task that copies the file.
 */
-   public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID) {
+   public Future<Path> createTmpFile(String name, DistributedCacheEntry entry, JobID jobID, ExecutionAttemptID executionId) {
synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-   if (jobEntries == null) {
-   jobEntries = new HashMap<String, Tuple4<Integer, File, Path, Future<Path>>>();
-   entries.put(jobID, jobEntries);
-   }
+   Map<String, Future<Path>> jobEntries = entries.computeIfAbsent(jobID, k -> new HashMap<>());
+   final Set<ExecutionAttemptID> refHolders = jobRefHolders.computeIfAbsent(jobID, id -> new HashSet<>());
+   refHolders.add(executionId);
 
// tuple is (ref-count, parent-temp-dir, 
cached-file-path, copy-process)
--- End diff --

outdated
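
For reference, the idiom the new code switches to, shown in isolation (a
generic sketch of the before/after, not the FileCache code itself):

```
import java.util.HashMap;
import java.util.Map;

class ComputeIfAbsentSketch {

	static Map<String, String> lookup(Map<String, Map<String, String>> entries, String jobId) {
		// Before: manual get / null-check / put, as in the removed lines.
		Map<String, String> inner = entries.get(jobId);
		if (inner == null) {
			inner = new HashMap<>();
			entries.put(jobId, inner);
		}

		// After: the same logic in a single call.
		return entries.computeIfAbsent(jobId, k -> new HashMap<>());
	}
}
```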


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767248
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/iterative/aggregators/AggregatorsITCase.java
 ---
@@ -91,6 +93,7 @@ public void after() throws Exception{
}
 
@Test
+   @Category(Flip6.class)
--- End diff --

revert?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176767271
  
--- Diff: 
flink-tests/src/test/java/org/apache/flink/test/distributedcache/DistributedCacheTest.java
 ---
@@ -40,6 +42,7 @@
 /**
  * Test the distributed cache.
  */
+@Category(Flip6.class)
--- End diff --

revert?


---


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176768205
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
--- End diff --

that's an implementation detail though


---


[jira] [Commented] (FLINK-5411) LocalStreamEnvironmentITCase#testRunIsolatedJob failed on travis

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5411?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411555#comment-16411555
 ] 

ASF GitHub Bot commented on FLINK-5411:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5757

[FLINK-5411] [flip6] Fix JobLeaderIdService shut down in ResourceManager

## What is the purpose of the change

The JobLeaderIdService was formerly closed at two different locations: once
in the ResourceManager and once in the ResourceManagerRuntimeServices. Since
the JobLeaderIdService is an RM-specific component, it should also be closed
in the scope of the RM.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixJobLeaderIdServiceShutDown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5757


commit 2bd3d6d1474b85dfaa341b829d7b0e9816bc9353
Author: Till Rohrmann 
Date:   2018-03-23T15:21:55Z

[FLINK-5411] [flip6] Fix JobLeaderIdService shut down in ResourceManager

The JobLeaderIdService was formerly closed at two different locations: once
in the ResourceManager and once in the ResourceManagerRuntimeServices. Since
the JobLeaderIdService is an RM-specific component, it should also be closed
in the scope of the RM.




> LocalStreamEnvironmentITCase#testRunIsolatedJob failed on travis
> 
>
> Key: FLINK-5411
> URL: https://issues.apache.org/jira/browse/FLINK-5411
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, Tests
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/189148248/log.txt
> Running 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase
> Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.011 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase
> testRunIsolatedJob(org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase)
>   Time elapsed: 1.604 sec  <<< ERROR!
> java.util.ConcurrentModificationException: null
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.clear(JobLeaderIdService.java:114)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.stop(JobLeaderIdService.java:92)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.shutDown(ResourceManager.java:182)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDownInternally(ResourceManagerRunner.java:83)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDown(ResourceManagerRunner.java:78)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:313)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:281)
>   at 
> org.apache.flink.streaming.api.environment.Flip6LocalStreamEnvironment.execute(Flip6LocalStreamEnvironment.java:124)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1486)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase.testRunIsolatedJob(LocalStreamEnvironmentITCase.java:41)
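
The exception above is the classic hazard of mutating a HashMap while
iterating over it; a shutdown path that is entered twice (as fixed by this PR)
makes it easy to hit. A stripped-down illustration of the failure mode and one
common remedy (a sketch, not the actual JobLeaderIdService code):

```
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class CmeSketch {

	private final Map<String, AutoCloseable> services = new HashMap<>();

	// Fails with ConcurrentModificationException if close() removes the
	// entry from 'services' while the value view is being iterated.
	void clearUnsafe() throws Exception {
		for (AutoCloseable service : services.values()) {
			service.close();
		}
	}

	// Iterating over a snapshot decouples removal from iteration.
	void clearSafe() throws Exception {
		for (AutoCloseable service : new ArrayList<>(services.values())) {
			service.close();
		}
		services.clear();
	}
}
```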



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411557#comment-16411557
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176770591
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
--- End diff --

It is used by `check_all_pass` in `common.sh`. Kind of obscure, but this is 
the structure for now...


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176770591
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
--- End diff --

It is used by `check_all_pass` in `common.sh`. Kind of obscure, but this is 
the structure for now...


---


[GitHub] flink pull request #5757: [FLINK-5411] [flip6] Fix JobLeaderIdService shut d...

2018-03-23 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5757

[FLINK-5411] [flip6] Fix JobLeaderIdService shut down in ResourceManager

## What is the purpose of the change

The JobLeaderIdService was formerly closed at two different locations: once
in the ResourceManager and once in the ResourceManagerRuntimeServices. Since
the JobLeaderIdService is an RM-specific component, it should also be closed
in the scope of the RM.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
fixJobLeaderIdServiceShutDown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5757.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5757


commit 2bd3d6d1474b85dfaa341b829d7b0e9816bc9353
Author: Till Rohrmann 
Date:   2018-03-23T15:21:55Z

[FLINK-5411] [flip6] Fix JobLeaderIdService shut down in ResourceManager

The JobLeaderIdService was formerly closed at two different locations: once
in the ResourceManager and once in the ResourceManagerRuntimeServices. Since
the JobLeaderIdService is an RM-specific component, it should also be closed
in the scope of the RM.




---


[jira] [Assigned] (FLINK-5411) LocalStreamEnvironmentITCase#testRunIsolatedJob failed on travis

2018-03-23 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5411?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-5411:


Assignee: Till Rohrmann

> LocalStreamEnvironmentITCase#testRunIsolatedJob failed on travis
> 
>
> Key: FLINK-5411
> URL: https://issues.apache.org/jira/browse/FLINK-5411
> Project: Flink
>  Issue Type: Bug
>  Components: ResourceManager, Tests
>Affects Versions: 1.3.0
>Reporter: Chesnay Schepler
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0
>
>
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/189148248/log.txt
> Running 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase
> Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 2.011 sec <<< 
> FAILURE! - in 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase
> testRunIsolatedJob(org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase)
>   Time elapsed: 1.604 sec  <<< ERROR!
> java.util.ConcurrentModificationException: null
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1429)
>   at java.util.HashMap$ValueIterator.next(HashMap.java:1458)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.clear(JobLeaderIdService.java:114)
>   at 
> org.apache.flink.runtime.resourcemanager.JobLeaderIdService.stop(JobLeaderIdService.java:92)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManager.shutDown(ResourceManager.java:182)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDownInternally(ResourceManagerRunner.java:83)
>   at 
> org.apache.flink.runtime.resourcemanager.ResourceManagerRunner.shutDown(ResourceManagerRunner.java:78)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdownInternally(MiniCluster.java:313)
>   at 
> org.apache.flink.runtime.minicluster.MiniCluster.shutdown(MiniCluster.java:281)
>   at 
> org.apache.flink.streaming.api.environment.Flip6LocalStreamEnvironment.execute(Flip6LocalStreamEnvironment.java:124)
>   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1486)
>   at 
> org.apache.flink.streaming.api.environment.LocalStreamEnvironmentITCase.testRunIsolatedJob(LocalStreamEnvironmentITCase.java:41)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411545#comment-16411545
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176767603
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+  

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-23 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176767603
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+   } else if ("rocks".equals(stateBackend)) {
+   boolean incrementalCheckpoints = 

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411543#comment-16411543
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176767031
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | 
wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c<missingJms; c++ )); do

> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176767031
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | 
wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c<missingJms; c++ )); do

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411541#comment-16411541
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176766535
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
+fi
+
+# checks that all JMs apart from the first recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered 
SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed 
checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
$((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
--- End diff --

OK :+1:


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end 

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176766535
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
+fi
+
+# checks that all JMs apart from the first recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered 
SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed 
checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
$((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
--- End diff --

OK :+1:


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411539#comment-16411539
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176766216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
  

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176766216
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map<JobID, Map<String, Tuple4<Integer, File, Path, Future<Path>>>> entries,
-   String name, JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map<String, Tuple4<Integer, File, Path, Future<Path>>> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4<Integer, File, Path, Future<Path>> entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411537#comment-16411537
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176765903
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
+fi
+
+# checks that all JMs apart from the first recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered 
SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed 
checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
$((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
--- End diff --

There is only one running at any point in time.


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176765903
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
+fi
+
+# checks that all JMs apart from the first recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered 
SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed 
checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
$((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
--- End diff --

There is only one running at any point in time.


---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411533#comment-16411533
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176765707
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   

[jira] [Commented] (FLINK-8813) AutoParallellismITCase fails with Flip6

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411535#comment-16411535
 ] 

ASF GitHub Bot commented on FLINK-8813:
---

GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5756

[FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX for flip6

This deprecates 
`org.apache.flink.api.common.ExecutionConfig#PARALLELISM_AUTO_MAX`. From now 
on, in flip6 mode the user will be hit with the following error message.

```
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could 
not start JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:299)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at 
akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
set up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:167)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:762)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
... 20 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: 
PARALLELISM_AUTO_MAX is no longer supported. Please specify a concrete value 
for the parallelism.
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:206)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:290)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:147)
... 22 more
```

Previously the message was similar, but it ended with: `Caused by: 
java.lang.IllegalArgumentException: The parallelism must be at least one.`
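
For illustration, a minimal sketch of the migration away from the deprecated 
constant; the concrete value `4` is an arbitrary placeholder:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class ParallelismMigrationExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Before (deprecated, now rejected in flip6 mode):
		// env.setParallelism(ExecutionConfig.PARALLELISM_AUTO_MAX);

		// After: pick a concrete parallelism explicitly.
		env.setParallelism(4);

		env.fromElements(1, 2, 3).print();
		env.execute("explicit-parallelism-example");
	}
}
```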

## What is the purpose of the change

Both flip6 and old modes are covered by `AutoParallelismITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no) - deprecates 
`org.apache.flink.api.common.ExecutionConfig#PARALLELISM_AUTO_MAX`
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5756


commit e31a90b45619517dba7c0832cfcd1c697d0a8b0f
Author: Piotr Nowojski 
Date:   2018-03-23T15:01:57Z

[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-23 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176765707
  
--- Diff: 
flink-end-to-end-tests/src/main/java/org/apache/flink/streaming/tests/StickyAllocationAndLocalRecoveryTestJob.java
 ---
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.tests;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichFlatMapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.ValueState;
+import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
+import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.FunctionInitializationContext;
+import org.apache.flink.runtime.state.FunctionSnapshotContext;
+import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
+import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+import org.apache.flink.streaming.api.operators.StreamingRuntimeContext;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.commons.lang3.RandomStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Automatic end-to-end test for local recovery (including sticky 
allocation).
+ */
+public class StickyAllocationAndLocalRecoveryTestJob {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(StickyAllocationAndLocalRecoveryTestJob.class);
+
+   public static void main(String[] args) throws Exception {
+
+   final ParameterTool pt = ParameterTool.fromArgs(args);
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   env.setParallelism(pt.getInt("parallelism", 1));
+   env.setMaxParallelism(pt.getInt("maxParallelism", 
pt.getInt("parallelism", 1)));
+   env.enableCheckpointing(pt.getInt("checkpointInterval", 1000));
+   
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE, 
pt.getInt("restartDelay", 0)));
+   if (pt.has("externalizedCheckpoints") && 
pt.getBoolean("externalizedCheckpoints", false)) {
+   
env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+   }
+
+   String stateBackend = pt.get("stateBackend", "file");
+   String checkpointDir = pt.getRequired("checkpointDir");
+
+   boolean killJvmOnFail = pt.getBoolean("killJvmOnFail", false);
+
+   if ("file".equals(stateBackend)) {
+   boolean asyncCheckpoints = 
pt.getBoolean("asyncCheckpoints", false);
+   env.setStateBackend(new FsStateBackend(checkpointDir, 
asyncCheckpoints));
+   } else if ("rocks".equals(stateBackend)) {
+   boolean incrementalCheckpoints = 
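
The quoted diff is cut off by the archive at this point. Based on the `file` 
branch above and the `RocksDBStateBackend` import in the same file, the 
`rocks` branch presumably continues roughly as follows; this is a hedged 
reconstruction, and the parameter name `incrementalCheckpoints` is an 
assumption:

```java
} else if ("rocks".equals(stateBackend)) {
	boolean incrementalCheckpoints = pt.getBoolean("incrementalCheckpoints", false);
	env.setStateBackend(new RocksDBStateBackend(checkpointDir, incrementalCheckpoints));
} else {
	throw new IllegalArgumentException("Unknown state backend: " + stateBackend);
}
```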

[GitHub] flink pull request #5756: [FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX ...

2018-03-23 Thread pnowojski
GitHub user pnowojski opened a pull request:

https://github.com/apache/flink/pull/5756

[FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX for flip6

This deprecates 
`org.apache.flink.api.common.ExecutionConfig#PARALLELISM_AUTO_MAX`. From now 
on, in flip6 mode users will see the following error message.

```
Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could 
not start JobManager.
at 
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:299)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
at 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
at 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
at 
akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at 
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at 
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at 
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
set up JobManager
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:167)
at 
org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:762)
at 
org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:254)
... 20 more
Caused by: org.apache.flink.runtime.client.JobSubmissionException: 
PARALLELISM_AUTO_MAX is no longer supported. Please specify a concrete value 
for the parallelism.
at 
org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:206)
at 
org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:290)
at 
org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:147)
... 22 more
```

Previously the message was similar, but it ended with: `Caused by: 
java.lang.IllegalArgumentException: The parallelism must be at least one.`

## What is the purpose of the change

Both flip6 and old modes are covered by `AutoParallelismITCase`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no) - deprecates 
`org.apache.flink.api.common.ExecutionConfig#PARALLELISM_AUTO_MAX`
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (not applicable / docs / 
**JavaDocs** / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/pnowojski/flink f8813

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5756.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5756


commit e31a90b45619517dba7c0832cfcd1c697d0a8b0f
Author: Piotr Nowojski 
Date:   2018-03-23T15:01:57Z

[FLINK-8813][flip6] Disallow PARALLELISM_AUTO_MAX for flip6




---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411528#comment-16411528
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176764920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
  

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176764920
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
-   

[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176762986
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | 
wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c
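
The message is truncated mid-loop by the archive. A hedged sketch of how such 
a watchdog could continue; the `start_jm` helper and the one-second poll 
interval are assumptions, not the script's actual text:

```bash
    # respawn one JobManager per missing instance (hypothetical helper)
    for (( c=0; c<missingJms; c++ )); do
        start_jm ${ipPort}
    done
    sleep 1
done
```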

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411523#comment-16411523
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176762986
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | 
wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c
> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
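
The kill-and-respawn part of this test plan can be sketched in a few lines of 
bash; the `TaskManagerRunner` class name matches the flip6 TaskExecutor JVM, 
but the helper as a whole is illustrative and not part of the actual test 
scripts:

```bash
function kill_and_respawn_tm() {
    # pick a random running TaskManager JVM
    tmPids=($(jps | grep TaskManagerRunner | awk '{print $1}'))
    if [ ${#tmPids[@]} -gt 0 ]; then
        victim=${tmPids[$((RANDOM % ${#tmPids[@]}))]}
        kill -9 ${victim}
        # spawn a replacement to make up for the loss
        "${FLINK_DIR}/bin/taskmanager.sh" start
    fi
}
```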


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411513#comment-16411513
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176761099
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
--- End diff --

Yes, this is because we do not change it here. But I will also change it 
for future safety.


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5750: [FLINK-8973] [E2E] HA end-to-end test with StateMa...

2018-03-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176761099
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
--- End diff --

Yes, this is because we do not change it here. But I will also change it 
for future safety.


---


[jira] [Commented] (FLINK-8740) Job-level metrics lost during job re-submission in HA mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8740?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411511#comment-16411511
 ] 

ASF GitHub Bot commented on FLINK-8740:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5755

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

## What is the purpose of the change

Closes the JobManagerJobMetricGroup when suspending the `JobManager`. This 
allows the job metrics to be re-registered when the `JobManager` regains its 
leadership.

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobLevelMetricsHA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5755


commit ef2de68bcad3c4715941a1e61d0a460b4c609520
Author: Till Rohrmann 
Date:   2018-03-23T13:08:49Z

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

commit 98a49cd032346a2b09641ee6d30baadf9a98855f
Author: Till Rohrmann 
Date:   2018-03-23T14:43:20Z

[hotfix] Create ExecutionGraph when JobMaster is started

The ExecutionGraph is not a final resource in the JobMaster. For example, 
it is necessary
to create a new ExecutionGraph when rescaling the job or when the JobMaster 
loses and
regains its leadership.

commit dcddfeb0b8883964505bed55d5b3730cae9abe60
Author: Till Rohrmann 
Date:   2018-03-23T14:46:29Z

[hotfix] Remove unused fields in JobMaster




> Job-level metrics lost during job re-submission in HA mode
> --
>
> Key: FLINK-8740
> URL: https://issues.apache.org/jira/browse/FLINK-8740
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager
>Affects Versions: 1.4.0
>Reporter: Joshua DeWald
>Assignee: Till Rohrmann
>Priority: Blocker
> Fix For: 1.5.0
>
>
> When Flink is running in High Availability mode and a leader re-election 
> lands on the same job manager, the job is unable to register the job-level 
> metrics due to a name collision. 
> This may occur even if a different Job Manager is elected, but since it is a 
> local JobManagerMetricGroup that emits the error, that is unlikely to be the 
> case.
>  
> *Expected Behavior*
> When a job is forced to re-submit due to Job Manager re-election, job-level 
> metrics should be available in the new instance of the job (uptime, 
> checkpoints size, checkpoint duration, etc)
> *Actual Behavior*
> When job gets re-submitted, it is unable to register job-level metrics due to 
> collision in the JobManagerMetricGroup, which leads to situation where even 
> though job is running the metrics around checkpoints and uptime are not 
> available
> *Steps to reproduce*
>  # Start up Flink in HA mode using ZooKeeper, single node is fine
>  # Submit a job to the cluster
>  # Stop and restart ZooKeeper
>  # In Job Manager logs you will see the following errors:
>  # 
> {noformat}
> 79043 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'totalNumberOfCheckpoints'. Metric will not be reported
> 79044 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfInProgressCheckpoints'. Metric will not be reported
> 79045 2018-02-19 21:58:15,928 WARN org.apache.flink.metrics.MetricGroup - 
> Name collision: Group already contains a Metric with the name 
> 'numberOfCompletedCheckpoints'. Metric will not be reported{noformat}
> *Proposed Solution*
> I suspect that there may be other related issues beyond just the metrics, 
> but a code change that seems to fix the issue is, during recovery, to remove 
> the existing registered Job 

[GitHub] flink pull request #5755: [FLINK-8740] [metrics] Create new JobManagerJobMet...

2018-03-23 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5755

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

## What is the purpose of the change

Closes the JobManagerJobMetricGroup when suspending the `JobManager`. This 
allows the job metrics to be re-registered when the `JobManager` regains its 
leadership.

## Verifying this change

- Tested manually

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink jobLevelMetricsHA

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5755.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5755


commit ef2de68bcad3c4715941a1e61d0a460b4c609520
Author: Till Rohrmann 
Date:   2018-03-23T13:08:49Z

[FLINK-8740] [metrics] Create new JobManagerJobMetricGroup when creating a 
new ExecutionGraph

commit 98a49cd032346a2b09641ee6d30baadf9a98855f
Author: Till Rohrmann 
Date:   2018-03-23T14:43:20Z

[hotfix] Create ExecutionGraph when JobMaster is started

The ExecutionGraph is not a final resource in the JobMaster. For example, 
it is necessary
to create a new ExecutionGraph when rescaling the job or when the JobMaster 
loses and
regains its leadership.

commit dcddfeb0b8883964505bed55d5b3730cae9abe60
Author: Till Rohrmann 
Date:   2018-03-23T14:46:29Z

[hotfix] Remove unused fields in JobMaster




---


[jira] [Commented] (FLINK-8910) Introduce automated end-to-end test for local recovery (including sticky scheduling)

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411473#comment-16411473
 ] 

ASF GitHub Bot commented on FLINK-8910:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176753796
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+function checkLogs {
+  parallelism=$1
+  attempts=$2
+  (( expectedCount=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  failedLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  attemptLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failedLocalRecovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failedLocalRecovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attemptLocalRecovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+function cleanupAfterTest {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdogPid} 2> /dev/null
+  wait ${watchdogPid} 2> /dev/null
+  #
--- End diff --

The value `watchdogPid` is not initialized here.
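
For context, the usual fix is to capture the PID when the watchdog is started 
in the background; a minimal sketch reusing the `jm_watchdog` function from 
this script (the argument values are placeholders):

```bash
# start the watchdog in the background and remember its PID so that
# cleanupAfterTest can later kill/wait on it
jm_watchdog 1 "8081" &
watchdogPid=$!
```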


> Introduce automated end-to-end test for local recovery (including sticky 
> scheduling)
> 
>
> Key: FLINK-8910
> URL: https://issues.apache.org/jira/browse/FLINK-8910
> Project: Flink
>  Issue Type: Sub-task
>  Components: State Backends, Checkpointing
>Affects Versions: 1.5.0
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> We should have an automated end-to-end test that can run nightly to check 
> that sticky allocation and local recovery work as expected.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5676: [FLINK-8910][tests] Automated end-to-end test for ...

2018-03-23 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5676#discussion_r176753796
  
--- Diff: 
flink-end-to-end-tests/test-scripts/test_local_recovery_and_scheduling.sh ---
@@ -0,0 +1,111 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+function checkLogs {
+  parallelism=$1
+  attempts=$2
+  (( expectedCount=parallelism * (attempts + 1) ))
+
+  # Search for the log message that indicates restore problem from 
existing local state for the keyed backend.
+  failedLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (2/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  # Search for attempts to recover locally.
+  attemptLocalRecovery=$(grep '^.*Creating keyed state backend.* from 
alternative (1/2)\.$' $FLINK_DIR/log/* | wc -l | tr -d ' ')
+
+  if [ ${failedLocalRecovery} -ne 0 ]
+  then
+PASS=""
+echo "FAILURE: Found ${failedLocalRecovery} failed attempt(s) for 
local recovery of correctly scheduled task(s)."
+  fi
+
+  if [ ${attemptLocalRecovery} -eq 0 ]
+  then
+PASS=""
+echo "FAILURE: Found no attempt for local recovery. Configuration 
problem?"
+  fi
+}
+
+function cleanupAfterTest {
+  # Reset the configurations
+  sed -i -e 's/state.backend.local-recovery: .*//' 
"$FLINK_DIR/conf/flink-conf.yaml"
+  sed -i -e 's/log4j.rootLogger=.*/log4j.rootLogger=INFO, file/' 
"$FLINK_DIR/conf/log4j.properties"
+  #
+  kill ${watchdogPid} 2> /dev/null
+  wait ${watchdogPid} 2> /dev/null
+  #
--- End diff --

The value `watchdogPid` is not initialized here.


---


[jira] [Assigned] (FLINK-8813) AutoParallellismITCase fails with Flip6

2018-03-23 Thread Piotr Nowojski (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8813?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Piotr Nowojski reassigned FLINK-8813:
-

Assignee: Piotr Nowojski

> AutoParallellismITCase fails with Flip6
> ---
>
> Key: FLINK-8813
> URL: https://issues.apache.org/jira/browse/FLINK-8813
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Piotr Nowojski
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{AutoParallelismITCase}} fails when running against flip6. 
> (https://travis-ci.org/zentol/flink/jobs/347373854)
> It appears that the {{JobMaster}} does not properly handle 
> {{ExecutionConfig#PARALLELISM_AUTO_MAX}}.
>  
> Exception:
> {code:java}
> Caused by: org.apache.flink.runtime.client.JobSubmissionException: Could not 
> start JobManager.
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:287)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:210)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:154)
>   at 
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleMessage(FencedAkkaRpcActor.java:66)
>   at 
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$onReceive$1(AkkaRpcActor.java:132)
>   at 
> akka.actor.ActorCell$$anonfun$become$1.applyOrElse(ActorCell.scala:544)
>   at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>   at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>   at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>   at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>   at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>   at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>   at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>   at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>   at 
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>   at 
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> Caused by: org.apache.flink.runtime.client.JobExecutionException: Could not 
> set up JobManager
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:181)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:747)
>   at 
> org.apache.flink.runtime.dispatcher.Dispatcher.submitJob(Dispatcher.java:243)
>   ... 20 more
> Caused by: java.lang.IllegalArgumentException: The parallelism must be at 
> least one.
>   at 
> org.apache.flink.runtime.jobgraph.JobVertex.setParallelism(JobVertex.java:290)
>   at 
> org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder.buildGraph(ExecutionGraphBuilder.java:162)
>   at 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:295)
>   at 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:170)
>   ... 22 more{code}
>  
> The likely culprit is this call to {{ExecutionGraphBuilder#buildGraph}} in 
> the {{JobMaster}} constructor:
> {code:java}
> this.executionGraph = ExecutionGraphBuilder.buildGraph(
>null,
>jobGraph,
>jobMasterConfiguration.getConfiguration(),
>scheduledExecutorService,
>scheduledExecutorService,
>slotPool.getSlotProvider(),
>userCodeLoader,
>highAvailabilityServices.getCheckpointRecoveryFactory(),
>rpcTimeout,
>restartStrategy,
>jobMetricGroup,
>-1, // parallelismForAutoMax
>blobServer,
>jobMasterConfiguration.getSlotRequestTimeout(),
>log);{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411452#comment-16411452
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
Thanks @zentol for the review. 

You are right that not supporting it in the old cluster mode would be a 
regression. When I first discussed this, there were some doubts about whether 
it could work in the old mode (some hypothetical problems with blobs timing 
out during submission). Therefore I started with the RestClusterClient, but 
after taking another look today, I saw no problem with supporting the old 
cluster mode as well.

I've also reverted the cleanup process for `FileCache`.

Please have a look at the updated PR if you have time. I will also rebase it 
shortly.


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5580: [FLINK-8620] Enable shipping custom files to BlobStore an...

2018-03-23 Thread dawidwys
Github user dawidwys commented on the issue:

https://github.com/apache/flink/pull/5580
  
Thanks @zentol for the review. 

You are right that not supporting it in the old cluster mode would be a 
regression. When I first discussed this, there were some doubts about whether 
it could work in the old mode (some hypothetical problems with blobs timing 
out during submission). Therefore I started with the RestClusterClient, but 
after taking another look today, I saw no problem with supporting the old 
cluster mode as well.

I've also reverted the cleanup process for `FileCache`.

Please have a look at the updated PR if you have time. I will also rebase it 
shortly.


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411448#comment-16411448
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176747071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176747071
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/filecache/FileCache.java 
---
@@ -267,101 +208,60 @@ private static Thread createShutdownHook(final 
FileCache cache, final Logger log
// 

 
/**
-* Asynchronous file copy process.
-*/
-   private static class CopyProcess implements Callable<Path> {
-
-   private final Path filePath;
-   private final Path cachedPath;
-   private boolean executable;
-
-   public CopyProcess(DistributedCacheEntry e, Path cachedPath) {
-   this.filePath = new Path(e.filePath);
-   this.executable = e.isExecutable;
-   this.cachedPath = cachedPath;
-   }
-
-   @Override
-   public Path call() throws IOException {
-   // let exceptions propagate. we can retrieve them later 
from
-   // the future and report them upon access to the result
-   copy(filePath, cachedPath, this.executable);
-   return cachedPath;
-   }
-   }
-
-   /**
-* If no task is using this file after 5 seconds, clear it.
+* Asynchronous file copy process from blob server.
 */
-   private static class DeleteProcess implements Runnable {
+   private static class CopyFromBlobProcess implements Callable<Path> {
 
-   private final Object lock;
-   private final Map>> entries;
-
-   private final String name;
+   private final PermanentBlobKey blobKey;
+   private final Path target;
+   private final boolean directory;
+   private final boolean executable;
private final JobID jobID;
+   private final PermanentBlobService blobService;
 
-   public DeleteProcess(Object lock, Map>> entries,
-   String name, 
JobID jobID) {
-   this.lock = lock;
-   this.entries = entries;
-   this.name = name;
-   this.jobID = jobID;
+   CopyFromBlobProcess(DistributedCacheEntry e, JobID jobID, 
PermanentBlobService blobService, Path target) {
+   try {
+   this.executable = e.isExecutable;
+   this.directory = e.isZipped;
+   this.jobID = jobID;
+   this.blobService = blobService;
+   this.blobKey = 
InstantiationUtil.deserializeObject(e.blobKey, 
Thread.currentThread().getContextClassLoader());
+   this.target = target;
+   } catch (Exception ex) {
+   throw new RuntimeException(ex);
+   }
}
 
@Override
-   public void run() {
-   try {
-   synchronized (lock) {
-   Map> jobEntries = entries.get(jobID);
-
-   if (jobEntries != null) {
-   Tuple4 entry = jobEntries.get(name);
-
-   if (entry != null) {
-   int count = entry.f0;
-   if (count > 1) {
-   // multiple 
references still
-   entry.f0 = 
count - 1;
-   }
-   else {
-   // we remove 
the last reference
-   
jobEntries.remove(name);
-   if 
(jobEntries.isEmpty()) {
-   
entries.remove(jobID);
-   }
-
-   // abort the 
copy
-   
entry.f3.cancel(true);
-
- 

[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411445#comment-16411445
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176746645
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

It is not sufficient. In this test case `entry.getName` returns 
`/pre,,,suff`. Note the `/` at the beginning.

Though I've changed `replaceAll` to `replaceFirst`.
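
A tiny illustration of the difference under discussion, using a made-up entry 
name with a leading slash:

```java
String name = "/dir/file";            // hypothetical zip entry name
name.replaceAll("/", "");             // "dirfile"  -- every slash removed
name.replaceFirst("/", "");           // "dir/file" -- only the leading slash removed
```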


> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176746645
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java 
---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable 
final JobID jobId, BlobKey.Bl
}
 
/**
-* Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
+* Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, 
Configuration, JobID, List)} helper.
 */
@Test
public void testUploadJarFilesHelper() throws Exception {
uploadJarFile(getBlobServer(), getBlobClientConfig());
}
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+   final File newFolder = temporaryFolder.newFolder();
+   final File file1 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file1, "Test content");
+   final File file2 = File.createTempFile("pre", "suff", 
newFolder);
+   FileUtils.writeStringToFile(file2, "Test content 2");
+
+   final Map<String, File> files = new HashMap<>();
+   files.put(file1.getName(), file1);
+   files.put(file2.getName(), file2);
+
+   BlobKey key;
+   final JobID jobId = new JobID();
+   final InetSocketAddress inetAddress = new 
InetSocketAddress("localhost", getBlobServer().getPort());
+   try (
+   BlobClient client = new BlobClient(
+   inetAddress, getBlobClientConfig())) {
+
+   key = client.uploadFile(jobId, new 
Path(newFolder.getPath()));
+   }
+
+   final File file = getBlobServer().getFile(jobId, 
(PermanentBlobKey) key);
+
+   try (ZipInputStream zis = new ZipInputStream(new 
FileInputStream(file))) {
+   ZipEntry entry;
+   while ((entry = zis.getNextEntry()) != null) {
+   String fileName = 
entry.getName().replaceAll("/", "");
--- End diff --

It is not sufficient. In this test case `entry.getName` returns 
`/pre,,,suff`. Note the `/` at the beginning.

Though I've changed `replaceAll` to `replaceFirst`.


---


[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411435#comment-16411435
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745753
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if 
job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List uploadJarFiles(
-   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List jars)
+   public static List uploadFiles(
+   InetSocketAddress serverAddress, Configuration 
clientConfig, JobID jobId, List files)
throws IOException {
 
checkNotNull(jobId);
 
-   if (jars.isEmpty()) {
+   if (files.isEmpty()) {
return Collections.emptyList();
} else {
List blobKeys = new ArrayList<>();
 
try (BlobClient blobClient = new 
BlobClient(serverAddress, clientConfig)) {
-   for (final Path jar : jars) {
-   final FileSystem fs = 
jar.getFileSystem();
-   FSDataInputStream is = null;
-   try {
-   is = fs.open(jar);
-   final PermanentBlobKey key =
-   (PermanentBlobKey) 
blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-   blobKeys.add(key);
-   } finally {
-   if (is != null) {
-   is.close();
-   }
-   }
+   for (final Path file : files) {
+   final PermanentBlobKey key = 
blobClient.uploadFile(jobId, file);
+   blobKeys.add(key);
}
}
 
return blobKeys;
}
}
 
-   // 

-   //  Miscellaneous
-   // 

-
-   private static Throwable readExceptionFromStream(InputStream in) throws 
IOException {
-   int len = readLength(in);
-   byte[] bytes = new byte[len];
-   readFully(in, bytes, 0, len, "Error message");
+   /**
+* Uploads a single file to the {@link PermanentBlobService} of the 
given {@link BlobServer}.
+*
+* @param jobId
+*  ID of the job this blob belongs to (or null if 
job-unrelated)
+* @param file
+*  file to upload
+*
+* @throws IOException
+*  if the upload fails
+*/
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws 
IOException {
+   final FileSystem fs = file.getFileSystem();
+   if (fs.getFileStatus(file).isDir()) {
+   return uploadDirectory(jobId, file, fs);
+   } else {
+   try (InputStream is = fs.open(file)) {
+   return (PermanentBlobKey) putInputStream(jobId, 
is, PERMANENT_BLOB);
+   }
+   }
+   }
 
-   try {
-   return (Throwable) 
InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, 
FileSystem fs) throws IOException {
+   try (BlobOutputStream blobOutputStream = new 
BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+   try (ZipOutputStream zipStream = new 
ZipOutputStream(blobOutputStream)) {
+   compressDirectoryToZipfile(fs, 
fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+   zipStream.finish();
+ 
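
For orientation, a hedged usage sketch of the static helper from this diff. 
The generics were stripped by the archive; `List<Path>` in and 
`List<PermanentBlobKey>` out are inferred from the surrounding code, and the 
address and paths are placeholders:

```java
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.blob.BlobClient;
import org.apache.flink.runtime.blob.PermanentBlobKey;

import java.net.InetSocketAddress;
import java.util.Arrays;
import java.util.List;

public class UploadFilesExample {

	public static void main(String[] args) throws Exception {
		InetSocketAddress serverAddress = new InetSocketAddress("localhost", 6124);
		Configuration clientConfig = new Configuration();
		JobID jobId = new JobID();
		List<Path> files = Arrays.asList(new Path("/tmp/extra.jar"), new Path("/tmp/udf-dir"));

		// Plain files are streamed directly; directories are zipped via uploadDirectory().
		List<PermanentBlobKey> keys = BlobClient.uploadFiles(serverAddress, clientConfig, jobId, files);
		System.out.println("Uploaded " + keys.size() + " blob(s)");
	}
}
```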

[GitHub] flink pull request #5580: [FLINK-8620] Enable shipping custom files to BlobS...

2018-03-23 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745753
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---
@@ -527,59 +403,83 @@ else if (response == RETURN_ERROR) {
 *  Any additional configuration for the blob client
 * @param jobId
 *  ID of the job this blob belongs to (or null if job-unrelated)
-* @param jars
-*  List of JAR files to upload
+* @param files
+*  List of files to upload
 *
 * @throws IOException
 *  if the upload fails
 */
-   public static List<PermanentBlobKey> uploadJarFiles(
-           InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> jars)
+   public static List<PermanentBlobKey> uploadFiles(
+           InetSocketAddress serverAddress, Configuration clientConfig, JobID jobId, List<Path> files)
        throws IOException {
 
        checkNotNull(jobId);
 
-       if (jars.isEmpty()) {
+       if (files.isEmpty()) {
            return Collections.emptyList();
        } else {
            List<PermanentBlobKey> blobKeys = new ArrayList<>();
 
            try (BlobClient blobClient = new BlobClient(serverAddress, clientConfig)) {
-               for (final Path jar : jars) {
-                   final FileSystem fs = jar.getFileSystem();
-                   FSDataInputStream is = null;
-                   try {
-                       is = fs.open(jar);
-                       final PermanentBlobKey key =
-                           (PermanentBlobKey) blobClient.putInputStream(jobId, is, PERMANENT_BLOB);
-                       blobKeys.add(key);
-                   } finally {
-                       if (is != null) {
-                           is.close();
-                       }
-                   }
+               for (final Path file : files) {
+                   final PermanentBlobKey key = blobClient.uploadFile(jobId, file);
+                   blobKeys.add(key);
                }
            }
 
            return blobKeys;
        }
    }
 
-   // ------------------------------------------------------------------------
-   //  Miscellaneous
-   // ------------------------------------------------------------------------
-
-   private static Throwable readExceptionFromStream(InputStream in) throws IOException {
-       int len = readLength(in);
-       byte[] bytes = new byte[len];
-       readFully(in, bytes, 0, len, "Error message");
+   /**
+    * Uploads a single file to the {@link PermanentBlobService} of the given {@link BlobServer}.
+    *
+    * @param jobId
+    *  ID of the job this blob belongs to (or null if job-unrelated)
+    * @param file
+    *  file to upload
+    *
+    * @throws IOException
+    *  if the upload fails
+    */
+   public PermanentBlobKey uploadFile(JobID jobId, Path file) throws IOException {
+       final FileSystem fs = file.getFileSystem();
+       if (fs.getFileStatus(file).isDir()) {
+           return uploadDirectory(jobId, file, fs);
+       } else {
+           try (InputStream is = fs.open(file)) {
+               return (PermanentBlobKey) putInputStream(jobId, is, PERMANENT_BLOB);
+           }
+       }
+   }
 
-       try {
-           return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader());
+   private PermanentBlobKey uploadDirectory(JobID jobId, Path file, FileSystem fs) throws IOException {
+       try (BlobOutputStream blobOutputStream = new BlobOutputStream(jobId, PERMANENT_BLOB, socket)) {
+           try (ZipOutputStream zipStream = new ZipOutputStream(blobOutputStream)) {
+               compressDirectoryToZipfile(fs, fs.getFileStatus(file), fs.getFileStatus(file), zipStream);
+               zipStream.finish();
+               return (PermanentBlobKey) blobOutputStream.finish();
+           }
        }
-       catch (ClassNotFoundException e) {
-           // should never occur
-           throw new 

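For context, a minimal caller-side sketch of the renamed helper in the diff above. It assumes the reconstructed generic signature (uploadFiles(InetSocketAddress, Configuration, JobID, List<Path>) returning List<PermanentBlobKey>); the class name, the server address localhost:6124, and both paths are illustrative placeholders, not part of the patch:

    import java.net.InetSocketAddress;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.runtime.blob.BlobClient;
    import org.apache.flink.runtime.blob.PermanentBlobKey;

    public class ShipFilesSketch {
        public static void main(String[] args) throws Exception {
            // Placeholder address of a running BlobServer.
            InetSocketAddress serverAddress = new InetSocketAddress("localhost", 6124);
            // Per the diff, plain files are streamed as-is, while a directory is
            // zipped by uploadFile/uploadDirectory before being stored.
            List<Path> files = Arrays.asList(
                new Path("file:///tmp/job-udfs.jar"),
                new Path("file:///tmp/job-resources"));
            List<PermanentBlobKey> keys = BlobClient.uploadFiles(
                serverAddress, new Configuration(), new JobID(), files);
            keys.forEach(key -> System.out.println("Uploaded blob: " + key));
        }
    }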
[jira] [Commented] (FLINK-8620) Enable shipping custom artifacts to BlobStore and accessing them through DistributedCache

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8620?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411432#comment-16411432
 ] 

ASF GitHub Bot commented on FLINK-8620:
---

Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/5580#discussion_r176745183
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/blob/BlobClientTest.java ---
@@ -453,15 +460,51 @@ private void testGetFailsDuringStreaming(@Nullable final JobID jobId, BlobKey.Bl
    }
 
    /**
-    * Tests the static {@link BlobClient#uploadJarFiles(InetSocketAddress, Configuration, JobID, List)} helper.
+    * Tests the static {@link BlobClient#uploadFiles(InetSocketAddress, Configuration, JobID, List)} helper.
     */
    @Test
    public void testUploadJarFilesHelper() throws Exception {
        uploadJarFile(getBlobServer(), getBlobClientConfig());
    }
 
+   @Test
+   public void testDirectoryUploading() throws IOException {
+       final File newFolder = temporaryFolder.newFolder();
+       final File file1 = File.createTempFile("pre", "suff", newFolder);
+       FileUtils.writeStringToFile(file1, "Test content");
+       final File file2 = File.createTempFile("pre", "suff", newFolder);
+       FileUtils.writeStringToFile(file2, "Test content 2");
+
+       final Map<String, File> files = new HashMap<>();
+       files.put(file1.getName(), file1);
+       files.put(file2.getName(), file2);
+
+       BlobKey key;
+       final JobID jobId = new JobID();
+       final InetSocketAddress inetAddress = new InetSocketAddress("localhost", getBlobServer().getPort());
+       try (
+           BlobClient client = new BlobClient(
+               inetAddress, getBlobClientConfig())) {
+
+           key = client.uploadFile(jobId, new Path(newFolder.getPath()));
+       }
+
+       final File file = getBlobServer().getFile(jobId, (PermanentBlobKey) key);
+
+       try (ZipInputStream zis = new ZipInputStream(new FileInputStream(file))) {
+           ZipEntry entry;
+           while ((entry = zis.getNextEntry()) != null) {
--- End diff --

It does not need to. `getNextEntry` does it, at the very beginning.

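For reference, a minimal standalone sketch of this iteration pattern with plain java.util.zip, independent of the Flink test harness (the class name and the argument handling are illustrative): getNextEntry() implicitly closes any previously open entry before positioning the stream at the next one, so no explicit closeEntry() call is needed between iterations.

    import java.io.FileInputStream;
    import java.io.IOException;
    import java.util.zip.ZipEntry;
    import java.util.zip.ZipInputStream;

    public class ZipScan {
        public static void main(String[] args) throws IOException {
            // Lists every entry of the archive given as the first argument.
            // No closeEntry() in the loop: getNextEntry() closes the current
            // entry itself before reading the next entry header.
            try (ZipInputStream zis = new ZipInputStream(new FileInputStream(args[0]))) {
                ZipEntry entry;
                while ((entry = zis.getNextEntry()) != null) {
                    System.out.println(entry.getName());
                }
            }
        }
    }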

> Enable shipping custom artifacts to BlobStore and accessing them through 
> DistributedCache
> -
>
> Key: FLINK-8620
> URL: https://issues.apache.org/jira/browse/FLINK-8620
> Project: Flink
>  Issue Type: New Feature
>Reporter: Dawid Wysakowicz
>Assignee: Dawid Wysakowicz
>Priority: Major
>
> We should be able to distribute custom files to taskmanagers. To do that we 
> can store those files in BlobStore and later on access them in TaskManagers 
> through DistributedCache.
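
As a hedged illustration of the consumer side described in this issue, the sketch below registers a file with a job and reads it back on the TaskManager through the DistributedCache. It uses the long-standing registerCachedFile/getDistributedCache calls of the DataSet API; the HDFS path, the cache name "dictionary", and the class name are made-up placeholders:

    import java.io.File;

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.ExecutionEnvironment;

    public class CachedFileSketch {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            // Ship the file with the job under a logical name.
            env.registerCachedFile("hdfs:///shared/dictionary.txt", "dictionary");

            env.fromElements("a", "b", "c")
                .map(new RichMapFunction<String, String>() {
                    @Override
                    public String map(String value) throws Exception {
                        // On the TaskManager, the locally cached copy is
                        // resolved by the same logical name.
                        File dict = getRuntimeContext()
                            .getDistributedCache()
                            .getFile("dictionary");
                        return value + " -> " + dict.getAbsolutePath();
                    }
                })
                .print();
        }
    }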



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411388#comment-16411388
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176734645
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target 
flink-end-to-end-tests/test-scripts/test_ha.sh

+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\
 --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+PASS=""
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
+sleep 50
+done
+
+verify_logs ${jmKillAndRetries}
--- End diff --

Will the test fail if there are errors or will there be just error messages 
printed?
I don't see an `exit` or something similar


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411386#comment-16411386
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176727208
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c<missingJms; c++ )); do
--- End diff --

> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411383#comment-16411383
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176726476
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 1
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+# create the masters file (only one currently).
+# This must have all the masters to be used in HA.
+echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+# then move on to create the flink-conf.yaml
+
+if [ -e $TEST_DATA_DIR/recovery ]; then
+   echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+   rm -rf $TEST_DATA_DIR/recovery
+fi
+
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 4
+parallelism.default: 1
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+high-availability: zookeeper
+high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /test_cluster_one
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+echo "Setting up HA Cluster..."
+create_ha_conf
+start_local_zk
+start_cluster
+}
+
+function start_local_zk {
+while read server ; do
+server=$(echo -e "${server}" | sed -e 's/^[[:space:]]*//' -e 's/[[:space:]]*$//') # trim
+
+# match server.id=address[:port[:port]]
+if [[ $server =~ ^server\.([0-9]+)[[:space:]]*\=[[:space:]]*([^: \#]+) ]]; then
+id=${BASH_REMATCH[1]}
+address=${BASH_REMATCH[2]}
--- End diff --

`address` seems to be unused


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411379#comment-16411379
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176725740
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 1
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+# create the masters file (only one currently).
+# This must have all the masters to be used in HA.
+echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+# then move on to create the flink-conf.yaml
+
+if [ -e $TEST_DATA_DIR/recovery ]; then
--- End diff --

add a comment


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411389#comment-16411389
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176732649
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 
error rate."
+PASS=""
--- End diff --

What is `PASS` being used for?


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411385#comment-16411385
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176732031
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha.sh ---
@@ -0,0 +1,108 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+#FLINK_DIR=/Users/kkloudas/repos/dataartisans/flink/build-target flink-end-to-end-tests/test-scripts/test_ha.sh
+
+TEST_PROGRAM_JAR=$TEST_INFRA_DIR/../../flink-examples/flink-examples-streaming/target/StateMachineExample.jar\ --error-rate\ 0.0\ --sleep\ 2
+
+stop_cluster_and_watchdog() {
+kill ${watchdogPid} 2> /dev/null
+wait ${watchdogPid} 2> /dev/null
+
+stop_ha_cluster
+}
+
+verify_logs() {
+expectedRetries=$1
+
+# verify that we have no alerts
+if ! [ `cat ${output} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the StateMachineExample with 0.0 error rate."
+PASS=""
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' Recovered SubmittedJobGraph "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${expectedRetries} ]; then
+echo "FAILURE: A JM did not take over."
+PASS=""
+fi
+
+# search the logs for JMs that log completed checkpoints
+if ! [ `grep -r --include '*standalonesession*.log' Completed checkpoint "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq $((expectedRetries + 1)) ]; then
+echo "FAILURE: A JM did not execute the job."
+PASS=""
+fi
+}
+
+run_ha_test() {
+parallelism=$1
+backend=$2
+async=$3
+incremental=$4
+maxAttempts=$5
+rstrtInterval=$6
+output=$7
+
+jmKillAndRetries=2
+checkpointDir="${TEST_DATA_DIR}/checkpoints/"
+
+# start the cluster on HA mode and
+# verify that all JMs are running
+start_ha_cluster
+
+echo "Running on HA mode: parallelism=${parallelism}, 
backend=${backend}, asyncSnapshots=${async}, and 
incremSnapshots=${incremental}."
+
+# submit a job in detached mode and let it run
+$FLINK_DIR/bin/flink run -d -p ${parallelism} \
+ $TEST_PROGRAM_JAR \
+--stateBackend ${backend} \
+--checkpointDir "file://${checkpointDir}" \
+--asyncCheckpoints ${async} \
+--incrementalCheckpoints ${incremental} \
+--restartAttempts ${maxAttempts} \
+--restartDelay ${rstrtInterval} \
+--output ${output} > /dev/null
+
+# start the watchdog that keeps the number of JMs stable
+jm_watchdog 1 "8081" &
+watchdogPid=$!
+
+# let the job run for a while to take some checkpoints
+sleep 50
+
+for (( c=0; c<${jmKillAndRetries}; c++ )); do
+# kill the JM and wait for watchdog to
+# create a new JM which will take over
+kill_jm 0
+sleep 50
+done
+
+verify_logs ${jmKillAndRetries}
+
+# kill the cluster and zookeeper
+stop_cluster_and_watchdog
+}
+
+run_ha_test 1 "file" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "rocks" "false" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "file" "true" "false" 3 100 "${TEST_DATA_DIR}/output.txt"
+run_ha_test 1 "rocks" "false" "true" 3 100 "${TEST_DATA_DIR}/output.txt"
+trap stop_cluster_and_watchdog EXIT
--- End diff --

I don't have much experience 

[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411382#comment-16411382
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176727554
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -59,6 +146,57 @@ function start_cluster {
   done
 }
 
+function jm_watchdog() {
+expectedJms=$1
+ipPort=$2
+
+while true; do
+runningJms=`jps | grep -o 'StandaloneSessionClusterEntrypoint' | wc -l`;
+missingJms=$((expectedJms-runningJms))
+for (( c=0; c<missingJms; c++ )); do
--- End diff --

> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8973) End-to-end test: Run general purpose job with failures in standalone mode

2018-03-23 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8973?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16411381#comment-16411381
 ] 

ASF GitHub Bot commented on FLINK-8973:
---

Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5750#discussion_r176725915
  
--- Diff: flink-end-to-end-tests/test-scripts/common.sh ---
@@ -39,6 +39,93 @@ cd $TEST_ROOT
 export TEST_DATA_DIR=$TEST_INFRA_DIR/temp-test-directory-$(date +%S%N)
 echo "TEST_DATA_DIR: $TEST_DATA_DIR"
 
+function revert_default_config() {
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 1
+parallelism.default: 1
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+web.port: 8081
+EOL
+}
+
+function create_ha_conf() {
+
+# create the masters file (only one currently).
+# This must have all the masters to be used in HA.
+echo "localhost:8081" > ${FLINK_DIR}/conf/masters
+
+# then move on to create the flink-conf.yaml
+
+if [ -e $TEST_DATA_DIR/recovery ]; then
+   echo "File ${TEST_DATA_DIR}/recovery exists. Deleting it..."
+   rm -rf $TEST_DATA_DIR/recovery
+fi
+
+sed 's/^//g' > ${FLINK_DIR}/conf/flink-conf.yaml << EOL
+#==============================================================================
+# Common
+#==============================================================================
+
+jobmanager.rpc.address: localhost
+jobmanager.rpc.port: 6123
+jobmanager.heap.mb: 1024
+taskmanager.heap.mb: 1024
+taskmanager.numberOfTaskSlots: 4
+parallelism.default: 1
+
+#==============================================================================
+# High Availability
+#==============================================================================
+
+high-availability: zookeeper
+high-availability.zookeeper.storageDir: file://${TEST_DATA_DIR}/recovery/
+high-availability.zookeeper.quorum: localhost:2181
+high-availability.zookeeper.path.root: /flink
+high-availability.cluster-id: /test_cluster_one
+
+#==============================================================================
+# Web Frontend
+#==============================================================================
+
+web.port: 8081
+EOL
+}
+
+function start_ha_cluster {
+echo "Setting up HA Cluster..."
+create_ha_conf
+start_local_zk
+start_cluster
+}
+
+function start_local_zk {
--- End diff --

add a few comments to this method?


> End-to-end test: Run general purpose job with failures in standalone mode
> -
>
> Key: FLINK-8973
> URL: https://issues.apache.org/jira/browse/FLINK-8973
> Project: Flink
>  Issue Type: Sub-task
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Kostas Kloudas
>Priority: Blocker
> Fix For: 1.5.0
>
>
> We should set up an end-to-end test which runs the general purpose job 
> (FLINK-8971) in a standalone setting with HA enabled (ZooKeeper). When 
> running the job, the job failures should be activated. 
> Additionally, we should randomly kill Flink processes (cluster entrypoint and 
> TaskExecutors). When killing them, we should also spawn new processes to make 
> up for the loss.
> This end-to-end test case should run with all different state backend 
> settings: {{RocksDB}} (full/incremental, async/sync), {{FsStateBackend}} 
> (sync/async)
> We should then verify that the general purpose job is successfully recovered 
> without data loss or other failures.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

