[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366685#comment-16366685
 ] 

ASF GitHub Bot commented on FLINK-3655:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5415
  
Will merge this.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.5.0
>
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.
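
The comma-separated form requested above could be handled roughly as follows. This is a minimal plain-Java sketch, not Flink's implementation: `PathListParser` and `parsePaths` are hypothetical names introduced for illustration, and the sketch only splits the string into individual paths (it does not expand wildcards or touch any file system).

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of Flink): splits a comma-separated path list. */
final class PathListParser {

    private PathListParser() {}

    /** Splits on commas, trims whitespace, skips empty entries, and keeps the input order. */
    static List<String> parsePaths(String commaSeparated) {
        List<String> paths = new ArrayList<>();
        for (String part : commaSeparated.split(",")) {
            String trimmed = part.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }

    public static void main(String[] args) {
        // A source would then process the directories sequentially, in this order.
        String input = "/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*";
        for (String path : parsePaths(input)) {
            System.out.println(path);
        }
    }
}
```

Keeping the input order matters here because the issue asks for the directories to be processed sequentially.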



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5415: [FLINK-3655] [core] Support multiple paths in FileInputFo...

2018-02-15 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5415
  
Will merge this.


---


[jira] [Updated] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-15 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-8667:

Description: 
[~aljoscha] [~pnowojski]  

Since KeyedBroadcastProcessFunction is about to get out of the door, I think it 
will be great to expose the timer's key in KeyedBroadcastProcessFunction too. 
If we don't do it now, it will be much more difficult to add the feature 
later because of user application compatibility issues.

  was:
[~aljoscha] [~pnowojski]  Since KeyedBroadcastProcessFunction is about to get 
out of the door, I think it will be great to expose the timer's key in 
KeyedBroadcastProcessFunction too.




> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it will be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user application compatibility issues.





[jira] [Commented] (FLINK-8625) Move OutputFlusher thread to Netty scheduled executor

2018-02-15 Thread mingleizhang (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8625?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366531#comment-16366531
 ] 

mingleizhang commented on FLINK-8625:
-

Hi [~pnowojski], one thing I would like to confirm: do we want to replace the 
current approach, in which every instance of {{StreamRecordWriter}} creates its 
own thread to do the flush work, with scheduling a task on the {{EventLoop}}?

> Move OutputFlusher thread to Netty scheduled executor
> -
>
> Key: FLINK-8625
> URL: https://issues.apache.org/jira/browse/FLINK-8625
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: Piotr Nowojski
>Priority: Major
>
> This will allow us to trigger/schedule next flush only if we are not 
> currently busy. 





[jira] [Updated] (FLINK-8335) Upgrade hbase connector dependency to 1.4.1

2018-02-15 Thread mingleizhang (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

mingleizhang updated FLINK-8335:

Summary: Upgrade hbase connector dependency to 1.4.1  (was: Upgrade hbase 
connector dependency to 1.4.0)

> Upgrade hbase connector dependency to 1.4.1
> ---
>
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Priority: Minor
>
> hbase 1.4.1 has been released.
> 1.4.0 shows speed improvement over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.1





[jira] [Commented] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366517#comment-16366517
 ] 

ASF GitHub Bot commented on FLINK-8335:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5488#discussion_r168659459
  
--- Diff: flink-connectors/flink-hbase/pom.xml ---
@@ -34,7 +34,7 @@ under the License.
jar
 

-   1.3.1
+   1.4.0
--- End diff --

Thanks @greghogan It is a typo. Will fix.


> Upgrade hbase connector dependency to 1.4.0
> ---
>
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Priority: Minor
>
> hbase 1.4.1 has been released.
> 1.4.0 shows speed improvement over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.1





[GitHub] flink pull request #5488: [FLINK-8335] [hbase] Upgrade hbase connector depen...

2018-02-15 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5488#discussion_r168659459
  
--- Diff: flink-connectors/flink-hbase/pom.xml ---
@@ -34,7 +34,7 @@ under the License.
jar
 

-   1.3.1
+   1.4.0
--- End diff --

Thanks @greghogan It is a typo. Will fix.


---


[jira] [Commented] (FLINK-8648) Allow for customization of emitRecordAndUpdateState in Kinesis connector

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366345#comment-16366345
 ] 

ASF GitHub Bot commented on FLINK-8648:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai see 
https://gist.github.com/tweise/7fad5d5df0abf54670a52d0d02d61179 for details.

As indicated in the email thread, emit override will track watermark state 
and delegate the record to the base implementation.


> Allow for customization of emitRecordAndUpdateState in Kinesis connector
> 
>
> Key: FLINK-8648
> URL: https://issues.apache.org/jira/browse/FLINK-8648
> Project: Flink
>  Issue Type: Task
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>
> It should be possible to override the method to intercept the emit behavior, 
> in this case for the purpose of custom watermark support.
>  





[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5480
  
@tzulitai see 
https://gist.github.com/tweise/7fad5d5df0abf54670a52d0d02d61179 for details.

As indicated in the email thread, emit override will track watermark state 
and delegate the record to the base implementation.


---


[GitHub] flink issue #5393: [FLINK-8516] Allow for custom hash function for shard to ...

2018-02-15 Thread tweise
Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai thanks for the review!


---


[jira] [Commented] (FLINK-8516) Allow custom shard-to-subtask assignment for the FlinkKinesisConsumer

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366303#comment-16366303
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user tweise commented on the issue:

https://github.com/apache/flink/pull/5393
  
@tzulitai thanks for the review!


> Allow custom shard-to-subtask assignment for the FlinkKinesisConsumer
> -
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.
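
The skew described above can be illustrated with a small plain-Java sketch. Everything here is an assumption made for illustration: `ShardAssignmentSketch`, `assign`, `distribution`, and `numericSuffix` are hypothetical names, the hash function is pluggable rather than the one the connector actually uses, and shard ids are assumed to look like `shardId-000000000003`.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.ToIntFunction;

/** Hypothetical sketch (not the FlinkKinesisConsumer code): hash-based shard-to-subtask assignment. */
final class ShardAssignmentSketch {

    private ShardAssignmentSketch() {}

    /** Maps a shard to a subtask index by taking the supplied hash modulo the parallelism. */
    static int assign(String shardId, int numSubtasks, ToIntFunction<String> hashFn) {
        return Math.floorMod(hashFn.applyAsInt(shardId), numSubtasks);
    }

    /** Counts how many shards land on each subtask. */
    static int[] distribution(List<String> shardIds, int numSubtasks, ToIntFunction<String> hashFn) {
        int[] counts = new int[numSubtasks];
        for (String id : shardIds) {
            counts[assign(id, numSubtasks, hashFn)]++;
        }
        return counts;
    }

    /** Example hash: the numeric suffix of an id such as "shardId-000000000003" (assumed id format). */
    static int numericSuffix(String shardId) {
        return Integer.parseInt(shardId.substring(shardId.lastIndexOf('-') + 1));
    }

    public static void main(String[] args) {
        List<String> sequential = Arrays.asList(
                "shardId-000000000000", "shardId-000000000001",
                "shardId-000000000002", "shardId-000000000003");
        // After resharding, the surviving ids are no longer consecutive.
        List<String> afterResharding = Arrays.asList(
                "shardId-000000000000", "shardId-000000000002",
                "shardId-000000000004", "shardId-000000000006");
        System.out.println(Arrays.toString(
                distribution(sequential, 2, ShardAssignmentSketch::numericSuffix)));
        System.out.println(Arrays.toString(
                distribution(afterResharding, 2, ShardAssignmentSketch::numericSuffix)));
    }
}
```

Passing the hash as a function parameter mirrors the idea of the issue: a user who knows how their shard ids are laid out can supply an assignment that stays balanced.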





[GitHub] flink issue #5497: Propose fixing some typos

2018-02-15 Thread jeis2497052
Github user jeis2497052 commented on the issue:

https://github.com/apache/flink/pull/5497
  
Sorry about the space issue.  Shall I revert that doc?

On Thu, Feb 15, 2018 at 4:27 PM Chesnay  wrote:

> *@zentol* commented on this pull request.
> --
>
> In docs/dev/migration.md
> :
>
> > @@ -145,16 +145,16 @@ public class BufferingSink implements 
SinkFunction>,
>  bufferedElements.add(value);
>  if (bufferedElements.size() == threshold) {
>  for (Tuple2 element: bufferedElements) {
> - // send it to the sink
> - }
> - bufferedElements.clear();
> - }
> +// send it to the sink
> +}
> +bufferedElements.clear();
>
> the indentation here doesn't look correct
>
-- 
John
508.635.7384
www.linkedin.com/in/johneismeier/



---


[GitHub] flink pull request #5497: Propose fixing some typos

2018-02-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5497#discussion_r168616567
  
--- Diff: docs/dev/migration.md ---
@@ -145,16 +145,16 @@ public class BufferingSink implements 
SinkFunction>,
 bufferedElements.add(value);
 if (bufferedElements.size() == threshold) {
 for (Tuple2 element: bufferedElements) {
-   // send it to the sink
-   }
-   bufferedElements.clear();
-   }
+// send it to the sink
+}
+bufferedElements.clear();
--- End diff --

Tabs only indent seven spaces after the +/- markers.


---


[GitHub] flink pull request #5497: Propose fixing some typos

2018-02-15 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/5497#discussion_r168613102
  
--- Diff: docs/dev/migration.md ---
@@ -145,16 +145,16 @@ public class BufferingSink implements 
SinkFunction>,
 bufferedElements.add(value);
 if (bufferedElements.size() == threshold) {
 for (Tuple2 element: bufferedElements) {
-   // send it to the sink
-   }
-   bufferedElements.clear();
-   }
+// send it to the sink
+}
+bufferedElements.clear();
--- End diff --

the indentation here doesn't look correct


---


[GitHub] flink issue #5497: Propose fixing some typos

2018-02-15 Thread greghogan
Github user greghogan commented on the issue:

https://github.com/apache/flink/pull/5497
  
+1


---


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366232#comment-16366232
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @aljoscha  @pnowojski 


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  Since KeyedBroadcastProcessFunction is about to get 
> out of the door, I think it will be great to expose the timer's key in 
> KeyedBroadcastProcessFunction too.





[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-15 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
cc @aljoscha  @pnowojski 


---


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366231#comment-16366231
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5500

[FLINK-8667] expose key in KeyedBroadcastProcessFunction#onTimer()

## What is the purpose of the change

Expose key in `KeyedBroadcastProcessFunction#onTimer(OnTimerContext)`.

Hopefully this PR can make it into 1.5.0, the release in which 
`KeyedBroadcastProcessFunction` will be out. Otherwise, we need to think 
about compatibility and it will be much harder to expose the key in 
`KeyedBroadcastProcessFunction#onTimer()`.

## Brief change log

Expose key in `KeyedBroadcastProcessFunction#onTimer(OnTimerContext)`

## Verifying this change

This change is already covered by existing tests, such as 
*BroadcastStateITCase*.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8667

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5500


commit 2dcdedf072576e7e23fd220abf48c32c8355e5cc
Author: Bowen Li 
Date:   2018-02-15T20:37:44Z

[FLINK-8667] expose key in KeyedBroadcastProcessFunction#onTimer()




> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  Since KeyedBroadcastProcessFunction is about to get 
> out of the door, I think it will be great to expose the timer's key in 
> KeyedBroadcastProcessFunction too.





[GitHub] flink pull request #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFu...

2018-02-15 Thread bowenli86
GitHub user bowenli86 opened a pull request:

https://github.com/apache/flink/pull/5500

[FLINK-8667] expose key in KeyedBroadcastProcessFunction#onTimer()

## What is the purpose of the change

Expose key in `KeyedBroadcastProcessFunction#onTimer(OnTimerContext)`.

Hopefully this PR can make it into 1.5.0, the release in which 
`KeyedBroadcastProcessFunction` will be out. Otherwise, we need to think 
about compatibility and it will be much harder to expose the key in 
`KeyedBroadcastProcessFunction#onTimer()`.

## Brief change log

Expose key in `KeyedBroadcastProcessFunction#onTimer(OnTimerContext)`

## Verifying this change

This change is already covered by existing tests, such as 
*BroadcastStateITCase*.

## Does this pull request potentially affect one of the following parts:

  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/bowenli86/flink FLINK-8667

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5500.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5500


commit 2dcdedf072576e7e23fd220abf48c32c8355e5cc
Author: Bowen Li 
Date:   2018-02-15T20:37:44Z

[FLINK-8667] expose key in KeyedBroadcastProcessFunction#onTimer()




---


[jira] [Created] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-15 Thread Bowen Li (JIRA)
Bowen Li created FLINK-8667:
---

 Summary: expose key in KeyedBroadcastProcessFunction#onTimer()
 Key: FLINK-8667
 URL: https://issues.apache.org/jira/browse/FLINK-8667
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.5.0
Reporter: Bowen Li
Assignee: Bowen Li
 Fix For: 1.5.0


[~aljoscha] [~pnowojski]  Since KeyedBroadcastProcessFunction is about to get 
out of the door, I think it will be great to expose the timer's key in 
KeyedBroadcastProcessFunction too.







[jira] [Commented] (FLINK-8560) add KeyedProcessFunction to expose the key in onTimer() and other methods

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366201#comment-16366201
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@aljoscha @pnowojski  Guys, quick question. I'm about to develop 
`KeyedProcessFunction` and its operator in a keyed stream. But I found there's 
already a `KeyedProcessOperator` which is for `ProcessFunction` in a keyed 
stream. Shall I create a new operator named something like 
`KeyedProcessFunctionOperator`?

Thanks,
Bowen


> add KeyedProcessFunction to expose the key in onTimer() and other methods
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is not so good as you have to check in the processElement method for 
> every element if the key is already stored and set it if it's not already set.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a similar 
> method. Having it in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html





[GitHub] flink issue #5481: [FLINK-8560] Access to the current key in ProcessFunction...

2018-02-15 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5481
  
@aljoscha @pnowojski  Guys, quick question. I'm about to develop 
`KeyedProcessFunction` and its operator in a keyed stream. But I found there's 
already a `KeyedProcessOperator` which is for `ProcessFunction` in a keyed 
stream. Shall I create a new operator named something like 
`KeyedProcessFunctionOperator`?

Thanks,
Bowen


---


[jira] [Commented] (FLINK-8335) Upgrade hbase connector dependency to 1.4.0

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366194#comment-16366194
 ] 

ASF GitHub Bot commented on FLINK-8335:
---

Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5488#discussion_r168592172
  
--- Diff: flink-connectors/flink-hbase/pom.xml ---
@@ -34,7 +34,7 @@ under the License.
jar
 

-   1.3.1
+   1.4.0
--- End diff --

1.4.1?


> Upgrade hbase connector dependency to 1.4.0
> ---
>
> Key: FLINK-8335
> URL: https://issues.apache.org/jira/browse/FLINK-8335
> Project: Flink
>  Issue Type: Improvement
>  Components: Batch Connectors and Input/Output Formats
>Reporter: Ted Yu
>Priority: Minor
>
> hbase 1.4.1 has been released.
> 1.4.0 shows speed improvement over previous 1.x releases.
> http://search-hadoop.com/m/HBase/YGbbBxedD1Mnm8t?subj=Re+VOTE+The+second+HBase+1+4+0+release+candidate+RC1+is+available
> This issue is to upgrade the dependency to 1.4.1





[GitHub] flink pull request #5488: [FLINK-8335] [hbase] Upgrade hbase connector depen...

2018-02-15 Thread greghogan
Github user greghogan commented on a diff in the pull request:

https://github.com/apache/flink/pull/5488#discussion_r168592172
  
--- Diff: flink-connectors/flink-hbase/pom.xml ---
@@ -34,7 +34,7 @@ under the License.
jar
 

-   1.3.1
+   1.4.0
--- End diff --

1.4.1?


---


[jira] [Updated] (FLINK-8630) Add proper support for JSON formats

2018-02-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther updated FLINK-8630:

Summary: Add proper support for JSON formats  (was: To support JSON schema 
to TypeInformation conversion )

> Add proper support for JSON formats
> ---
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> To support (FLINK-8558), we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.
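
The configurable handling of the JSON Number type mentioned above might look like the following. This is only a plain-Java sketch of the option being discussed, not the code that was merged: `JsonNumberSketch`, `NumberMode`, and `parseNumber` are hypothetical names, and the sketch operates on a number literal rather than on a JSON schema or a `TypeInformation`.

```java
import java.math.BigDecimal;

/** Hypothetical sketch: one configured target type for every JSON number. */
final class JsonNumberSketch {

    /** The configurable choice mentioned in the issue: double or BigDecimal. */
    enum NumberMode { DOUBLE, BIG_DECIMAL }

    private JsonNumberSketch() {}

    /** Parses a JSON number literal into the single configured Java type. */
    static Number parseNumber(String literal, NumberMode mode) {
        switch (mode) {
            case DOUBLE:
                return Double.valueOf(literal);
            case BIG_DECIMAL:
                return new BigDecimal(literal);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        // DOUBLE is compact but binary floating point; BIG_DECIMAL keeps the decimal digits exactly.
        System.out.println(parseNumber("0.1", NumberMode.DOUBLE).getClass().getSimpleName());
        System.out.println(parseNumber("0.1", NumberMode.BIG_DECIMAL).getClass().getSimpleName());
    }
}
```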





[jira] [Commented] (FLINK-8630) Add proper support for JSON formats

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366144#comment-16366144
 ] 

ASF GitHub Bot commented on FLINK-8630:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5491


> Add proper support for JSON formats
> ---
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> To support (FLINK-8558), we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.





[jira] [Resolved] (FLINK-8630) Add proper support for JSON formats

2018-02-15 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8630.
-
Resolution: Fixed

Fixed in 1.5.0: 6c09c057846fecd456bfbe3dacabb4800370

> Add proper support for JSON formats
> ---
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> To support (FLINK-8558), we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.





[GitHub] flink pull request #5491: [FLINK-8630] [table] To support JSON schema to Typ...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5491


---


[jira] [Updated] (FLINK-8640) Disable japicmp on java 9

2018-02-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8640:
--
Description: 
The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
[https://github.com/siom79/japicmp/issues/177|https://github.com/siom79/japicmp/issues/177].

It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
of our maven build, hence we should disable the plugin.

  was:
The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
[https://github.com/siom79/japicmp/issues/177|https://github.com/siom79/japicmp/issues/177.]

It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
of our maven build, hence we should disable the plugin.


> Disable japicmp on java 9
> -
>
> Key: FLINK-8640
> URL: https://issues.apache.org/jira/browse/FLINK-8640
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
> [https://github.com/siom79/japicmp/issues/177|https://github.com/siom79/japicmp/issues/177].
> It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
> of our maven build, hence we should disable the plugin.





[jira] [Commented] (FLINK-8666) Use AkkaUtils#testDispatcherConfig in MiniCluster

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366117#comment-16366117
 ] 

ASF GitHub Bot commented on FLINK-8666:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5499

[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster

## What is the purpose of the change

Using the AkkaUtils#testDispatcherConfig reduces the number of started threads.
This effectively decreases the resource footprint of the MiniCluster.

This PR is based on #5498.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink nonBlockingShutdown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5499


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

commit 7911549b935ab91b5b762db621966bc0c252501c
Author: Till Rohrmann 
Date:   2018-02-15T16:58:35Z

[hotfix] Remove unused method 
MiniCluster#waitUntilTaskManagerRegistrationsComplete

commit c18d71bacd4194e7d6d4501226a2ed29f042ec5e
Author: Till Rohrmann 
Date:   2018-02-15T17:14:55Z

[hotfix] Don't fail LeaderContender and Listener when closing 
EmbeddedLeaderService

commit 090b4d7d5537df6300f7a552542f4a9d2a3c79ae
Author: Till Rohrmann 
Date:   2018-02-15T17:22:21Z

[hotfix] Fix checkstyle violations in RpcEndpoint

commit 81c57923052f15bf848d0fc803ec5ef2cb548398
Author: Till Rohrmann 
Date:   2018-02-15T18:19:48Z

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.

commit 2030818187fda4e5f97eb93c64cfb21dd53e
Author: Till Rohrmann 
Date:   2018-02-15T17:07:49Z

[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster

Using the AkkaUtils#testDispatcherConfig reduces the number of started threads.
This effectively decreases the resource footprint of the MiniCluster.
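
The ordering described in the FLINK-8665 commit message above (the termination future completes only after the future returned by postStop has completed) can be sketched in plain Java. This is an illustration under stated assumptions, not the actual `RpcEndpoint` code: `EndpointShutdownSketch`, its constructor argument, and `stop()` are hypothetical names, and the post-stop work is injected as a `Supplier` so the ordering is easy to observe.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

/** Hypothetical sketch (not Flink's RpcEndpoint): termination completes only after post-stop work. */
final class EndpointShutdownSketch {

    private final Supplier<CompletableFuture<Void>> postStopAction;
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    EndpointShutdownSketch(Supplier<CompletableFuture<Void>> postStopAction) {
        this.postStopAction = postStopAction;
    }

    /** Returns a future that completes once all post-stop actions have finished. */
    CompletableFuture<Void> postStop() {
        return postStopAction.get();
    }

    /** Triggers shutdown; the termination future is completed only after postStop() is done. */
    void stop() {
        postStop().whenComplete((ignored, failure) -> {
            if (failure != null) {
                terminationFuture.completeExceptionally(failure);
            } else {
                terminationFuture.complete(null);
            }
        });
    }

    CompletableFuture<Void> getTerminationFuture() {
        return terminationFuture;
    }

    public static void main(String[] args) {
        CompletableFuture<Void> cleanup = new CompletableFuture<>();
        EndpointShutdownSketch endpoint = new EndpointShutdownSketch(() -> cleanup);
        endpoint.stop();
        System.out.println("done before cleanup: " + endpoint.getTerminationFuture().isDone());
        cleanup.complete(null);
        System.out.println("done after cleanup: " + endpoint.getTerminationFuture().isDone());
    }
}
```

Callers that wait on the termination future therefore also wait for asynchronous cleanup, which is what makes a non-blocking shutdown safe to chain.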




> Use AkkaUtils#testDispatcherConfig in MiniCluster
> -
>
> Key: FLINK-8666
> URL: https://issues.apache.org/jira/browse/FLINK-8666
> Project: Flink
>  Issue Type: Improvement
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> By using the {{AkkaUtils#testDispatcherConfig}} for the {{AkkaRpcServices}} 
> used by the {{MiniCluster}}, we can drastically reduce the number of started 
> threads. This will improve the resource footprint of the {{MiniCluster}}.





[GitHub] flink pull request #5499: [FLINK-8666] [test] Use testDispatcherConfig in Mi...

2018-02-15 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5499

[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster

## What is the purpose of the change

Using the AkkaUtils#testDispatcherConfig reduces the number of started threads.
This effectively decreases the resource footprint of the MiniCluster.

This PR is based on #5498.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink nonBlockingShutdown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5499.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5499


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

commit 7911549b935ab91b5b762db621966bc0c252501c
Author: Till Rohrmann 
Date:   2018-02-15T16:58:35Z

[hotfix] Remove unused method 
MiniCluster#waitUntilTaskManagerRegistrationsComplete

commit c18d71bacd4194e7d6d4501226a2ed29f042ec5e
Author: Till Rohrmann 
Date:   2018-02-15T17:14:55Z

[hotfix] Don't fail LeaderContender and Listener when closing 
EmbeddedLeaderService

commit 090b4d7d5537df6300f7a552542f4a9d2a3c79ae
Author: Till Rohrmann 
Date:   2018-02-15T17:22:21Z

[hotfix] Fix checkstyle violations in RpcEndpoint

commit 81c57923052f15bf848d0fc803ec5ef2cb548398
Author: Till Rohrmann 
Date:   2018-02-15T18:19:48Z

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.

commit 2030818187fda4e5f97eb93c64cfb21dd53e
Author: Till Rohrmann 
Date:   2018-02-15T17:07:49Z

[FLINK-8666] [test] Use testDispatcherConfig in MiniCluster

Using AkkaUtils#testDispatcherConfig reduces the number of started threads,
which decreases the resource footprint of the MiniCluster.




---


[jira] [Created] (FLINK-8666) Use AkkaUtils#testDispatcherConfig in MiniCluster

2018-02-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8666:


 Summary: Use AkkaUtils#testDispatcherConfig in MiniCluster
 Key: FLINK-8666
 URL: https://issues.apache.org/jira/browse/FLINK-8666
 Project: Flink
  Issue Type: Improvement
  Components: Tests
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


By using the {{AkkaUtils#testDispatcherConfig}} for the {{AkkaRpcServices}} 
used by the {{MiniCluster}}, we can drastically reduce the number of started 
threads. This will improve the resource footprint of the {{MiniCluster}}.
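
The issue does not spell out what the test dispatcher configuration contains. As a rough sketch, assuming it works by putting tight bounds on the dispatcher's fork-join pool, the effect on thread count can be estimated with the usual clamp of `cores * factor` between a minimum and a maximum. The class name, method name, and all numeric bounds below are illustrative assumptions, not values read from Flink.

```java
public class DispatcherSizingSketch {

    /**
     * Clamps ceil(cores * factor) to the range [min, max]. This mirrors how a
     * fork-join executor configured with a minimum, a factor, and a maximum
     * parallelism is typically sized.
     */
    public static int scaledPoolSize(int min, double factor, int max, int cores) {
        int scaled = (int) Math.ceil(cores * factor);
        return Math.min(Math.max(scaled, min), max);
    }

    public static void main(String[] args) {
        int cores = 16; // illustrative machine size

        // Illustrative generous bounds (assumed, not taken from Flink).
        System.out.println("generous bounds: " + scaledPoolSize(8, 3.0, 64, cores));

        // Illustrative tight bounds of the kind a test configuration might use (assumed).
        System.out.println("tight bounds:    " + scaledPoolSize(2, 1.0, 4, cores));
    }
}
```

With several actor systems started per MiniCluster, a lower cap per dispatcher multiplies into a noticeably smaller total thread count.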





[jira] [Commented] (FLINK-8628) BucketingSink does not work with S3

2018-02-15 Thread dejan miljkovic (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366112#comment-16366112
 ] 

dejan miljkovic commented on FLINK-8628:


Sorry, I cannot reproduce the issue; I lost the pom.xml that was producing the 
problem.

I am still not able to write to S3, but I am now getting a different error. 
Interestingly, it works from IntelliJ but produces the error below when executed 
on a local cluster.

 
javax.xml.parsers.FactoryConfigurationError: Provider for class 
javax.xml.parsers.DocumentBuilderFactory cannot be created
at 
javax.xml.parsers.FactoryFinder.findServiceProvider(FactoryFinder.java:311)
at javax.xml.parsers.FactoryFinder.find(FactoryFinder.java:267)
at 
javax.xml.parsers.DocumentBuilderFactory.newInstance(DocumentBuilderFactory.java:120)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:2567)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:2543)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2426)
at 
org.apache.flink.fs.s3hadoop.shaded.org.apache.hadoop.conf.Configuration.get(Configuration.java:1240)
at 
org.apache.flink.fs.s3hadoop.S3FileSystemFactory.create(S3FileSystemFactory.java:98)
at 
org.apache.flink.core.fs.FileSystem.getUnguardedFileSystem(FileSystem.java:397)
at org.apache.flink.core.fs.FileSystem.get(FileSystem.java:320)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.createHadoopFileSystem(BucketingSink.java:1125)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initFileSystem(BucketingSink.java:411)
at 
org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:355)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
at 
org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
at 
org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:253)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
at java.lang.Thread.run(Thread.java:748)

> BucketingSink does not work with S3
> ---
>
> Key: FLINK-8628
> URL: https://issues.apache.org/jira/browse/FLINK-8628
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem, Streaming
>Affects Versions: 1.4.0
>Reporter: dejan miljkovic
>Priority: Blocker
> Fix For: 1.5.0
>
>
> BucketingSink does not work with S3. I followed the instructions provided at 
> [https://ci.apache.org/projects/flink/flink-docs-release-1.4/ops/deployment/aws.html]
> but got the exception below. Several people are complaining about the same issue.
> [http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCADAFrT9T6WQa25HXR1z1NaL=n8wP9s7aSXxZWxHy=hubo93...@mail.gmail.com%3E]
> [https://lists.apache.org/thread.html/%3CCADAFrT9T6WQa25HXR1z1NaL=n8wP9s7aSXxZWxHy=hubo93...@mail.gmail.com%3E]
> [http://mail-archives.apache.org/mod_mbox/flink-user/201801.mbox/%3CCADAFrT-i+vGe64e__=-dnu4pmpxhvyzvkfqzrhgxbeyhnwa...@mail.gmail.com%3E]
> I don't see any specific bug related to this.
>  
> java.lang.RuntimeException: Error while creating FileSystem when initializing 
> the state of the BucketingSink.
>   at 
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink.initializeState(BucketingSink.java:358)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.tryRestoreFunction(StreamingFunctionUtils.java:178)
>   at 
> org.apache.flink.streaming.util.functions.StreamingFunctionUtils.restoreFunctionState(StreamingFunctionUtils.java:160)
>   at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.initializeState(AbstractUdfStreamOperator.java:96)
>   at 
> org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:259)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeOperators(StreamTask.java:694)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.initializeState(StreamTask.java:682)
>   at 
> 

[jira] [Commented] (FLINK-8665) Allow RpcEndpoint#postStop to complete asynchronously

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8665?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366110#comment-16366110
 ] 

ASF GitHub Bot commented on FLINK-8665:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5498

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

## What is the purpose of the change

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.

This PR is based on #5496.

## Verifying this change

This change is already covered by existing tests, such as 
`AkkaRpcActorTest#testActorTerminationWithAsynchronousPostStopAction`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink postStopFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5498


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

commit 7911549b935ab91b5b762db621966bc0c252501c
Author: Till Rohrmann 
Date:   2018-02-15T16:58:35Z

[hotfix] Remove unused method 
MiniCluster#waitUntilTaskManagerRegistrationsComplete

commit c18d71bacd4194e7d6d4501226a2ed29f042ec5e
Author: Till Rohrmann 
Date:   2018-02-15T17:14:55Z

[hotfix] Don't fail LeaderContender and Listener when closing 
EmbeddedLeaderService

commit 090b4d7d5537df6300f7a552542f4a9d2a3c79ae
Author: Till Rohrmann 
Date:   2018-02-15T17:22:21Z

[hotfix] Fix checkstyle violations in RpcEndpoint

commit 81c57923052f15bf848d0fc803ec5ef2cb548398
Author: Till Rohrmann 
Date:   2018-02-15T18:19:48Z

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.




> Allow RpcEndpoint#postStop to complete asynchronously
> -
>
> Key: FLINK-8665
> URL: https://issues.apache.org/jira/browse/FLINK-8665
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Every {{RpcEndpoint}} should have the possibility to trigger asynchronous 
> clean up operations in its {{RpcEndpoint#postStop}} method. In order to do 
> that the {{postStop}} method should return a {{CompletableFuture}} 
> which is completed once all post stop actions have finished. The 
> {{RpcEndpoint#terminationFuture}} will only be completed afterwards.





[GitHub] flink pull request #5498: [FLINK-8665] [rest] Let RpcEndpoint#postStop retur...

2018-02-15 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5498

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

## What is the purpose of the change

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.

This PR is based on #5496.

## Verifying this change

This change is already covered by existing tests, such as 
`AkkaRpcActorTest#testActorTerminationWithAsynchronousPostStopAction`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink postStopFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5498.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5498


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

commit 7911549b935ab91b5b762db621966bc0c252501c
Author: Till Rohrmann 
Date:   2018-02-15T16:58:35Z

[hotfix] Remove unused method 
MiniCluster#waitUntilTaskManagerRegistrationsComplete

commit c18d71bacd4194e7d6d4501226a2ed29f042ec5e
Author: Till Rohrmann 
Date:   2018-02-15T17:14:55Z

[hotfix] Don't fail LeaderContender and Listener when closing 
EmbeddedLeaderService

commit 090b4d7d5537df6300f7a552542f4a9d2a3c79ae
Author: Till Rohrmann 
Date:   2018-02-15T17:22:21Z

[hotfix] Fix checkstyle violations in RpcEndpoint

commit 81c57923052f15bf848d0fc803ec5ef2cb548398
Author: Till Rohrmann 
Date:   2018-02-15T18:19:48Z

[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future

The RpcEndpoint#postStop method returns a CompletableFuture which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.




---


[GitHub] flink pull request #5497: Propose fixing some typos

2018-02-15 Thread jeis2497052
GitHub user jeis2497052 opened a pull request:

https://github.com/apache/flink/pull/5497

Propose fixing some typos

Just a doc change if this helps?


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jeis2497052/flink master

Alternatively you can review and apply these changes as the patch at:


[jira] [Commented] (FLINK-8630) To support JSON schema to TypeInformation conversion

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366101#comment-16366101
 ] 

ASF GitHub Bot commented on FLINK-8630:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5491
  
Merging...


> To support JSON schema to TypeInformation conversion 
> -
>
> Key: FLINK-8630
> URL: https://issues.apache.org/jira/browse/FLINK-8630
> Project: Flink
>  Issue Type: Sub-task
>  Components: Streaming Connectors
>Reporter: Xingcan Cui
>Assignee: Timo Walther
>Priority: Blocker
> Fix For: 1.5.0
>
>
> To support (FLINK-8558), we need to generate a {{TypeInformation}} from a 
> standard [JSON schema|http://json-schema.org/] (and maybe vice versa). There 
> are some problems to be discussed, e.g., how to handle the JSON {{Number}} type. 
> One option would be to always return a specific type for it, which can be 
> configured to be double or BigDecimal.
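
The configurable option mentioned for the JSON {{Number}} type could look roughly like the following. This is only a sketch of the idea; `JsonNumberSketch`, `NumberMode`, and `convert` are hypothetical names and do not correspond to anything in the pull request.

```java
import java.math.BigDecimal;

public class JsonNumberSketch {

    /** Hypothetical switch for the Java type that a JSON Number maps to. */
    public enum NumberMode { DOUBLE, BIG_DECIMAL }

    /** Converts the text of a JSON number literal according to the configured mode. */
    public static Object convert(String jsonNumberLiteral, NumberMode mode) {
        switch (mode) {
            case DOUBLE:
                // Compact and fast, but limited to binary floating-point precision.
                return Double.valueOf(jsonNumberLiteral);
            case BIG_DECIMAL:
                // Keeps every digit of the literal exactly as written.
                return new BigDecimal(jsonNumberLiteral);
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        String literal = "12345678901234567890.123456789";
        System.out.println(convert(literal, NumberMode.DOUBLE));
        System.out.println(convert(literal, NumberMode.BIG_DECIMAL));
    }
}
```

The trade-off is the usual one: double is cheaper, while BigDecimal avoids silently rounding literals that do not fit into 64 bits of floating point.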





[GitHub] flink issue #5491: [FLINK-8630] [table] To support JSON schema to TypeInform...

2018-02-15 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5491
  
Merging...


---


[jira] [Created] (FLINK-8665) Allow RpcEndpoint#postStop to complete asynchronously

2018-02-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8665:


 Summary: Allow RpcEndpoint#postStop to complete asynchronously
 Key: FLINK-8665
 URL: https://issues.apache.org/jira/browse/FLINK-8665
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


Every {{RpcEndpoint}} should have the possibility to trigger asynchronous clean 
up operations in its {{RpcEndpoint#postStop}} method. In order to do that the 
{{postStop}} method should return a {{CompletableFuture}} which is 
completed once all post stop actions have finished. The 
{{RpcEndpoint#terminationFuture}} will only be completed afterwards.
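
A minimal sketch of the behaviour described above, using only the JDK. The `Endpoint` class here is a simplified, hypothetical stand-in and not Flink's {{RpcEndpoint}}; the method names `shutDown` and `getTerminationFuture` are assumptions made for the illustration.

```java
import java.util.concurrent.CompletableFuture;

public class PostStopSketch {

    /** Hypothetical, simplified endpoint; not Flink's RpcEndpoint. */
    public abstract static class Endpoint {

        private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

        /** Returns a future that completes once all clean-up actions have finished. */
        protected abstract CompletableFuture<Void> postStop();

        /** Triggers postStop and completes the termination future only afterwards. */
        public final void shutDown() {
            postStop().whenComplete((ignored, failure) -> {
                if (failure != null) {
                    terminationFuture.completeExceptionally(failure);
                } else {
                    terminationFuture.complete(null);
                }
            });
        }

        public CompletableFuture<Void> getTerminationFuture() {
            return terminationFuture;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Void> cleanUp = new CompletableFuture<>();
        Endpoint endpoint = new Endpoint() {
            @Override
            protected CompletableFuture<Void> postStop() {
                return cleanUp; // stands in for asynchronous clean-up work
            }
        };

        endpoint.shutDown();
        System.out.println("terminated before clean-up: " + endpoint.getTerminationFuture().isDone());
        cleanUp.complete(null);
        System.out.println("terminated after clean-up:  " + endpoint.getTerminationFuture().isDone());
    }
}
```

Callers that wait on the termination future therefore observe the endpoint as terminated only after its asynchronous clean-up has finished, and a failed clean-up is propagated instead of being lost.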





[jira] [Commented] (FLINK-8664) Change termination future type of RpcEndpoint to Void

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8664?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366087#comment-16366087
 ] 

ASF GitHub Bot commented on FLINK-8664:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5496

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

## What is the purpose of the change

Change `RpcEndpoint#TerminationFuture` value type to `Void`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink voidTerminationFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5496


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void




> Change termination future type of RpcEndpoint to Void
> -
>
> Key: FLINK-8664
> URL: https://issues.apache.org/jira/browse/FLINK-8664
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to align the termination futures of {{RpcService}} and 
> {{RpcEndpoint}} we should change the future value type of the 
> {{RpcEndpoint#getTerminationFuture()}} to {{Void}}.





[GitHub] flink pull request #5496: [FLINK-8664] [rest] Change RpcEndpoint#Termination...

2018-02-15 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5496

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void

## What is the purpose of the change

Change `RpcEndpoint#TerminationFuture` value type to `Void`.

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink voidTerminationFuture

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5496.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5496


commit 1a077c7bb7d6240e86171dbbbd77e43668a7fe6c
Author: Till Rohrmann 
Date:   2018-02-15T17:43:39Z

[FLINK-8664] [rest] Change RpcEndpoint#TerminationFuture value type to Void




---


[jira] [Created] (FLINK-8664) Change termination future type of RpcEndpoint to Void

2018-02-15 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-8664:


 Summary: Change termination future type of RpcEndpoint to Void
 Key: FLINK-8664
 URL: https://issues.apache.org/jira/browse/FLINK-8664
 Project: Flink
  Issue Type: Sub-task
  Components: Distributed Coordination
Affects Versions: 1.5.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
 Fix For: 1.5.0


In order to align the termination futures of {{RpcService}} and {{RpcEndpoint}} 
we should change the future value type of the 
{{RpcEndpoint#getTerminationFuture()}} to {{Void}}.





[jira] [Updated] (FLINK-8516) Allow custom shard-to-subtask assignment for the FlinkKinesisConsumer

2018-02-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8516:
---
Summary: Allow custom shard-to-subtask assignment for the 
FlinkKinesisConsumer  (was: FlinkKinesisConsumer does not balance shards over 
subtasks)

> Allow custom shard-to-subtask assignment for the FlinkKinesisConsumer
> -
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.
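
The skew described above can be reproduced with a few lines of plain Java. This is a simplified model under stated assumptions: integers stand in for the shards' hash codes, assignment is hash modulo parallelism, and the class and method names are hypothetical rather than the connector's actual code.

```java
import java.util.Arrays;

public class ShardAssignmentSketch {

    /** Assumed default behaviour: the shard's hash modulo the number of subtasks. */
    public static int assign(int shardHash, int parallelism) {
        return Math.floorMod(shardHash, parallelism);
    }

    /** Counts how many shards end up on each subtask. */
    public static int[] countPerSubtask(int[] shardHashes, int parallelism) {
        int[] counts = new int[parallelism];
        for (int hash : shardHashes) {
            counts[assign(hash, parallelism)]++;
        }
        return counts;
    }

    public static void main(String[] args) {
        int parallelism = 4;

        // Sequential identifiers spread evenly across the subtasks.
        int[] sequential = {0, 1, 2, 3, 4, 5, 6, 7};
        System.out.println(Arrays.toString(countPerSubtask(sequential, parallelism)));

        // Illustrative non-sequential identifiers, as might remain after resharding.
        int[] afterResharding = {0, 4, 8, 12, 16, 20, 24, 28};
        System.out.println(Arrays.toString(countPerSubtask(afterResharding, parallelism)));
    }
}
```

In the second case every shard lands on subtask 0, which is the kind of imbalance that a pluggable shard-to-subtask assignment lets users correct.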





[jira] [Resolved] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai resolved FLINK-8516.

   Resolution: Fixed
Fix Version/s: 1.5.0

Thanks for your contribution [~thw]!

Merged to master via 942649e933f3106d923c567c705d75fc9aa0a40c.

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.





[jira] [Updated] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8516:
---
Affects Version/s: (was: 1.5.0)
   (was: 1.3.2)
   (was: 1.4.0)

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.





[jira] [Commented] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366056#comment-16366056
 ] 

ASF GitHub Bot commented on FLINK-8516:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5393


> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.





[jira] [Updated] (FLINK-8516) FlinkKinesisConsumer does not balance shards over subtasks

2018-02-15 Thread Tzu-Li (Gordon) Tai (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8516?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tzu-Li (Gordon) Tai updated FLINK-8516:
---
Issue Type: New Feature  (was: Bug)

> FlinkKinesisConsumer does not balance shards over subtasks
> --
>
> Key: FLINK-8516
> URL: https://issues.apache.org/jira/browse/FLINK-8516
> Project: Flink
>  Issue Type: New Feature
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Major
> Fix For: 1.5.0
>
>
> The hash code of the shard is used to distribute discovered shards over 
> subtasks round robin. This works as long as shard identifiers are sequential. 
> After shards are rebalanced in Kinesis, that may no longer be the case and 
> the distribution becomes skewed.





[GitHub] flink pull request #5393: [FLINK-8516] Allow for custom hash function for sh...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5393


---


[GitHub] flink issue #5480: [FLINK-8648] [kinesis] Allow for customization of emitRec...

2018-02-15 Thread tzulitai
Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5480
  
Just curious, before I proceed to merge this:
could you briefly describe what your custom override would consist of?
I'm asking because we may be able to find a cleaner solution (or not 😅). 
The method was made final, because that part is critical for exactly-once 
guarantees.

It would also be interesting to get to know what you are trying to achieve 
here.
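
To illustrate the concern about the final method: emitting a record and advancing the stored position have to happen together, otherwise a checkpoint could capture one without the other. The sketch below shows one possible shape, a final method with a separate hook, purely as an assumption for discussion. `Fetcher`, `onRecordEmitted`, and the fields are hypothetical and are not the Kinesis connector's actual classes or the change proposed in the pull request.

```java
import java.util.ArrayList;
import java.util.List;

public class EmitHookSketch {

    /** Hypothetical, simplified fetcher; not the Kinesis connector's class. */
    public static class Fetcher {

        private final Object checkpointLock = new Object();
        public final List<String> emitted = new ArrayList<>();
        public long lastSequence = -1L;

        /** Final so that a subclass cannot separate emitting from the state update. */
        public final void emitRecordAndUpdateState(String record, long sequence) {
            synchronized (checkpointLock) {
                emitted.add(record);      // emit downstream
                lastSequence = sequence;  // advance the position that would be checkpointed
                onRecordEmitted(record, sequence);
            }
        }

        /** Hypothetical extension point, e.g. for tracking custom watermarks. */
        protected void onRecordEmitted(String record, long sequence) {
            // no-op by default
        }
    }

    public static void main(String[] args) {
        final long[] maxSeen = {Long.MIN_VALUE};
        Fetcher fetcher = new Fetcher() {
            @Override
            protected void onRecordEmitted(String record, long sequence) {
                maxSeen[0] = Math.max(maxSeen[0], sequence);
            }
        };
        fetcher.emitRecordAndUpdateState("a", 1L);
        fetcher.emitRecordAndUpdateState("b", 2L);
        System.out.println(fetcher.emitted + " lastSequence=" + fetcher.lastSequence + " maxSeen=" + maxSeen[0]);
    }
}
```

Whether a hook of this kind would cover the intended use case depends on the answer to the question above.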


---


[jira] [Commented] (FLINK-8648) Allow for customization of emitRecordAndUpdateState in Kinesis connector

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8648?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16366052#comment-16366052
 ] 

ASF GitHub Bot commented on FLINK-8648:
---

Github user tzulitai commented on the issue:

https://github.com/apache/flink/pull/5480
  
Just curious, before I proceed to merge this:
could you briefly describe what your custom override would consist of?
I'm asking because we may be able to find a cleaner solution (or not 😅). 
The method was made final, because that part is critical for exactly-once 
guarantees.

It would also be interesting to get to know what you are trying to achieve 
here.


> Allow for customization of emitRecordAndUpdateState in Kinesis connector
> 
>
> Key: FLINK-8648
> URL: https://issues.apache.org/jira/browse/FLINK-8648
> Project: Flink
>  Issue Type: Task
>  Components: Kinesis Connector
>Reporter: Thomas Weise
>Assignee: Thomas Weise
>Priority: Minor
>
> It should be possible to override the method to intercept the emit behavior, 
> in this case for the purpose of custom watermark support.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8640) Disable japicmp on java 9

2018-02-15 Thread Greg Hogan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8640?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Greg Hogan updated FLINK-8640:
--
Description: 
The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
[https://github.com/siom79/japicmp/issues/177|https://github.com/siom79/japicmp/issues/177.]

It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
of our maven build, hence we should disable the plugin.

  was:
The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
[https://github.com/siom79/japicmp/issues/177.]

It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
of our maven build, hence we should disable the plugin.


> Disable japicmp on java 9
> -
>
> Key: FLINK-8640
> URL: https://issues.apache.org/jira/browse/FLINK-8640
> Project: Flink
>  Issue Type: Sub-task
>  Components: Build System
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
>
> The {{japicmp}} plugin does not work out-of-the-box with java 9 as per 
> [https://github.com/siom79/japicmp/issues/177|https://github.com/siom79/japicmp/issues/177.]
> It is necessary to modify MAVEN_OPTS which we cannot automatically do as part 
> of our maven build, hence we should disable the plugin.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365958#comment-16365958
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168545164
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Disposable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowingSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This class implements the logic that creates (and potentially restores) 
a state backend. The restore logic
+ * considers multiple, prioritized options of snapshots to restore from, 
where all of the options should recreate
+ * the same state for the backend. When we fail to restore from the 
snapshot with the highest priority (typically
+ * the "fastest" to restore), we fallback to the next snapshot with the 
next highest priority. We also take care
+ * of cleaning up from failed restore attempts. We only reattempt when the 
problem occurs during the restore call
+ * and will only stop after all snapshot alternatives are exhausted and 
all failed.
+ *
+ * @param <T> type of the restored backend.
+ * @param <S> type of the supplied snapshots from which the backend 
restores.
+ */
+public class BackendRestorerProcedure<
+   T extends Closeable & Disposable & Snapshotable>,
+   S extends StateObject> {
+
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(BackendRestorerProcedure.class);
+
+   /** Factory for new, fresh backends without state. */
+   private final ThrowingSupplier instanceSupplier;
+
+   /** This registry is used so that recovery can participate in the task 
lifecycle, i.e. can be canceled. */
+   private final CloseableRegistry backendCloseableRegistry;
+
+   /**
+* Creates a new backend restorer using the given backend supplier and 
the closeable registry.
+*
+* @param instanceSupplier factory function for new, empty backend 
instances.
+* @param backendCloseableRegistry registry to allow participation in 
task lifecycle, e.g. react to cancel.
+*/
+   public BackendRestorerProcedure(
+   @Nonnull ThrowingSupplier instanceSupplier,
+   @Nonnull CloseableRegistry backendCloseableRegistry) {
+
+   this.instanceSupplier = 
Preconditions.checkNotNull(instanceSupplier);
+   this.backendCloseableRegistry = 
Preconditions.checkNotNull(backendCloseableRegistry);
+   }
+
+   /**
+* Creates a new state backend and restores it from the provided set of 
state snapshot alternatives.
+*
+* @param restoreOptions iterator over a prioritized set of state 
snapshot alternatives for recovery.
+* @return the created (and restored) state backend.
+* @throws Exception if the backend could not be created or restored.
+*/
+   public @Nonnull T createAndRestore(@Nonnull 
Iterator restoreOptions) throws Exception {
+
+   StateObjectCollection restoreState = null;
+
+   boolean retry;
+
+   while (true) {
--- 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168545164
  
--- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/BackendRestorerProcedure.java
 ---
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators;
+
+import org.apache.flink.core.fs.CloseableRegistry;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.state.Snapshotable;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.Disposable;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.ThrowingSupplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnull;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Iterator;
+
+/**
+ * This class implements the logic that creates (and potentially restores) 
a state backend. The restore logic
+ * considers multiple, prioritized options of snapshots to restore from, 
where all of the options should recreate
+ * the same state for the backend. When we fail to restore from the 
snapshot with the highest priority (typically
+ * the "fastest" to restore), we fallback to the next snapshot with the 
next highest priority. We also take care
+ * of cleaning up from failed restore attempts. We only reattempt when the 
problem occurs during the restore call
+ * and will only stop after all snapshot alternatives are exhausted and 
all failed.
+ *
+ * @param <T> type of the restored backend.
+ * @param <S> type of the supplied snapshots from which the backend 
restores.
+ */
+public class BackendRestorerProcedure<
+   T extends Closeable & Disposable & Snapshotable>,
+   S extends StateObject> {
+
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(BackendRestorerProcedure.class);
+
+   /** Factory for new, fresh backends without state. */
+   private final ThrowingSupplier instanceSupplier;
+
+   /** This registry is used so that recovery can participate in the task 
lifecycle, i.e. can be canceled. */
+   private final CloseableRegistry backendCloseableRegistry;
+
+   /**
+* Creates a new backend restorer using the given backend supplier and 
the closeable registry.
+*
+* @param instanceSupplier factory function for new, empty backend 
instances.
+* @param backendCloseableRegistry registry to allow participation in 
task lifecycle, e.g. react to cancel.
+*/
+   public BackendRestorerProcedure(
+   @Nonnull ThrowingSupplier instanceSupplier,
+   @Nonnull CloseableRegistry backendCloseableRegistry) {
+
+   this.instanceSupplier = 
Preconditions.checkNotNull(instanceSupplier);
+   this.backendCloseableRegistry = 
Preconditions.checkNotNull(backendCloseableRegistry);
+   }
+
+   /**
+* Creates a new state backend and restores it from the provided set of 
state snapshot alternatives.
+*
+* @param restoreOptions iterator over a prioritized set of state 
snapshot alternatives for recovery.
+* @return the created (and restored) state backend.
+* @throws Exception if the backend could not be created or restored.
+*/
+   public @Nonnull T createAndRestore(@Nonnull 
Iterator restoreOptions) throws Exception {
+
+   StateObjectCollection restoreState = null;
+
+   boolean retry;
+
+   while (true) {
--- End diff --

It is not quite that easy, because the restore method must always be called 
at least once and also the log message might differ between iterations (the 
"will retry" part). But I agree it could be simplified.
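
For readers following the thread, the constraint mentioned here (restore is attempted at least once, then falls back through the prioritized alternatives) can be sketched roughly as below. This is not the code from the pull request: the Restorable interface, RecordingBackend and the unchecked exception at the end are hypothetical simplifications, and logging is left out.

```java
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.function.Supplier;

class PrioritizedRestoreSketch {

    /** Hypothetical stand-in for a restorable backend; not a Flink interface. */
    interface Restorable<S> {
        /** Restores from the given snapshot; null means "start empty". */
        void restore(S snapshot) throws Exception;

        void dispose();
    }

    /** Tries the alternatives in priority order, always calling restore at least once. */
    static <S, T extends Restorable<S>> T createAndRestore(
            Supplier<T> factory, Iterator<S> alternatives) {
        Exception lastFailure = null;
        do {
            // With no alternatives at all, restore is still called once with null.
            S snapshot = alternatives.hasNext() ? alternatives.next() : null;
            T backend = factory.get();
            try {
                backend.restore(snapshot);
                return backend;
            } catch (Exception e) {
                backend.dispose(); // clean up the failed attempt before retrying
                lastFailure = e;
            }
        } while (alternatives.hasNext());
        throw new IllegalStateException("All restore alternatives failed", lastFailure);
    }

    /** Test double that fails for one snapshot name and records disposals. */
    static final class RecordingBackend implements Restorable<String> {
        final String failOn;
        final List<String> log;
        String restoredFrom;

        RecordingBackend(String failOn, List<String> log) {
            this.failOn = failOn;
            this.log = log;
        }

        @Override
        public void restore(String snapshot) throws Exception {
            if (failOn.equals(snapshot)) {
                throw new IOException("cannot read " + snapshot);
            }
            restoredFrom = snapshot;
        }

        @Override
        public void dispose() {
            log.add("disposed");
        }
    }
}
```

The do-while form is one way to guarantee the first restore call without a separate retry flag; whether it fits the different log messages per iteration is a separate question.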


---


[jira] [Commented] (FLINK-8653) Remove slot request timeout from SlotPool

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365896#comment-16365896
 ] 

ASF GitHub Bot commented on FLINK-8653:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5483
  
Rebased onto the soon to be master. Merging once Travis gives green light.


> Remove slot request timeout from SlotPool
> -
>
> Key: FLINK-8653
> URL: https://issues.apache.org/jira/browse/FLINK-8653
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After addressing FLINK-8643, we can further simplify the {{SlotPool}} by 
> replacing the internal slot request timeout by the timeout given to 
> {{SlotPool#allocateSlot}}. Since this request will time out on the 
> {{ProviderAndOwner}} side anyway, we should do the same on the {{SlotPool}} 
> side.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5483: [FLINK-8653] [flip6] Remove internal slot request timeout...

2018-02-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5483
  
Rebased onto the soon to be master. Merging once Travis gives green light.


---


[jira] [Commented] (FLINK-8614) Enable Flip-6 per default

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365889#comment-16365889
 ] 

ASF GitHub Bot commented on FLINK-8614:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5437
  
Thanks for the review @GJL. Merging once Travis gives green light.


> Enable Flip-6 per default
> -
>
> Key: FLINK-8614
> URL: https://issues.apache.org/jira/browse/FLINK-8614
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding the FLINK-8471, the next step is to enable Flip-6 per default by 
> setting the configuration switch to {{flip6}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5437: [FLINK-8614] [flip6] Activate Flip-6 mode per default

2018-02-15 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5437
  
Thanks for the review @GJL. Merging once Travis gives green light.


---


[jira] [Commented] (FLINK-8610) Remove RestfulGateway from JobMasterGateway

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365875#comment-16365875
 ] 

ASF GitHub Bot commented on FLINK-8610:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5433


> Remove RestfulGateway from JobMasterGateway
> ---
>
> Key: FLINK-8610
> URL: https://issues.apache.org/jira/browse/FLINK-8610
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the 
> {{RestfulGateway}}. Therefore, we should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8611) Add result future to JobManagerRunner

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365876#comment-16365876
 ] 

ASF GitHub Bot commented on FLINK-8611:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5434


> Add result future to JobManagerRunner
> -
>
> Key: FLINK-8611
> URL: https://issues.apache.org/jira/browse/FLINK-8611
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Adding a {{CompletableFuture}} result future to the 
> {{JobManagerRunner}} will allow returning a {{JobResult}} future for a still 
> running job. This is helpful for the implementation of a non-detached job 
> mode.
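
A rough sketch of the idea in the description, assuming nothing about the actual JobManagerRunner beyond what is stated: the runner owns a CompletableFuture that callers can obtain while the job is still running and that is completed when the job terminates. The class and method names are hypothetical, and a plain String stands in for the job result type.

```java
import java.util.concurrent.CompletableFuture;

class ResultFutureSketch {

    /** Hypothetical runner that exposes its eventual result as a future. */
    static final class JobRunner {
        private final CompletableFuture<String> resultFuture = new CompletableFuture<>();

        /** Can be handed out immediately, before the job has finished. */
        CompletableFuture<String> getResultFuture() {
            return resultFuture;
        }

        /** Called by the runner once the job reaches a terminal state. */
        void jobFinished(String result) {
            resultFuture.complete(result);
        }

        /** Called by the runner if the job fails unrecoverably. */
        void jobFailed(Throwable cause) {
            resultFuture.completeExceptionally(cause);
        }
    }
}
```

A client in non-detached mode could then block on, or chain callbacks to, the returned future instead of polling for the job status.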



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8647) Introduce JobMasterConfiguration

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8647?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365874#comment-16365874
 ] 

ASF GitHub Bot commented on FLINK-8647:
---

Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/5478


> Introduce JobMasterConfiguration
> 
>
> Key: FLINK-8647
> URL: https://issues.apache.org/jira/browse/FLINK-8647
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{JobMaster}} already contains some configuration settings which we pass 
> as constructor arguments. In order to make it more maintainable, I suggest 
> to add a {{JobMasterConfiguration}} object similar to the 
> {{TaskManagerConfiguration}}. This object will contain all {{JobMaster}} 
> specific configuration settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5434: [FLINK-8611] [flip6] Add result future to JobManag...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5434


---


[GitHub] flink pull request #5478: [FLINK-8647] [flip6] Introduce JobMasterConfigurat...

2018-02-15 Thread tillrohrmann
Github user tillrohrmann closed the pull request at:

https://github.com/apache/flink/pull/5478


---


[GitHub] flink pull request #5433: [FLINK-8610] [flip6] Remove RestfulGateway from Jo...

2018-02-15 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5433


---


[jira] [Closed] (FLINK-8647) Introduce JobMasterConfiguration

2018-02-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8647?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8647.

Resolution: Fixed

Fixed via 0f5c96bf16aee78b025b1bdecd7377f2681311d9

> Introduce JobMasterConfiguration
> 
>
> Key: FLINK-8647
> URL: https://issues.apache.org/jira/browse/FLINK-8647
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> The {{JobMaster}} already contains some configuration settings which we pass 
> as constructor arguments. In order to make it more maintainable, I suggest 
> to add a {{JobMasterConfiguration}} object similar to the 
> {{TaskManagerConfiguration}}. This object will contain all {{JobMaster}} 
> specific configuration settings.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8611) Add result future to JobManagerRunner

2018-02-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8611.

Resolution: Fixed

Fixed via 35ee062eab984c65ede3511e66ff13c2bca43770

> Add result future to JobManagerRunner
> -
>
> Key: FLINK-8611
> URL: https://issues.apache.org/jira/browse/FLINK-8611
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Adding a {{CompletableFuture}} result future to the 
> {{JobManagerRunner}} will allow returning a {{JobResult}} future for a still 
> running job. This is helpful for the implementation of a non-detached job 
> mode.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8610) Remove RestfulGateway from JobMasterGateway

2018-02-15 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8610.

Resolution: Fixed

Fixed via a621e2f1308e120bcb149e9877b781062e59842b

> Remove RestfulGateway from JobMasterGateway
> ---
>
> Key: FLINK-8610
> URL: https://issues.apache.org/jira/browse/FLINK-8610
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding FLINK-8608, the {{JobMaster}} no longer needs to implement the 
> {{RestfulGateway}}. Therefore, we should remove it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365848#comment-16365848
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168530422
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List tmStateCollection = null;
+
+   if (jmTaskStateSnapshot != null) {
+   jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   }
+
+   if (tmTaskStateSnapshot != null) {
+   OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   if (tmOpState != null) {
+   tmStateCollection = 
Collections.singletonList(tmOpState);
+   }
+   }
+
+   if (jmOpState == null) {
+   jmOpState = new OperatorSubtaskState();
+   }
+
+   if (tmStateCollection == null) {
+   tmStateCollection = Collections.emptyList();
+   }
--- End diff --

 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168530422
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List tmStateCollection = null;
+
+   if (jmTaskStateSnapshot != null) {
+   jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   }
+
+   if (tmTaskStateSnapshot != null) {
+   OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   if (tmOpState != null) {
+   tmStateCollection = 
Collections.singletonList(tmOpState);
+   }
+   }
+
+   if (jmOpState == null) {
+   jmOpState = new OperatorSubtaskState();
+   }
+
+   if (tmStateCollection == null) {
+   tmStateCollection = Collections.emptyList();
+   }
--- End diff --

👍 


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365846#comment-16365846
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168529743
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List tmStateCollection = null;
+
+   if (jmTaskStateSnapshot != null) {
+   jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   }
+
+   if (tmTaskStateSnapshot != null) {
+   OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   if (tmOpState != null) {
+   tmStateCollection = 
Collections.singletonList(tmOpState);
+   }
+   }
+
+   if (jmOpState == null) {
+   jmOpState = new OperatorSubtaskState();
+   }
--- End diff --

Changed in later commit.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168529743
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List tmStateCollection = null;
+
+   if (jmTaskStateSnapshot != null) {
+   jmOpState = 
jmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   }
+
+   if (tmTaskStateSnapshot != null) {
+   OperatorSubtaskState tmOpState = 
tmTaskStateSnapshot.getSubtaskStateByOperatorID(operatorID);
+   if (tmOpState != null) {
+   tmStateCollection = 
Collections.singletonList(tmOpState);
+   }
+   }
+
+   if (jmOpState == null) {
+   jmOpState = new OperatorSubtaskState();
+   }
--- End diff --

Changed in later commit.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365842#comment-16365842
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168529359
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List tmStateCollection = null;
--- End diff --

Changed in later commit.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168529359
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TestTaskStateManager.java
 ---
@@ -118,12 +121,38 @@ public void reportTaskStateSnapshots(
}
}
 
+   @Nonnull
@Override
-   public OperatorSubtaskState operatorStates(OperatorID operatorID) {
-   TaskStateSnapshot taskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
-   return taskStateSnapshot != null ? 
taskStateSnapshot.getSubtaskStateByOperatorID(operatorID) : null;
+   public PrioritizedOperatorSubtaskState 
prioritizedOperatorState(OperatorID operatorID) {
+   TaskStateSnapshot jmTaskStateSnapshot = 
getLastJobManagerTaskStateSnapshot();
+   TaskStateSnapshot tmTaskStateSnapshot = 
getLastTaskManagerTaskStateSnapshot();
+
+   OperatorSubtaskState jmOpState = null;
+   List<OperatorSubtaskState> tmStateCollection = null;
--- End diff --

Changed in later commit.


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365834#comment-16365834
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528716
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -109,10 +111,10 @@ public void testStateReportingAndRetrieving() {
executionAttemptID,
checkpointResponderMock,
taskRestore);
-
-   Assert.assertTrue(jmOperatorSubtaskState_1 == 
taskStateManager.operatorStates(operatorID_1));
-   Assert.assertTrue(jmOperatorSubtaskState_2 == 
taskStateManager.operatorStates(operatorID_2));
-   
Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
+//TODO
+// Assert.assertTrue(jmOperatorSubtaskState_1 == 
taskStateManager.prioritizedOperatorState(operatorID_1));
+// Assert.assertTrue(jmOperatorSubtaskState_2 == 
taskStateManager.prioritizedOperatorState(operatorID_2));
--- End diff --

Fixed in later commit.




[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365831#comment-16365831
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528536
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
private final JobID jobID;
 
-   /** */
+   /** JobVertexID of the owning subtask. */
private final JobVertexID jobVertexID;
 
-   /** */
+   /** Subtask index of the owning subtask. */
private final int subtaskIndex;
 
-   /** */
-   private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
-
/** The root directories for all local state of this {@link 
TaskLocalStateStore}. */
private final File[] localStateRootDirectories;
 
+   /** Executor that runs the discarding of released state objects. */
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @GuardedBy("lock")
+   private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+
public TaskLocalStateStore(
-   JobID jobID,
-   JobVertexID jobVertexID,
-   int subtaskIndex,
-   File[] localStateRootDirectories) {
+   @Nonnull JobID jobID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull File[] localStateRootDirectories,
+   @Nonnull Executor discardExecutor) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
-   this.storedTaskStateByCheckpointID = new HashMap<>();
this.localStateRootDirectories = localStateRootDirectories;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
}
 
+   @Nonnull
protected String createSubtaskPath() {
return jobID + File.separator + jobVertexID + File.separator + 
subtaskIndex;
}
 
+   /**
+* Stores the local state for the given checkpoint id.
+*
+* @param checkpointId id for the checkpoint that created the local 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528716
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -109,10 +111,10 @@ public void testStateReportingAndRetrieving() {
executionAttemptID,
checkpointResponderMock,
taskRestore);
-
-   Assert.assertTrue(jmOperatorSubtaskState_1 == 
taskStateManager.operatorStates(operatorID_1));
-   Assert.assertTrue(jmOperatorSubtaskState_2 == 
taskStateManager.operatorStates(operatorID_2));
-   
Assert.assertNull(taskStateManager.operatorStates(operatorID_3));
+//TODO
+// Assert.assertTrue(jmOperatorSubtaskState_1 == 
taskStateManager.prioritizedOperatorState(operatorID_1));
+// Assert.assertTrue(jmOperatorSubtaskState_2 == 
taskStateManager.prioritizedOperatorState(operatorID_2));
--- End diff --

Fixed in later commit.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528536
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
private final JobID jobID;
 
-   /** */
+   /** JobVertexID of the owning subtask. */
private final JobVertexID jobVertexID;
 
-   /** */
+   /** Subtask index of the owning subtask. */
private final int subtaskIndex;
 
-   /** */
-   private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
-
/** The root directories for all local state of this {@link 
TaskLocalStateStore}. */
private final File[] localStateRootDirectories;
 
+   /** Executor that runs the discarding of released state objects. */
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @GuardedBy("lock")
+   private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+
public TaskLocalStateStore(
-   JobID jobID,
-   JobVertexID jobVertexID,
-   int subtaskIndex,
-   File[] localStateRootDirectories) {
+   @Nonnull JobID jobID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull File[] localStateRootDirectories,
+   @Nonnull Executor discardExecutor) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
-   this.storedTaskStateByCheckpointID = new HashMap<>();
this.localStateRootDirectories = localStateRootDirectories;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
}
 
+   @Nonnull
protected String createSubtaskPath() {
return jobID + File.separator + jobVertexID + File.separator + 
subtaskIndex;
}
 
+   /**
+* Stores the local state for the given checkpoint id.
+*
+* @param checkpointId id for the checkpoint that created the local 
state that will be stored.
+* @param localState the local state to store.
+*/
public void storeLocalState(
-   @Nonnull CheckpointMetaData checkpointMetaData,
+   @Nonnegative long 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365829#comment-16365829
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528301
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
private final JobID jobID;
 
-   /** */
+   /** JobVertexID of the owning subtask. */
private final JobVertexID jobVertexID;
 
-   /** */
+   /** Subtask index of the owning subtask. */
private final int subtaskIndex;
 
-   /** */
-   private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
-
/** The root directories for all local state of this {@link 
TaskLocalStateStore}. */
private final File[] localStateRootDirectories;
 
+   /** Executor that runs the discarding of released state objects. */
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @GuardedBy("lock")
+   private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+
public TaskLocalStateStore(
-   JobID jobID,
-   JobVertexID jobVertexID,
-   int subtaskIndex,
-   File[] localStateRootDirectories) {
+   @Nonnull JobID jobID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull File[] localStateRootDirectories,
+   @Nonnull Executor discardExecutor) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
-   this.storedTaskStateByCheckpointID = new HashMap<>();
this.localStateRootDirectories = localStateRootDirectories;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
}
 
+   @Nonnull
protected String createSubtaskPath() {
return jobID + File.separator + jobVertexID + File.separator + 
subtaskIndex;
}
 
+   /**
+* Stores the local state for the given checkpoint id.
+*
+* @param checkpointId id for the checkpoint that created the local 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168528301
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
+
+   /** JobID from the owning subtask. */
private final JobID jobID;
 
-   /** */
+   /** JobVertexID of the owning subtask. */
private final JobVertexID jobVertexID;
 
-   /** */
+   /** Subtask index of the owning subtask. */
private final int subtaskIndex;
 
-   /** */
-   private final Map<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
-
/** The root directories for all local state of this {@link 
TaskLocalStateStore}. */
private final File[] localStateRootDirectories;
 
+   /** Executor that runs the discarding of released state objects. */
+   private final Executor discardExecutor;
+
+   /** Lock for synchronisation on the storage map and the discarded 
status. */
+   private final Object lock;
+
+   /** Status flag if this store was already discarded. */
+   @GuardedBy("lock")
+   private boolean discarded;
+
+   /** Maps checkpoint ids to local TaskStateSnapshots. */
+   @GuardedBy("lock")
+   private final SortedMap<Long, TaskStateSnapshot> 
storedTaskStateByCheckpointID;
+
public TaskLocalStateStore(
-   JobID jobID,
-   JobVertexID jobVertexID,
-   int subtaskIndex,
-   File[] localStateRootDirectories) {
+   @Nonnull JobID jobID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex,
+   @Nonnull File[] localStateRootDirectories,
+   @Nonnull Executor discardExecutor) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
-   this.storedTaskStateByCheckpointID = new HashMap<>();
this.localStateRootDirectories = localStateRootDirectories;
+   this.discardExecutor = discardExecutor;
+   this.lock = new Object();
+   this.storedTaskStateByCheckpointID = new TreeMap<>();
+   this.discarded = false;
}
 
+   @Nonnull
protected String createSubtaskPath() {
return jobID + File.separator + jobVertexID + File.separator + 
subtaskIndex;
}
 
+   /**
+* Stores the local state for the given checkpoint id.
+*
+* @param checkpointId id for the checkpoint that created the local 
state that will be stored.
+* @param localState the local state to store.
+*/
public void storeLocalState(
-   @Nonnull CheckpointMetaData checkpointMetaData,
+   @Nonnegative long 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365826#comment-16365826
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168527958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -59,58 +71,55 @@ public TaskExecutorLocalStateStoresManager(File[] 
localStateRootDirectories) {
}
}
}
-   }
 
-   public TaskLocalStateStore localStateStoreForTask(
-   JobID jobId,
-   JobVertexID jobVertexID,
-   int subtaskIndex) {
+   }
 
-   Preconditions.checkNotNull(jobId);
-   final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+   public TaskLocalStateStore localStateStoreForSubtask(
+   @Nonnull JobID jobId,
+   @Nonnull AllocationID allocationID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex) {
 
final Map<JobVertexSubtaskKey, TaskLocalStateStore> 
taskStateManagers =
-   this.taskStateStoresByJobID.computeIfAbsent(jobId, k -> 
new HashMap<>());
+   
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new 
ConcurrentHashMap<>());
+
+   final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
 
return taskStateManagers.computeIfAbsent(
-   taskKey, k -> new TaskLocalStateStore(jobId, 
jobVertexID, subtaskIndex, localStateRootDirectories));
+   taskKey,
+   k -> new TaskLocalStateStore(jobId, jobVertexID, 
subtaskIndex, localStateRootDirectories, discardExecutor));
}
 
-   public void releaseJob(JobID jobID) {
+   public void releaseLocalStateForAllocationId(@Nonnull AllocationID 
allocationID) {
 
-   Map<JobVertexSubtaskKey, TaskLocalStateStore> 
cleanupLocalStores = taskStateStoresByJobID.remove(jobID);
+   Map<JobVertexSubtaskKey, TaskLocalStateStore> 
cleanupLocalStores =
+   taskStateStoresByAllocationID.remove(allocationID);
 
if (cleanupLocalStores != null) {
-// doRelease(cleanupLocalStores.values());
+   doRelease(cleanupLocalStores.values());
}
}
 
public void releaseAll() {
 
-   for (Map<JobVertexSubtaskKey, TaskLocalStateStore> 
stateStoreMap : taskStateStoresByJobID.values()) {
-// doRelease(stateStoreMap.values());
+   for (Map<JobVertexSubtaskKey, TaskLocalStateStore> 
stateStoreMap : taskStateStoresByAllocationID.values()) {
+   doRelease(stateStoreMap.values());
}
 
-   taskStateStoresByJobID.clear();
+   taskStateStoresByAllocationID.clear();
}
 
-   private void doRelease(Iterable<TaskLocalStateStore> toRelease) throws 
Exception {
+   private void doRelease(Iterable<TaskLocalStateStore> toRelease) {
 
if (toRelease != null) {
 
-   Exception collectedExceptions = null;
-
for (TaskLocalStateStore stateStore : toRelease) {
try {
stateStore.dispose();
} catch (Exception disposeEx) {
-   collectedExceptions = 
ExceptionUtils.firstOrSuppressed(disposeEx, collectedExceptions);
+   LOG.warn("Exception while disposing 
local state store", disposeEx);
--- End diff --

 



[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168527958
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -59,58 +71,55 @@ public TaskExecutorLocalStateStoresManager(File[] 
localStateRootDirectories) {
}
}
}
-   }
 
-   public TaskLocalStateStore localStateStoreForTask(
-   JobID jobId,
-   JobVertexID jobVertexID,
-   int subtaskIndex) {
+   }
 
-   Preconditions.checkNotNull(jobId);
-   final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
+   public TaskLocalStateStore localStateStoreForSubtask(
+   @Nonnull JobID jobId,
+   @Nonnull AllocationID allocationID,
+   @Nonnull JobVertexID jobVertexID,
+   @Nonnegative int subtaskIndex) {
 
final Map<JobVertexSubtaskKey, TaskLocalStateStore> 
taskStateManagers =
-   this.taskStateStoresByJobID.computeIfAbsent(jobId, k -> 
new HashMap<>());
+   
this.taskStateStoresByAllocationID.computeIfAbsent(allocationID, k -> new 
ConcurrentHashMap<>());
+
+   final JobVertexSubtaskKey taskKey = new 
JobVertexSubtaskKey(jobVertexID, subtaskIndex);
 
return taskStateManagers.computeIfAbsent(
-   taskKey, k -> new TaskLocalStateStore(jobId, 
jobVertexID, subtaskIndex, localStateRootDirectories));
+   taskKey,
+   k -> new TaskLocalStateStore(jobId, jobVertexID, 
subtaskIndex, localStateRootDirectories, discardExecutor));
}
 
-   public void releaseJob(JobID jobID) {
+   public void releaseLocalStateForAllocationId(@Nonnull AllocationID 
allocationID) {
 
-   Map<JobVertexSubtaskKey, TaskLocalStateStore> 
cleanupLocalStores = taskStateStoresByJobID.remove(jobID);
+   Map<JobVertexSubtaskKey, TaskLocalStateStore> 
cleanupLocalStores =
+   taskStateStoresByAllocationID.remove(allocationID);
 
if (cleanupLocalStores != null) {
-// doRelease(cleanupLocalStores.values());
+   doRelease(cleanupLocalStores.values());
}
}
 
public void releaseAll() {
 
-   for (Map<JobVertexSubtaskKey, TaskLocalStateStore> 
stateStoreMap : taskStateStoresByJobID.values()) {
-// doRelease(stateStoreMap.values());
+   for (Map<JobVertexSubtaskKey, TaskLocalStateStore> 
stateStoreMap : taskStateStoresByAllocationID.values()) {
+   doRelease(stateStoreMap.values());
}
 
-   taskStateStoresByJobID.clear();
+   taskStateStoresByAllocationID.clear();
}
 
-   private void doRelease(Iterable<TaskLocalStateStore> toRelease) throws 
Exception {
+   private void doRelease(Iterable<TaskLocalStateStore> toRelease) {
 
if (toRelease != null) {
 
-   Exception collectedExceptions = null;
-
for (TaskLocalStateStore stateStore : toRelease) {
try {
stateStore.dispose();
} catch (Exception disposeEx) {
-   collectedExceptions = 
ExceptionUtils.firstOrSuppressed(disposeEx, collectedExceptions);
+   LOG.warn("Exception while disposing 
local state store", disposeEx);
--- End diff --

👍 
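
For readers skimming the diff, the change being approved here is that a failing `dispose()` is now logged and the loop moves on to the next store, instead of collecting exceptions to rethrow. A minimal sketch of that pattern follows, with assumptions stated plainly: `ReleaseAllSketch`, `Disposable`, and the returned failure count are hypothetical and added only for illustration, and `java.util.logging` is used in place of the SLF4J logger from the diff so that the example is self-contained.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/** Hypothetical sketch of "log and continue" disposal; not the Flink class. */
public class ReleaseAllSketch {

    private static final Logger LOG = Logger.getLogger(ReleaseAllSketch.class.getName());

    /** Stand-in for a state store whose dispose() may fail. */
    interface Disposable {
        void dispose() throws Exception;
    }

    /**
     * Disposes every store. A failure is logged and does not prevent the
     * remaining stores from being disposed. Returns the number of failures
     * (an addition for this example so the behavior can be checked).
     */
    static int releaseAll(Iterable<? extends Disposable> toRelease) {
        int failures = 0;
        for (Disposable store : toRelease) {
            try {
                store.dispose();
            } catch (Exception disposeEx) {
                failures++;
                LOG.log(Level.WARNING, "Exception while disposing local state store", disposeEx);
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<String> disposed = new ArrayList<>();
        List<Disposable> stores = Arrays.asList(
            () -> disposed.add("a"),
            () -> { throw new IllegalStateException("boom"); },
            () -> disposed.add("c"));
        int failures = releaseAll(stores);
        System.out.println(disposed + " failures=" + failures);
    }
}
```

The trade-off is that callers no longer learn about disposal failures through an exception, which is why the method signature in the diff can drop `throws Exception`.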


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365820#comment-16365820
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168526712
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -20,35 +20,47 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
 import java.io.File;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a 
placeholder.
  */
 public class TaskExecutorLocalStateStoresManager {
 
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
/**
 * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
-* this.
+* this. Maps from allocation id to all the subtask's local state 
stores.
 */
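-   private final Map<JobID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> 
taskStateStoresByJobID;
+   private final Map<AllocationID, Map<JobVertexSubtaskKey, TaskLocalStateStore>> taskStateStoresByAllocationID;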
--- End diff --

In a later commit, this uses a normal `HashMap` and synchronization.
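
A rough sketch of what a plain `HashMap` plus explicit synchronization could look like for a two-level map of this shape. The later commit itself is not shown in this thread, so everything here is an assumption for illustration: the class `LockedNestedMapSketch`, the `String` and `Integer` keys, and the string values standing in for state stores are all hypothetical.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: nested HashMaps guarded by one lock; not the Flink class. */
public class LockedNestedMapSketch {

    /** Guards every access to storesByAllocation and its inner maps. */
    private final Object lock = new Object();

    /** Outer key: allocation id. Inner key: subtask index. Value: stand-in for a store. */
    private final Map<String, Map<Integer, String>> storesByAllocation = new HashMap<>();

    /** Returns the existing store for the key pair, creating it on first access. */
    String storeFor(String allocationId, int subtaskIndex) {
        synchronized (lock) {
            return storesByAllocation
                .computeIfAbsent(allocationId, k -> new HashMap<>())
                .computeIfAbsent(subtaskIndex, k -> "store-" + allocationId + "-" + k);
        }
    }

    /** Removes all stores of one allocation and returns how many were removed. */
    int release(String allocationId) {
        Map<Integer, String> removed;
        synchronized (lock) {
            removed = storesByAllocation.remove(allocationId);
        }
        // Any expensive cleanup of the removed stores would run here, outside the lock.
        return removed == null ? 0 : removed.size();
    }

    public static void main(String[] args) {
        LockedNestedMapSketch manager = new LockedNestedMapSketch();
        String first = manager.storeFor("alloc-1", 0);
        String again = manager.storeFor("alloc-1", 0);
        System.out.println(first == again);
        System.out.println(manager.release("alloc-1"));
    }
}
```

One lock over both levels makes the lookup-or-create step atomic as a whole, which nested `ConcurrentHashMap` instances only guarantee per map.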




[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168526712
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManager.java
 ---
@@ -20,35 +20,47 @@
 
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
-import org.apache.flink.util.Preconditions;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 
 import java.io.File;
-import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class holds the all {@link TaskLocalStateStore} objects for a task 
executor (manager).
- *
- * TODO: this still still work in progress and partially still acts as a 
placeholder.
  */
 public class TaskExecutorLocalStateStoresManager {
 
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskExecutorLocalStateStoresManager.class);
+
/**
 * This map holds all local state stores for tasks running on the task 
manager / executor that own the instance of
-* this.
+* this. Maps from allocation id to all the subtask's local state 
stores.
 */
-   private final Map> 
taskStateStoresByJobID;
+   private final Map> taskStateStoresByAllocationID;
--- End diff --

In a later commit, this uses a normal `HashMap` and synchronization.
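
For readers unfamiliar with the pattern, a plain `HashMap` guarded by a lock could look roughly like the sketch below. It is not the code from the pull request: `GuardedRegistrySketch` and its methods are hypothetical, allocation ids are modeled as strings, and the per-subtask stores are modeled as strings as well.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry (not the PR's class): allocation id -> subtask index -> store. */
final class GuardedRegistrySketch {

    /** Guards every access to the nested maps below. */
    private final Object lock = new Object();

    private final Map<String, Map<Integer, String>> storesByAllocation = new HashMap<>();

    /** Returns the store for the given subtask, creating it on first access. */
    String storeFor(String allocationId, int subtaskIndex) {
        synchronized (lock) {
            return storesByAllocation
                .computeIfAbsent(allocationId, ignored -> new HashMap<>())
                .computeIfAbsent(subtaskIndex, idx -> "store-" + allocationId + "-" + idx);
        }
    }

    /** Drops all stores of an allocation and reports how many were removed. */
    int releaseAllocation(String allocationId) {
        synchronized (lock) {
            Map<Integer, String> removed = storesByAllocation.remove(allocationId);
            return removed == null ? 0 : removed.size();
        }
    }
}
```

One reason to prefer a single lock over a `ConcurrentHashMap` here is that compound operations on the outer and inner map become atomic as a unit, which keeps the nested structure consistent.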


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365818#comment-16365818
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168526304
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/TaskStateSnapshot.java
 ---
@@ -75,7 +79,10 @@ public OperatorSubtaskState 
getSubtaskStateByOperatorID(OperatorID operatorID) {
 * Maps the given operator id to the given subtask state. Returns the 
subtask state of a previous mapping, if such
 * a mapping existed or null otherwise.
 */
-   public OperatorSubtaskState putSubtaskStateByOperatorID(OperatorID 
operatorID, OperatorSubtaskState state) {
+   public OperatorSubtaskState putSubtaskStateByOperatorID(
--- End diff --

Changed in a later commit; it is then actually `Nonnull`.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8614) Enable Flip-6 per default

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365813#comment-16365813
 ] 

ASF GitHub Bot commented on FLINK-8614:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5437#discussion_r168525466
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/MiniDispatcher.java
 ---
@@ -100,6 +101,18 @@ public MiniDispatcher(
return acknowledgeCompletableFuture;
}
 
+   @Override
+   public CompletableFuture requestJobResult(JobID jobId, Time 
timeout) {
+   final CompletableFuture jobResultFuture = 
super.requestJobResult(jobId, timeout);
+
+   if (executionMode == ClusterEntrypoint.ExecutionMode.NORMAL) {
+   // terminate the MiniDispatcher once we served the 
first JobResult successfully
+   jobResultFuture.thenRun(this::shutDown);
--- End diff --

I think you're right. We should also shut down the `MiniDispatcher` in case 
of a failure.
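
To illustrate the point: `CompletableFuture.thenRun` only runs its action when the future completes normally, whereas `whenComplete` runs on both normal and exceptional completion. The sketch below uses a hypothetical stand-in class, not Flink's `MiniDispatcher`, and the actual fix in the pull request may look different.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a dispatcher; not Flink's MiniDispatcher. */
final class ShutdownOnCompletionSketch {

    /** Counts how often the shutdown hook ran. */
    final AtomicInteger shutDownCalls = new AtomicInteger();

    void shutDown() {
        shutDownCalls.incrementAndGet();
    }

    /** Runs shutDown only if the result future completes normally. */
    void shutDownOnSuccess(CompletableFuture<String> resultFuture) {
        resultFuture.thenRun(this::shutDown);
    }

    /** Runs shutDown on normal and on exceptional completion. */
    void shutDownOnCompletion(CompletableFuture<String> resultFuture) {
        resultFuture.whenComplete((result, failure) -> shutDown());
    }

    public static void main(String[] args) {
        ShutdownOnCompletionSketch sketch = new ShutdownOnCompletionSketch();
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("job result could not be served"));

        sketch.shutDownOnSuccess(failed);
        System.out.println("after thenRun: " + sketch.shutDownCalls.get());

        sketch.shutDownOnCompletion(failed);
        System.out.println("after whenComplete: " + sketch.shutDownCalls.get());
    }
}
```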


> Enable Flip-6 per default
> -
>
> Key: FLINK-8614
> URL: https://issues.apache.org/jira/browse/FLINK-8614
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> After adding the FLINK-8471, the next step is to enable Flip-6 per default by 
> setting the configuration switch to {{flip6}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8614) Enable Flip-6 per default

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8614?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365812#comment-16365812
 ] 

ASF GitHub Bot commented on FLINK-8614:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5437#discussion_r168525199
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java 
---
@@ -402,6 +403,24 @@ public void start() throws Exception {
}
}
 
+   @Override
+   public CompletableFuture requestJobResult(JobID jobId, Time 
timeout) {
+   final JobManagerRunner jobManagerRunner = 
jobManagerRunners.get(jobId);
+
+   if (jobManagerRunner == null) {
+   final ArchivedExecutionGraph archivedExecutionGraph = 
archivedExecutionGraphStore.get(jobId);
+
+   if (archivedExecutionGraph == null) {
+   return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+   } else {
+   return 
CompletableFuture.completedFuture(JobResult.createFrom(archivedExecutionGraph));
+   }
+   } else {
+   return jobManagerRunner.getResultFuture().thenApply(
--- End diff --

Good catch. Will change it.



[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365810#comment-16365810
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168524862
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -525,19 +526,17 @@ public String toString() {
/**
 * This class encapsulates the configuration for local recovery of this 
backend.
 */
-   public static final class LocalRecoveryConfig implements Serializable {
+   public static final class LocalRecoveryConfig extends 
LocalRecoveryConfigBase {
 
-   private static final long serialVersionUID = 1L;
--- End diff --

Changed in a later commit.


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168524601
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamWithResultProvider.java
 ---
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.core.fs.Path;
+import 
org.apache.flink.runtime.state.filesystem.FixFileFsStateOutputStream;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Interface that provides access to a CheckpointStateOutputStream and a 
method to provide the {@link SnapshotResult}.
+ * This abstracts from different ways that a result is obtained from 
checkpoint output streams.
+ */
+public interface CheckpointStreamWithResultProvider extends Closeable {
+
+   SnapshotResult closeAndFinalizeCheckpointStreamResult(
+   KeyGroupRangeOffsets kgOffs) throws IOException;
+
+   CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream();
+
+   @Override
+   default void close() throws IOException {
+   CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream = getCheckpointOutputStream();
+   if (outputStream != null) {
+   outputStream.close();
+   }
+   }
+
+   /**
+* Implementation of {@link CheckpointStreamWithResultProvider} that 
only creates the
+* primary/remote/jm-owned state.
+*/
+   class PrimaryStreamOnly implements CheckpointStreamWithResultProvider {
+
+   private final 
CheckpointStreamFactory.CheckpointStateOutputStream outputStream;
+
+   public 
PrimaryStreamOnly(CheckpointStreamFactory.CheckpointStateOutputStream 
outputStream) {
+   this.outputStream = 
Preconditions.checkNotNull(outputStream);
+   }
+
+   @Override
+   public SnapshotResult 
closeAndFinalizeCheckpointStreamResult(
+   KeyGroupRangeOffsets kgOffs) throws IOException {
+
+   StreamStateHandle streamStateHandle = 
outputStream.closeAndGetHandle();
+   KeyGroupsStateHandle primaryStateHandle = new 
KeyGroupsStateHandle(kgOffs, streamStateHandle);
+   return new SnapshotResult<>(primaryStateHandle, null);
+   }
+
+   @Override
+   public CheckpointStreamFactory.CheckpointStateOutputStream 
getCheckpointOutputStream() {
+   return outputStream;
+   }
+   }
+
+   /**
+* Implementation of {@link CheckpointStreamWithResultProvider} that 
creates both, the
+* primary/remote/jm-owned state and the secondary/local/tm-owned state.
+*/
+   class PrimaryAndSecondaryStream implements 
CheckpointStreamWithResultProvider {
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(PrimaryAndSecondaryStream.class);
+
+   private final DuplicatingCheckpointOutputStream outputStream;
+
+   public 
PrimaryAndSecondaryStream(DuplicatingCheckpointOutputStream outputStream) {
+   this.outputStream = 
Preconditions.checkNotNull(outputStream);
+   }
+
+   @Override
+   public SnapshotResult 
closeAndFinalizeCheckpointStreamResult(
+   KeyGroupRangeOffsets offsets) throws IOException {
+
+   final StreamStateHandle primaryStreamStateHandle;
+
+   try {
+   primaryStreamStateHandle = 
outputStream.closeAndGetPrimaryHandle();
+   } catch (IOException primaryEx) {
+   try {
+   

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365775#comment-16365775
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168515077
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
--- End diff --

This can be removed without replacement.


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365776#comment-16365776
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168514168
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -166,20 +190,38 @@ public TaskStateSnapshot retrieveLocalState(long 
checkpointID) {
 * Disposes the state of all local snapshots managed by this object.
 */
public void dispose() {
+
+   Collection> statesCopy;
+
synchronized (lock) {
-   for (Map.Entry entry : 
storedTaskStateByCheckpointID.entrySet()) {
-   discardStateObject(entry.getValue(), 
entry.getKey());
-   }
discarded = true;
+   statesCopy = new 
ArrayList<>(storedTaskStateByCheckpointID.entrySet());
}
+
+   discardExecutor.execute(() -> {
--- End diff --

I think it would be good for this kind of asynchronous cleanup operation 
to return a `CompletableFuture`. This future could then be returned by the 
`dispose` method. The benefit would be that the caller would know when the 
cleanup has completed and, thus, when it is safe to shut down the 
`discardExecutor`.
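
A minimal sketch of this suggestion, assuming a simplified store: `AsyncDisposeSketch`, its string-valued state, and the `discardedIds` list are hypothetical and only mimic the shape of the `dispose` method in the diff. `CompletableFuture.runAsync(Runnable, Executor)` is the standard JDK call used to obtain the future.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;

/** Hypothetical simplified store; not Flink's TaskLocalStateStore. */
final class AsyncDisposeSketch {

    private final Object lock = new Object();
    private final Map<Long, String> storedStateByCheckpointId = new TreeMap<>();
    private final Executor discardExecutor;
    private boolean discarded;

    /** Records which checkpoint ids were discarded, for demonstration only. */
    final List<Long> discardedIds = Collections.synchronizedList(new ArrayList<>());

    AsyncDisposeSketch(Executor discardExecutor) {
        this.discardExecutor = discardExecutor;
    }

    void store(long checkpointId, String state) {
        synchronized (lock) {
            if (!discarded) {
                storedStateByCheckpointId.put(checkpointId, state);
            }
        }
    }

    /** Discards all state asynchronously; the future completes when cleanup is done. */
    CompletableFuture<Void> dispose() {
        final Map<Long, String> statesCopy;
        synchronized (lock) {
            discarded = true;
            // Copy under the lock, do the (possibly slow) discard outside of it.
            statesCopy = new TreeMap<>(storedStateByCheckpointId);
            storedStateByCheckpointId.clear();
        }
        return CompletableFuture.runAsync(
            () -> statesCopy.keySet().forEach(discardedIds::add),
            discardExecutor);
    }
}
```

A caller that owns the executor could then chain its shutdown onto the returned future, for example `store.dispose().thenRun(executorService::shutdown)`, where `executorService` is whatever `ExecutorService` backs the discard executor.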


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365777#comment-16365777
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168514963
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -148,22 +150,22 @@ public void 
testForwardingSubtaskLocalStateBaseDirFromLocalStateStore() throws I
taskLocalStateStore);
 
LocalRecoveryDirectoryProvider 
directoryProviderFromTaskLocalStateStore =
-   
taskLocalStateStore.createLocalRecoveryRootDirectoryProvider();
+   
taskLocalStateStore.getLocalRecoveryRootDirectoryProvider();
 
LocalRecoveryDirectoryProvider 
directoryProviderFromTaskStateManager =

taskStateManager.createLocalRecoveryRootDirectoryProvider();
 
 
for (int i = 0; i < 10; ++i) {
Assert.assertEquals(rootDirs[i % 
rootDirs.length],
-   
directoryProviderFromTaskLocalStateStore.nextRootDirectory());
+   
directoryProviderFromTaskLocalStateStore.rootDirectory(i));
Assert.assertEquals(rootDirs[i % 
rootDirs.length],
-   
directoryProviderFromTaskStateManager.nextRootDirectory());
+   
directoryProviderFromTaskStateManager.rootDirectory(i));
}
 
-   Assert.assertEquals(
-   
directoryProviderFromTaskLocalStateStore.getSubtaskSpecificPath(),
-   
directoryProviderFromTaskStateManager.getSubtaskSpecificPath());
+// Assert.assertEquals(
+// 
directoryProviderFromTaskLocalStateStore.getSubtaskSpecificPath(),
+// 
directoryProviderFromTaskStateManager.getSubtaskSpecificPath());
--- End diff --

Why did you comment this part out? Can it be removed?


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-15 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16365774#comment-16365774
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168514913
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
--- End diff --

👍


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.
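
The recovery idea in the issue description (a primary copy in DFS, a secondary local copy, and a fallback to the primary when the local copy is gone) can be sketched roughly as follows. All names here (`LocalFirstRecoverySketch`, `storeCheckpoint`, `restore`, and the in-memory maps standing in for local disk and DFS) are hypothetical and for illustration only. This is not the implementation from the pull request.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: restore from a local copy if present, else from the primary copy. */
final class LocalFirstRecoverySketch {

    /** Secondary copies by checkpoint id; stands in for local disk and may be lost. */
    private final Map<Long, String> localCopies = new ConcurrentHashMap<>();

    /** Primary copies by checkpoint id; stands in for the stable store (DFS). */
    private final Map<Long, String> remoteCopies = new ConcurrentHashMap<>();

    /** Writes the primary copy first, then the complementary local copy. */
    void storeCheckpoint(long checkpointId, String state) {
        remoteCopies.put(checkpointId, state);
        localCopies.put(checkpointId, state);
    }

    /** Simulates a machine failure, which drops all local copies. */
    void simulateMachineFailure() {
        localCopies.clear();
    }

    boolean hasLocalCopy(long checkpointId) {
        return localCopies.containsKey(checkpointId);
    }

    /** Prefers the local copy to save network bandwidth; falls back to the primary copy. */
    String restore(long checkpointId) {
        String local = localCopies.get(checkpointId);
        if (local != null) {
            return local;
        }
        String remote = remoteCopies.get(checkpointId);
        if (remote == null) {
            throw new IllegalStateException("No state found for checkpoint " + checkpointId);
        }
        return remote;
    }

    public static void main(String[] args) {
        LocalFirstRecoverySketch store = new LocalFirstRecoverySketch();
        store.storeCheckpoint(1L, "state-1");
        System.out.println("local copy available: " + store.hasLocalCopy(1L));
        store.simulateMachineFailure();
        System.out.println("restored after failure: " + store.restore(1L));
    }
}
```

The local copy is treated as an optimization only: correctness never depends on it, which matches the description's point that the DFS copy remains the one reported to the checkpoint coordinator.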



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168515077
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
--- End diff --

This can be removed without replacement.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-15 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168514913
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -19,92 +19,224 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.util.ExceptionUtils;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nonnegative;
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
+import javax.annotation.concurrent.GuardedBy;
 
 import java.io.File;
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.Iterator;
 import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.Executor;
 
 /**
  * This class will service as a task-manager-level local storage for local 
checkpointed state. The purpose is to provide
  * access to a state that is stored locally for a faster recovery compared 
to the state that is stored remotely in a
  * stable store DFS. For now, this storage is only complementary to the 
stable storage and local state is typically
  * lost in case of machine failures. In such cases (and others), client 
code of this class must fall back to using the
  * slower but highly available store.
- *
- * TODO this is currently a placeholder / mock that still must be 
implemented!
  */
 public class TaskLocalStateStore {
 
-   /** */
+   /** Logger for this class. */
+   private static final Logger LOG = 
LoggerFactory.getLogger(TaskLocalStateStore.class);
+
+   /** Maximum number of retained snapshots. */
+   private static final int MAX_RETAINED_SNAPSHOTS = 5;
+
+   /** Dummy value to use instead of null to satisfy {@link 
ConcurrentHashMap}. */
+   private final TaskStateSnapshot NULL_DUMMY = new TaskStateSnapshot();
--- End diff --

👍 
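
As background on the `NULL_DUMMY` field in the diff above: `ConcurrentHashMap` rejects `null` keys and values with a `NullPointerException`, so a sentinel object is a common way to record "an entry exists but carries no value". The sketch below illustrates that pattern under simplified assumptions. The class `NullSentinelMapSketch` and its methods are hypothetical and use `String` values rather than `TaskStateSnapshot`.

```java
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical sketch: a sentinel object stands in for null inside a ConcurrentHashMap. */
final class NullSentinelMapSketch {

    /** Placeholder stored in place of null; compared by identity only. */
    private static final Object NULL_DUMMY = new Object();

    private final ConcurrentHashMap<Long, Object> entries = new ConcurrentHashMap<>();

    /** Stores the value, substituting the sentinel when the value is null. */
    void put(long id, String valueOrNull) {
        entries.put(id, valueOrNull != null ? valueOrNull : NULL_DUMMY);
    }

    /** True if an entry was recorded for the id, even if its value was null. */
    boolean contains(long id) {
        return entries.containsKey(id);
    }

    /** Returns the stored value, translating the sentinel back to null. */
    String get(long id) {
        Object stored = entries.get(id);
        return stored == NULL_DUMMY ? null : (String) stored;
    }

    public static void main(String[] args) {
        NullSentinelMapSketch map = new NullSentinelMapSketch();
        map.put(1L, null);
        System.out.println("contains(1) = " + map.contains(1L) + ", get(1) = " + map.get(1L));
    }
}
```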


---

