[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-14 Thread tarun razdan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tarun razdan updated FLINK-7756:

Attachment: taskmanager_without_cassandra.log

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses 
> HDFS as its file system), it crashes. But when I use FsStateBackend on staging 
> (which also uses HDFS), it works fine.
> Locally, with the local file system, both backends work fine.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> 
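The `FutureUtil.runIfNotDoneAndGet` frame in the trace above is where the async checkpoint thread materializes the snapshot. A minimal plain-Java sketch of that idea (a hedged re-implementation for illustration, not Flink's actual code):

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RunnableFuture;

public class RunIfNotDoneSketch {
    // Run the snapshot future in the calling thread if it has not been
    // completed elsewhere, then block until its result is available.
    // FutureTask.run() is a no-op if the task already ran or was cancelled,
    // which is what makes this safe to call from the async checkpoint thread.
    public static <T> T runIfNotDoneAndGet(RunnableFuture<T> future)
            throws ExecutionException, InterruptedException {
        if (future == null) {
            return null;
        }
        future.run();
        return future.get();
    }
}
```

The `ExecutionException` unwrapped from `get()` is what carries the `IllegalStateException` seen in the log above.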

[GitHub] flink pull request #5481: [FLINK-8560] Access to the current key in ProcessF...

2018-02-14 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r168122242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -162,7 +162,7 @@ class GroupAggProcessFunction(
 
   override def onTimer(
   timestamp: Long,
-  ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+  ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
--- End diff --

I'm not a Scala expert, but is this change somehow related to adding the 
`getCurrentKey()` method? 


---


[jira] [Commented] (FLINK-8560) Access to the current key in ProcessFunction#OnTimerContext after keyBy()

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8560?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363704#comment-16363704
 ] 

ASF GitHub Bot commented on FLINK-8560:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/5481#discussion_r168122242
  
--- Diff: 
flink-libraries/flink-table/src/main/scala/org/apache/flink/table/runtime/aggregate/GroupAggProcessFunction.scala
 ---
@@ -162,7 +162,7 @@ class GroupAggProcessFunction(
 
   override def onTimer(
   timestamp: Long,
-  ctx: ProcessFunction[CRow, CRow]#OnTimerContext,
+  ctx: ProcessFunction[CRow, CRow]#OnTimerContext[_],
--- End diff --

I'm not a Scala expert, but is this change somehow related to adding the 
`getCurrentKey()` method? 


> Access to the current key in ProcessFunction#OnTimerContext after keyBy()
> -
>
> Key: FLINK-8560
> URL: https://issues.apache.org/jira/browse/FLINK-8560
> Project: Flink
>  Issue Type: Improvement
>  Components: DataStream API
>Affects Versions: 1.4.0
>Reporter: Jürgen Thomann
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently it is required to store the key of a keyBy() in the processElement 
> method to have access to it in the OnTimerContext.
> This is awkward, because the processElement method has to check for every 
> element whether the key is already stored and set it if it is not.
> A possible solution would be adding OnTimerContext#getCurrentKey() or a 
> similar method. Exposing the key in the open() method might work as well.
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Getting-Key-from-keyBy-in-ProcessFunction-tt18126.html
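The workaround described above can be sketched without Flink types (class and method names here are illustrative stand-ins, not the real API): the function caches the current key on every element so a later timer callback can read it, which is exactly the bookkeeping a `getCurrentKey()` would remove.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyCachingSketch {
    private final Map<String, Long> countsPerKey = new HashMap<>();
    // The manual workaround: remember the key seen in processElement so that
    // the timer callback, which has no getCurrentKey(), can still use it.
    private String cachedKey;

    public void processElement(String key) {
        cachedKey = key; // must be refreshed for every single element
        countsPerKey.merge(key, 1L, Long::sum);
    }

    public long onTimer() {
        // With a getCurrentKey() on the timer context, this lookup would not
        // need the cached field at all.
        return countsPerKey.getOrDefault(cachedKey, 0L);
    }
}
```

Note that caching a single key like this is also fragile when timers for several keys interleave, which is part of the motivation for the proposed method.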



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8576) Log message for QueryableState loading failure too verbose

2018-02-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8576:

Fix Version/s: 1.4.2

> Log message for QueryableState loading failure too verbose
> --
>
> Key: FLINK-8576
> URL: https://issues.apache.org/jira/browse/FLINK-8576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> Whenever a job- or taskmanager is started, it attempts to load the queryable 
> state classes via reflection. If this fails because the classes are not on the 
> classpath (which is common and the default case), we log the full stacktrace 
> at DEBUG.
> We should reduce this to a single line, as it gets really verbose when 
> sifting through debug logs.
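A sketch of the proposed reduction (names are illustrative, not the actual Flink code): turn the reflective load failure into a single diagnostic line instead of logging the throwable, which prints the whole stack trace.

```java
import java.util.Optional;

public class QuietClassLoading {
    // Build the one-line message that replaces the full DEBUG stack trace.
    public static String loadFailureMessage(Throwable cause) {
        return "Queryable state not available: " + cause.getClass().getSimpleName()
                + ": " + cause.getMessage();
    }

    // Try to load an optional class; on failure report one line and continue.
    public static Optional<Class<?>> tryLoad(String className) {
        try {
            return Optional.of(Class.forName(className));
        } catch (ClassNotFoundException e) {
            System.out.println(loadFailureMessage(e)); // single line, no trace
            return Optional.empty();
        }
    }
}
```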





[jira] [Updated] (FLINK-8576) Log message for QueryableState loading failure too verbose

2018-02-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8576:

Affects Version/s: 1.4.0

> Log message for QueryableState loading failure too verbose
> --
>
> Key: FLINK-8576
> URL: https://issues.apache.org/jira/browse/FLINK-8576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> Whenever a job- or taskmanager is started, it attempts to load the queryable 
> state classes via reflection. If this fails because the classes are not on the 
> classpath (which is common and the default case), we log the full stacktrace 
> at DEBUG.
> We should reduce this to a single line, as it gets really verbose when 
> sifting through debug logs.





[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363819#comment-16363819
 ] 

Aljoscha Krettek commented on FLINK-7756:
-

There is only one TaskManager, correct?

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses 
> HDFS as its file system), it crashes. But when I use FsStateBackend on staging 
> (which also uses HDFS), it works fine.
> Locally, with the local file system, both backends work fine.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> 

[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363818#comment-16363818
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5469
  
merging.


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5469: [FLINK-8475][config][docs] Integrate Core options

2018-02-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5469
  
merging.


---


[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363878#comment-16363878
 ] 

ASF GitHub Bot commented on FLINK-8212:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5458
  
@tillrohrmann I updated the code. Could you take a look? Thanks ~


> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}} where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, then we should also refactor 
> {{NetworkBufferCalculationTest}}.
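One way to remove the static coupling the ticket describes is to inject the environment value instead of reading it from a static class inside the service. A hedged sketch of that refactoring (names are illustrative, not the actual TaskManagerServices code):

```java
import java.util.function.LongSupplier;

public class NetworkBufferCalculation {
    // Instead of calling a static EnvironmentInformation method internally,
    // take the heap size as a supplier. Production code passes the real
    // source; tests pass a constant, so no PowerMock is required.
    public static long calculateNetworkBufferBytes(LongSupplier maxJvmHeapBytes,
                                                   double networkFraction) {
        if (networkFraction <= 0.0 || networkFraction >= 1.0) {
            throw new IllegalArgumentException("fraction must be in (0, 1)");
        }
        return (long) (maxJvmHeapBytes.getAsLong() * networkFraction);
    }
}
```

A unit test can then simply call `calculateNetworkBufferBytes(() -> someFixedHeap, 0.1)` without mocking any static state.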





[GitHub] flink issue #5458: [FLINK-8212] [network] Pull EnvironmentInformation out of...

2018-02-14 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5458
  
@tillrohrmann I updated the code. Could you take a look? Thanks ~


---


[jira] [Closed] (FLINK-5779) Auto generate configuration docs

2018-02-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5779.
---
Resolution: Fixed

> Auto generate configuration docs
> 
>
> Key: FLINK-5779
> URL: https://issues.apache.org/jira/browse/FLINK-5779
> Project: Flink
>  Issue Type: Improvement
>  Components: Configuration, Documentation
>Reporter: Ufuk Celebi
>Assignee: Dawid Wysakowicz
>Priority: Major
> Fix For: 1.5.0
>
>
> As per discussion on the mailing list we need to improve on the configuration 
> documentation page 
> (http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/Discuss-Organizing-Documentation-for-Configuration-Options-td15773.html).
> We decided to try to (semi-)automate this in order to not get out of sync in 
> the future.
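The (semi-)automated generation can be sketched as a small generator over the option definitions, so the documented table is always rendered from the same definitions the code uses (types and names here are illustrative, not the actual flink-docs tooling):

```java
import java.util.List;

public class ConfigDocGenerator {
    // Minimal stand-in for a ConfigOption definition.
    public static class Option {
        final String key, defaultValue, description;
        public Option(String key, String defaultValue, String description) {
            this.key = key;
            this.defaultValue = defaultValue;
            this.description = description;
        }
    }

    // Render the options as HTML table rows; regenerating this on every build
    // is what keeps the docs from drifting out of sync with the code.
    public static String toHtmlTable(List<Option> options) {
        StringBuilder sb = new StringBuilder("<table>\n");
        for (Option o : options) {
            sb.append("  <tr><td>").append(o.key)
              .append("</td><td>").append(o.defaultValue)
              .append("</td><td>").append(o.description)
              .append("</td></tr>\n");
        }
        return sb.append("</table>").toString();
    }
}
```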





[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363895#comment-16363895
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5467


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363897#comment-16363897
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5470


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363894#comment-16363894
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5459


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363896#comment-16363896
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5468


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Closed] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8475.
---
Resolution: Fixed

Fully addressed on master in:
8c8033eb6665d7e352fed4d067930c91b9507d82
528cca5cf68ea27eeed3553fa18d85b69a93a8ff
29dcec5cc44f4dd9b0124d9c0a15d31a92ab30f6
be2b911f2c75b21bd2d8f64cb5fb0ec3d2ad6426
39657d925653926c5d68b574479e22c3cc3929ef
4bbe2dc146d0e24ceb9f6a6ae829c88962173a9d
57ec03ed3dc6ce3fbb563d3274b032945ba53510
07bd44b92a10229a21ebac519a936a3e520a164d

> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363898#comment-16363898
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5471


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[jira] [Commented] (FLINK-8553) switch flink-metrics-datadog to async mode

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363900#comment-16363900
 ] 

ASF GitHub Bot commented on FLINK-8553:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5418


> switch flink-metrics-datadog to async mode
> --
>
> Key: FLINK-8553
> URL: https://issues.apache.org/jira/browse/FLINK-8553
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Even though flink-metrics-datadog is currently designed as `fire-and-forget`, 
> it still uses sync calls, which may block or slow down the core. We need to 
> switch it to async mode.
> cc  [~Zentol]
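The async mode can be sketched as handing each report to a single-threaded executor, so a slow HTTP call can never block the caller. This is an illustrative sketch, not the actual flink-metrics-datadog implementation; the transport is abstracted as a `Consumer`.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class AsyncReporterSketch {
    private final ExecutorService sender = Executors.newSingleThreadExecutor();
    private final Consumer<String> transport; // e.g. the HTTP POST to Datadog

    public AsyncReporterSketch(Consumer<String> transport) {
        this.transport = transport;
    }

    // Fire-and-forget: the caller returns immediately; the send happens on
    // the background thread and failures are swallowed, matching the intent.
    public void report(String payload) {
        sender.execute(() -> {
            try {
                transport.accept(payload);
            } catch (RuntimeException ignored) {
                // do not propagate back to the metrics thread
            }
        });
    }

    public boolean shutdown() throws InterruptedException {
        sender.shutdown();
        return sender.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```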





[jira] [Commented] (FLINK-8576) Log message for QueryableState loading failure too verbose

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363899#comment-16363899
 ] 

ASF GitHub Bot commented on FLINK-8576:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5420


> Log message for QueryableState loading failure too verbose
> --
>
> Key: FLINK-8576
> URL: https://issues.apache.org/jira/browse/FLINK-8576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0, 1.4.2
>
>
> Whenever a job- or taskmanager is started, it attempts to load the queryable 
> state classes via reflection. If this fails because the classes are not on the 
> classpath (which is common and the default case), we log the full stacktrace 
> at DEBUG.
> We should reduce this to a single line, as it gets really verbose when 
> sifting through debug logs.





[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363901#comment-16363901
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5469


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>






[GitHub] flink issue #5415: [FLINK-3655] [core] Support multiple paths in FileInputFo...

2018-02-14 Thread fhueske
Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5415
  
Thanks for the review @zentol. 

I've addressed your feedback, improved the backwards compatibility as 
discussed offline, and added multi-path support to additional input formats.


---


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363907#comment-16363907
 ] 

ASF GitHub Bot commented on FLINK-3655:
---

Github user fhueske commented on the issue:

https://github.com/apache/flink/pull/5415
  
Thanks for the review @zentol. 

I've addressed your feedback, improved the backwards compatibility as 
discussed offline, and added multi-path support to additional input formats.


> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.5.0
>
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.
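The comma-separated form requested above can be sketched as a simple split plus a `supportsMultiPaths()`-style opt-in guard. This is an illustration of the idea only; the actual Flink change lives in `FileInputFormat` and its subclasses.

```java
import java.util.ArrayList;
import java.util.List;

public class MultiPathSketch {
    // Formats opt in, mirroring the supportsMultiPaths() idea: a format that
    // has not been adapted keeps the single-path behavior.
    public boolean supportsMultiPaths() {
        return true;
    }

    // Split "/a/*,/b/*" into its individual paths, trimming stray whitespace.
    public static List<String> parsePaths(String commaSeparated) {
        List<String> paths = new ArrayList<>();
        for (String p : commaSeparated.split(",")) {
            String trimmed = p.trim();
            if (!trimmed.isEmpty()) {
                paths.add(trimmed);
            }
        }
        return paths;
    }
}
```

The data source would then iterate the parsed paths sequentially, as the ticket asks.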





[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168418
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -316,6 +316,11 @@
 */
public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
 
+   /**
+* The config parameter defining the root directories for storing 
file-based state for local recovery.
+*/
+   public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = 
"taskmanager.local-state-root.dir";
--- End diff --

👍 


---


[jira] [Commented] (FLINK-3655) Allow comma-separated or multiple directories to be specified for FileInputFormat

2018-02-14 Thread Fabian Hueske (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-3655?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16363950#comment-16363950
 ] 

Fabian Hueske commented on FLINK-3655:
--

Hi [~sjwiesman],

I've reworked the PR and opened a new one: 
https://github.com/apache/flink/pull/5415

Unfortunately, we cannot magically enable this feature for all input formats 
that are based on {{FileInputFormat}}, because it is a {{@Public}} interface.
With the changes that I proposed in the PR, we enable multipath support for the 
CsvInputFormats, AvroInputFormat, OrcRowInputFormat, and TextInputFormat. All 
other classes would have to override the {{supportsMultiPaths()}} method.

Can you check if the changes in the PR would address your use case?
It would be great if you could provide feedback soon because the feature freeze 
for Flink 1.5.0 will happen in a few days.

Thank you, Fabian

> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat
> -
>
> Key: FLINK-3655
> URL: https://issues.apache.org/jira/browse/FLINK-3655
> Project: Flink
>  Issue Type: Improvement
>  Components: Core
>Affects Versions: 1.0.0
>Reporter: Gna Phetsarath
>Assignee: Fabian Hueske
>Priority: Major
>  Labels: starter
> Fix For: 1.5.0
>
>
> Allow comma-separated or multiple directories to be specified for 
> FileInputFormat so that a DataSource will process the directories 
> sequentially.
>
> env.readFile("/data/2016/01/01/*/*,/data/2016/01/02/*/*,/data/2016/01/03/*/*")
> in Scala
>env.readFile(paths: Seq[String])
> or 
>   env.readFile(path: String, otherPaths: String*)
> Wildcard support would be a bonus.





[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168381
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -46,26 +52,63 @@
/** */
private final int subtaskIndex;
 
+   /** */
+   private final Map 
storedTaskStateByCheckpointID;
+
+   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
+   private final File subtaskLocalStateBaseDirectory;
+
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
-   int subtaskIndex) {
+   int subtaskIndex,
+   File localStateRootDir) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
+   this.storedTaskStateByCheckpointID = new HashMap<>();
+   this.subtaskLocalStateBaseDirectory =
+   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
+   }
+
+   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
+   return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
}
 
public void storeLocalState(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nullable TaskStateSnapshot localState) {
 
-   if (localState != null) {
-   throw new UnsupportedOperationException("Implement this 
before actually providing local state!");
+   TaskStateSnapshot previous =
+   
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), 
localState);
+
+   if (previous != null) {
+   throw new IllegalStateException("Found previously 
registered local state for checkpoint with id " +
+   checkpointMetaData.getCheckpointId() + "! This 
indicated a problem.");
}
}
 
-   public void dispose() {
-   //TODO
+   public void dispose() throws Exception {
+
+   Exception collectedException = null;
+
+   for (TaskStateSnapshot snapshot : 
storedTaskStateByCheckpointID.values()) {
+   try {
+   snapshot.discardState();
+   } catch (Exception discardEx) {
+   collectedException = 
ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
+   }
+   }
+
+   if (collectedException != null) {
+   throw collectedException;
+   }
+
+   
FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
--- End diff --

This already works differently in later commits.
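The `dispose()` implementation in the diff above discards every stored snapshot, collecting failures with `ExceptionUtils.firstOrSuppressed` and rethrowing only after the loop. A standalone sketch of that pattern, using a hypothetical `firstOrSuppressed` stand-in built on `Throwable#addSuppressed`:

```java
import java.util.Arrays;
import java.util.List;

// Sketch of the "first or suppressed" pattern: keep discarding all
// resources, remember the first failure, and attach later failures
// to it as suppressed exceptions.
public class FirstOrSuppressedDemo {

    // Hypothetical stand-in for Flink's ExceptionUtils.firstOrSuppressed.
    static Exception firstOrSuppressed(Exception newEx, Exception previous) {
        if (previous == null) {
            return newEx;
        }
        previous.addSuppressed(newEx);
        return previous;
    }

    // Visit every entry; throw only after all of them were attempted.
    static void discardAll(List<Runnable> resources) throws Exception {
        Exception collected = null;
        for (Runnable r : resources) {
            try {
                r.run();
            } catch (Exception e) {
                collected = firstOrSuppressed(e, collected);
            }
        }
        if (collected != null) {
            throw collected;
        }
    }

    public static void main(String[] args) {
        try {
            discardAll(Arrays.asList(
                () -> { throw new IllegalStateException("first"); },
                () -> { },                                      // succeeds
                () -> { throw new IllegalStateException("second"); }));
        } catch (Exception e) {
            // the first failure surfaces; the second rides along as suppressed
            System.out.println(e.getMessage() + "/" + e.getSuppressed().length);
        }
    }
}
```

The advantage over failing fast is that every snapshot still gets a discard attempt even when an early one throws.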


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364042#comment-16364042
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168176306
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -501,4 +529,53 @@ public String toString() {
"', asynchronous: " + asynchronousSnapshots +
", fileStateThreshold: " + fileStateThreshold + 
")";
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
+   this.localRecoveryMode = 
Preconditions.checkNotNull(localRecoveryMode);
+   this.localStateDirectory = localStateDirectory;
+   if 
(LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && 
localStateDirectory == null) {
+   throw new IllegalStateException("Local state 
directory must be specified if local recovery mode is " +
+   LocalRecoveryMode.ENABLE_FILE_BASED);
+   }
+   }
+
+   public LocalRecoveryMode getLocalRecoveryMode() {
+   return localRecoveryMode;
+   }
+
+   public File getLocalStateDirectory() {
+   return localStateDirectory;
--- End diff --

All of this changes a bit later, but in essence the backend simply does not 
ask for the directory if nothing file-based is used.
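A minimal standalone approximation of the `LocalRecoveryMode`/`LocalRecoveryConfig` pair from the diff, showing the fail-fast validation for the file-based mode (names mirror the diff, but this is not Flink's actual class):

```java
import java.io.File;

// Sketch: a mode enum plus a config object that only requires a
// directory when the file-based mode is selected.
public class LocalRecoveryConfigDemo {

    enum LocalRecoveryMode { DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED }

    static final class LocalRecoveryConfig {
        final LocalRecoveryMode mode;
        final File localStateDirectory; // may be null unless file-based

        LocalRecoveryConfig(LocalRecoveryMode mode, File dir) {
            if (mode == null) {
                throw new NullPointerException("mode");
            }
            if (mode == LocalRecoveryMode.ENABLE_FILE_BASED && dir == null) {
                throw new IllegalStateException(
                    "Local state directory must be specified for " + mode);
            }
            this.mode = mode;
            this.localStateDirectory = dir;
        }
    }

    // Returns true iff the file-based mode rejects a missing directory.
    static boolean rejectsFileBasedWithoutDir() {
        try {
            new LocalRecoveryConfig(LocalRecoveryMode.ENABLE_FILE_BASED, null);
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        // heap-based mode needs no directory; file-based without one fails fast
        new LocalRecoveryConfig(LocalRecoveryMode.ENABLE_HEAP_BASED, null);
        System.out.println(rejectsFileBasedWithoutDir());
    }
}
```

Validating in the constructor keeps every constructed config internally consistent, which is exactly why the backend never has to ask for a directory in the non-file-based modes.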


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8459) Implement cancelWithSavepoint in RestClusterClient

2018-02-14 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364044#comment-16364044
 ] 

Till Rohrmann commented on FLINK-8459:
--

Yes, the better solution in terms of guarantees would be issuing a single call 
and then executing the logic on the {{JobMaster}}. However, this won't strictly 
give you exactly-once processing guarantees, because we don't yet support a 
proper cancel-with-savepoint command on the {{JobMaster}}. After the savepoint, 
the {{Tasks}} might still process some other records before being cancelled. 
Therefore, both variants, doing the cancel with savepoint in a single 
REST call or splitting it into the two existing REST calls 
{{triggerSavepoint}} and {{cancel}}, would be ok for me.
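The two-call variant can be sketched as follows; `Client` and its methods are hypothetical stand-ins for `RestClusterClient`, and the composition makes the non-atomicity visible: the job keeps running between the savepoint and the cancel.

```java
import java.util.concurrent.CompletableFuture;

public class CancelWithSavepointDemo {

    interface Client {
        CompletableFuture<String> triggerSavepoint(String jobId, String targetDir);
        CompletableFuture<Void> cancel(String jobId);
    }

    // Not atomic: the job is still live between the two REST calls, so
    // more records (and even checkpoints) may occur before cancel().
    static CompletableFuture<String> cancelWithSavepoint(
            Client client, String jobId, String targetDir) {
        return client.triggerSavepoint(jobId, targetDir)
            .thenCompose(path -> client.cancel(jobId).thenApply(ignored -> path));
    }

    // In-memory fake client, standing in for the REST endpoints.
    static String demoResult() {
        Client fake = new Client() {
            public CompletableFuture<String> triggerSavepoint(String id, String dir) {
                return CompletableFuture.completedFuture(dir + "/savepoint-" + id);
            }
            public CompletableFuture<Void> cancel(String id) {
                return CompletableFuture.completedFuture(null);
            }
        };
        return cancelWithSavepoint(fake, "job-1", "hdfs:///sp").join();
    }

    public static void main(String[] args) {
        System.out.println(demoResult());
    }
}
```

A single server-side command would close the window between the two futures, which is the guarantee gap the comment describes.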

> Implement cancelWithSavepoint in RestClusterClient
> --
>
> Key: FLINK-8459
> URL: https://issues.apache.org/jira/browse/FLINK-8459
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client
>Affects Versions: 1.5.0
>Reporter: Gary Yao
>Assignee: vinoyang
>Priority: Blocker
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Implement the method
> {{RestClusterClient#cancelWithSavepoint(JobID jobId, @Nullable String 
> savepointDirectory)}}.
> by either taking a savepoint and cancelling the job separately, or by migrating 
> the logic in {{JobCancellationWithSavepointHandlers}}. The former will have 
> different semantics because the checkpoint scheduler is not stopped. Thus it 
> is not guaranteed that there won't be additional checkpoints between the 
> savepoint and the job cancellation.





[jira] [Commented] (FLINK-8520) CassandraConnectorITCase.testCassandraTableSink unstable on Travis

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364066#comment-16364066
 ] 

ASF GitHub Bot commented on FLINK-8520:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5474
  
yes it should be 8520 (good catch!), will fix while merging.


> CassandraConnectorITCase.testCassandraTableSink unstable on Travis
> --
>
> Key: FLINK-8520
> URL: https://issues.apache.org/jira/browse/FLINK-8520
> Project: Flink
>  Issue Type: Bug
>  Components: Cassandra Connector, Table API & SQL, Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Chesnay Schepler
>Priority: Blocker
>  Labels: test-stability
> Fix For: 1.5.0, 1.4.2
>
>
> The {{CassandraConnectorITCase.testCassandraTableSink}} fails on Travis with 
> a timeout.
>  
> https://travis-ci.org/tillrohrmann/flink/jobs/333711342





[GitHub] flink issue #5474: [FLINK-8520][cassandra] Fix race condition

2018-02-14 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5474
  
yes it should be 8520 (good catch!), will fix while merging.


---


[jira] [Commented] (FLINK-8600) BucketingSink errors out when used with Presto filesystem

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8600?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364062#comment-16364062
 ] 

ASF GitHub Bot commented on FLINK-8600:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5486
  
(I realised this doesn't add any tests or anything but `BucketingSink` is 
currently problematic with S3 anyways.)


> BucketingSink errors out when used with Presto filesystem
> -
>
> Key: FLINK-8600
> URL: https://issues.apache.org/jira/browse/FLINK-8600
> Project: Flink
>  Issue Type: Bug
>  Components: FileSystem
>Affects Versions: 1.4.0
>Reporter: Narayanan Arunachalam
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0
>
>
> BucketingSink passes a non-qualified path when attempting to test the 
> truncate behavior as you can see 
> [here|https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-filesystem/src/main/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSink.java#L606].
> Because of this, a "Path is not absolute: uuid" error is thrown when used with 
> the 
> [PrestoS3FileSystem|https://github.com/prestodb/presto/blob/master/presto-hive/src/main/java/com/facebook/presto/hive/s3/PrestoS3FileSystem.java]
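A rough illustration of path qualification, using `java.net.URI` as a stand-in for the Hadoop `Path`/`FileSystem` API: a bare path like `uuid` has no scheme or authority, which strict filesystems reject until it is resolved against the filesystem's base URI.

```java
import java.net.URI;

public class QualifyPathDemo {

    // Return the path unchanged if it is already absolute (has a scheme),
    // otherwise resolve it against the filesystem's working URI.
    static URI qualify(URI workingDir, String path) {
        URI u = URI.create(path);
        return u.isAbsolute() ? u : workingDir.resolve(path);
    }

    public static void main(String[] args) {
        URI wd = URI.create("s3://bucket/flink/");
        System.out.println(qualify(wd, "uuid"));         // gains scheme + bucket
        System.out.println(qualify(wd, "s3://other/x")); // already qualified
    }
}
```

Doing this qualification before handing the path to the filesystem is the essence of the fix: relative names only make sense once anchored to a concrete scheme and base.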





[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364074#comment-16364074
 ] 

ASF GitHub Bot commented on FLINK-8212:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5458#discussion_r168180756
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -270,10 +273,14 @@ public static TaskManagerServices fromConfiguration(
 * Creates a {@link MemoryManager} from the given {@link 
TaskManagerServicesConfiguration}.
 *
 * @param taskManagerServicesConfiguration to create the memory manager 
from
+* @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
+* @param maxJvmHeapMemory the maximum JVM heap size
 * @return Memory manager
 * @throws Exception
 */
-   private static MemoryManager 
createMemoryManager(TaskManagerServicesConfiguration 
taskManagerServicesConfiguration) throws Exception {
+   private static MemoryManager 
createMemoryManager(TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
+   
 long freeHeapMemoryWithDefrag,
+   
 long maxJvmHeapMemory) throws Exception {
--- End diff --

Will fix ~


> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}} where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, then we should also refactor 
> {{NetworkBufferCalculationTest}}.
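The refactoring can be sketched as follows: rather than reading memory sizes from a static utility inside the method (which forces static mocking in tests), the caller queries the JVM once and passes plain `long` values down. Names here are illustrative, not Flink's real API.

```java
public class MemoryParamDemo {

    // Before: a buried static lookup made this untestable without PowerMock.
    // After: a pure function of its inputs, trivially testable.
    static long networkBufferBytes(long maxJvmHeapBytes, float fraction) {
        if (fraction <= 0f || fraction >= 1f) {
            throw new IllegalArgumentException("fraction must be in (0, 1)");
        }
        return (long) (maxJvmHeapBytes * fraction);
    }

    public static void main(String[] args) {
        // The environment is queried once, at the edge of the system...
        long maxHeap = Runtime.getRuntime().maxMemory();
        System.out.println(networkBufferBytes(maxHeap, 0.1f) > 0);
        // ...while tests can pass deterministic values directly.
        System.out.println(networkBufferBytes(1_000L, 0.5f));
    }
}
```

This is ordinary parameter injection: the dependency on `EnvironmentInformation` moves to the caller, so the computation itself needs no mocking at all.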





[jira] [Commented] (FLINK-8212) Pull EnvironmentInformation out of TaskManagerServices

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8212?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364076#comment-16364076
 ] 

ASF GitHub Bot commented on FLINK-8212:
---

Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5458#discussion_r168180788
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -530,10 +538,12 @@ public static long calculateNetworkBufferMemory(long 
totalJavaMemorySize, Config
 * .
 *
 * @param tmConfig task manager services configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size
 *
 * @return memory to use for network buffers (in bytes)
 */
-   public static long 
calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig) {
+   public static long 
calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig,
+   
long maxJvmHeapMemory) {
--- End diff --

Will fix ~ 


> Pull EnvironmentInformation out of TaskManagerServices
> --
>
> Key: FLINK-8212
> URL: https://issues.apache.org/jira/browse/FLINK-8212
> Project: Flink
>  Issue Type: Improvement
>  Components: Local Runtime, Network
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: mingleizhang
>Priority: Major
> Fix For: 1.5.0
>
>
> We should pull the {{EnvironmentInformation}} out of the 
> {{TaskManagerServices}} where it is used to get access to the memory settings 
> of the executing JVM. This unnecessarily couples the former with the latter 
> and makes testing extremely hard (one has to use {{PowerMockRunner}} and mock 
> the static {{EnvironmentInformation}}).
> When addressing this issue, then we should also refactor 
> {{NetworkBufferCalculationTest}}.





[GitHub] flink pull request #5458: [FLINK-8212] [network] Pull EnvironmentInformation...

2018-02-14 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5458#discussion_r168180788
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -530,10 +538,12 @@ public static long calculateNetworkBufferMemory(long 
totalJavaMemorySize, Config
 * .
 *
 * @param tmConfig task manager services configuration object
+* @param maxJvmHeapMemory the maximum JVM heap size
 *
 * @return memory to use for network buffers (in bytes)
 */
-   public static long 
calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig) {
+   public static long 
calculateNetworkBufferMemory(TaskManagerServicesConfiguration tmConfig,
+   
long maxJvmHeapMemory) {
--- End diff --

Will fix ~ 


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364073#comment-16364073
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168180636
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/SnapshotStrategy.java
 ---
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+
+import java.util.concurrent.RunnableFuture;
+
+/**
+ * Interface for different snapshot approaches in state backends. 
Implementing classes should ideally be stateless or at
+ * least threadsafe, i.e. this is a functional interface and can be 
called in parallel by multiple checkpoints.
+ *
+ * @param  type of the returned state object that represents the result 
of the snapshot operation.
+ */
+public interface SnapshotStrategy {
--- End diff --

 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.
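The `SnapshotStrategy` interface quoted earlier in this thread can be exercised with a standalone sketch; the types are simplified (Flink's real interface also takes a `CheckpointStreamFactory` and `CheckpointOptions`), but it shows how a backend can swap snapshot algorithms behind one call site.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

public class SnapshotStrategyDemo {

    // Stateless, thread-safe functional interface, as the Javadoc asks.
    interface SnapshotStrategy<S> {
        RunnableFuture<S> performSnapshot(long checkpointId, long timestamp,
                                          List<String> state) throws Exception;
    }

    // "Full" strategy: copy everything.
    static final SnapshotStrategy<List<String>> FULL =
        (id, ts, state) -> new FutureTask<>(() -> new ArrayList<>(state));

    // "Empty-aware" strategy: skip work when there is nothing to snapshot.
    static final SnapshotStrategy<List<String>> SKIP_EMPTY =
        (id, ts, state) -> state.isEmpty()
            ? new FutureTask<>(() -> null)
            : FULL.performSnapshot(id, ts, state);

    static List<String> run(SnapshotStrategy<List<String>> s, List<String> st) {
        try {
            RunnableFuture<List<String>> f = s.performSnapshot(1L, 0L, st);
            f.run(); // executed synchronously here; could run on an executor
            return f.get();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(run(FULL, new ArrayList<>(List.of("a", "b"))));
        System.out.println(run(SKIP_EMPTY, new ArrayList<>()));
    }
}
```

Returning a `RunnableFuture` lets the caller decide whether the materialization phase runs synchronously or asynchronously, which is the split the heap and RocksDB backends rely on.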





[GitHub] flink pull request #5458: [FLINK-8212] [network] Pull EnvironmentInformation...

2018-02-14 Thread zhangminglei
Github user zhangminglei commented on a diff in the pull request:

https://github.com/apache/flink/pull/5458#discussion_r168180756
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
 ---
@@ -270,10 +273,14 @@ public static TaskManagerServices fromConfiguration(
 * Creates a {@link MemoryManager} from the given {@link 
TaskManagerServicesConfiguration}.
 *
 * @param taskManagerServicesConfiguration to create the memory manager 
from
+* @param freeHeapMemoryWithDefrag an estimate of the size of the free 
heap memory
+* @param maxJvmHeapMemory the maximum JVM heap size
 * @return Memory manager
 * @throws Exception
 */
-   private static MemoryManager 
createMemoryManager(TaskManagerServicesConfiguration 
taskManagerServicesConfiguration) throws Exception {
+   private static MemoryManager 
createMemoryManager(TaskManagerServicesConfiguration 
taskManagerServicesConfiguration,
+   
 long freeHeapMemoryWithDefrag,
+   
 long maxJvmHeapMemory) throws Exception {
--- End diff --

Will fix ~


---


[jira] [Created] (FLINK-8655) Add a default keyspace to CassandraSink

2018-02-14 Thread Christopher Hughes (JIRA)
Christopher Hughes created FLINK-8655:
-

 Summary: Add a default keyspace to CassandraSink
 Key: FLINK-8655
 URL: https://issues.apache.org/jira/browse/FLINK-8655
 Project: Flink
  Issue Type: Improvement
Affects Versions: 1.4.0
Reporter: Christopher Hughes
 Fix For: 1.4.1


Currently, to use the CassandraPojoSink, it is necessary for a user to provide 
keyspace information on the desired POJOs using datastax annotations.  This 
allows various POJOs to be written to multiple keyspaces while sinking 
messages, but prevents runtime flexibility.

For many developers, non-production environments may all share a single 
Cassandra instance differentiated by keyspace names.  I propose adding a 
`defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs that lack a 
keyspace would be piped to the default. 
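A sketch of how the proposed `defaultKeyspace(String)` fallback could behave; the builder and lookup below are hypothetical and model only the resolution rule (annotation wins, otherwise the default, otherwise fail).

```java
public class DefaultKeyspaceDemo {

    static final class SinkConfig {
        private String defaultKeyspace;

        // Proposed builder-style setter for the fallback keyspace.
        SinkConfig defaultKeyspace(String keyspace) {
            this.defaultKeyspace = keyspace;
            return this;
        }

        // Annotation-provided keyspace wins; otherwise fall back.
        String resolveKeyspace(String annotatedKeyspace) {
            if (annotatedKeyspace != null && !annotatedKeyspace.isEmpty()) {
                return annotatedKeyspace;
            }
            if (defaultKeyspace == null) {
                throw new IllegalStateException(
                    "POJO has no keyspace and no default is configured");
            }
            return defaultKeyspace;
        }
    }

    public static void main(String[] args) {
        SinkConfig cfg = new SinkConfig().defaultKeyspace("dev_keyspace");
        System.out.println(cfg.resolveKeyspace(null));       // falls back
        System.out.println(cfg.resolveKeyspace("prod_ks"));  // annotation wins
    }
}
```

This keeps per-POJO keyspace annotations authoritative while letting shared dev/test clusters swap the keyspace at deploy time.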





[GitHub] flink issue #5185: [FLINK-8297] [flink-rocksdb] Optionally store elements of...

2018-02-14 Thread je-ik
Github user je-ik commented on the issue:

https://github.com/apache/flink/pull/5185
  
@aljoscha I updated the title. I'm a little concerned about the 
serialization in savepoints. If the serialization is *exactly* the same, doesn't 
that mean that, again, the whole List will be stored in a single byte[], 
which will OOME in exactly the cases the user wanted to solve by activating the 
"large list" implementation? Or am I missing something?


---


[jira] [Commented] (FLINK-8656) Add CLI command for rescaling

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8656?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364090#comment-16364090
 ] 

ASF GitHub Bot commented on FLINK-8656:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5487

[FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs

## What is the purpose of the change

Jobs can now be rescaled by calling flink modify <jobId> -p <newParallelism>.
Internally, the CliFrontend will send the corresponding REST call and poll
for status updates.

This PR is based on #5454.

## Brief change log

- Add `modify` call to `CliFrontend`
- Add `ClusterClient#rescaleJob` method with default implementation
- Implement `RestClusterClient#rescaleJob` method to trigger asynchronous 
rescale operation via REST and poll for its status updates

## Verifying this change

- Tested manually
- Added `CliFrontendModifyTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs + stdout help)
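The trigger-then-poll flow described in the PR can be sketched as below; `RescaleGateway` and its methods are hypothetical stand-ins for the REST handlers, not Flink's actual `RestClusterClient` API.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;

public class RescalePollDemo {

    interface RescaleGateway {
        String triggerRescale(String jobId, int newParallelism); // returns trigger id
        Optional<String> pollStatus(String triggerId);           // empty until done
    }

    // Fire the asynchronous operation, then poll until a status arrives.
    static String rescaleJob(RescaleGateway gw, String jobId, int parallelism)
            throws InterruptedException {
        String triggerId = gw.triggerRescale(jobId, parallelism);
        while (true) {
            Optional<String> status = gw.pollStatus(triggerId);
            if (status.isPresent()) {
                return status.get();
            }
            Thread.sleep(10); // back off between polls
        }
    }

    // Fake gateway that "completes" after a couple of polls.
    static String demoResult() {
        AtomicInteger polls = new AtomicInteger();
        RescaleGateway fake = new RescaleGateway() {
            public String triggerRescale(String jobId, int p) { return "trigger-1"; }
            public Optional<String> pollStatus(String id) {
                return polls.incrementAndGet() < 3
                    ? Optional.empty() : Optional.of("COMPLETED");
            }
        };
        try {
            return rescaleJob(fake, "job-42", 8);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demoResult());
    }
}
```

Separating the trigger from the status poll is what lets the CLI survive long-running rescale operations without holding a single blocking REST request open.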


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rescaleCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5487


commit d9159228091cae9ebbd1bb718b69e6cf452881e1
Author: Till Rohrmann 
Date:   2018-02-13T11:41:44Z

[FLINK-8643] [flip6] Use JobManagerOptions#SLOT_REQUEST_TIMEOUT in 
ExecutionGraph

This commit changes the initialization of the ExecutionGraph to use the
JobManagerOptions#SLOT_REQUEST_TIMEOUT for the slot allocation. Furthermore,
it changes the behaviour of the SlotPool#ProviderAndOwner implementation 
such
that the timeout is given to it via the SlotProvider#allocateSlot call.

commit 19780c9d284914ec51e92231536315299a3c2da3
Author: Till Rohrmann 
Date:   2018-02-13T12:18:01Z

[hotfix] [flip6] Remove unnecessary timeout from SlotPool

commit 9924776c92a378cef144c0767f1ff18b799d52e9
Author: Till Rohrmann 
Date:   2018-02-13T14:33:11Z

[FLINK-8647] [flip6] Introduce JobMasterConfiguration

This commit introduces a JobMasterConfiguration which contains JobMaster 
specific
configuration settings.

commit fde75841de2e27cb7380f3a28066a99e2c1a690d
Author: zentol 
Date:   2018-01-23T12:50:32Z

[FLINK-8475][config][docs] Integrate HA-ZK options

This closes #5462.

commit 788a17fdbd4aaf3429ead4491ede197fc775b1f0
Author: zentol 
Date:   2018-01-23T13:04:36Z

[FLINK-8475][config][docs] Integrate YARN options

This closes #5463.

commit fcd783358c282e61bf12e0c18298c237c85a6695
Author: Till Rohrmann 
Date:   2018-02-13T15:08:38Z

[hotfix] [tests] Simplify JobMasterTest

commit 8206e6f13809c0b60bfaf776bc386088f535e723
Author: Till Rohrmann 
Date:   2018-02-13T15:10:09Z

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will 
always
try to restore the latest checkpoint if there is one available. Next it 
will check
whether savepoint restore settings have been set. If so, then it will try 
to restore
the savepoint. Only if these settings are not set, the job will be started 
from
scratch.

commit 057a95b7328b1cca7b78bf1dd25e8d048df70410
Author: Till Rohrmann 
Date:   2018-02-13T15:11:37Z

[hotfix] Fix checkstyle violations in ExecutionGraph

commit 9930b0991320bcff268ca82db6378df8976560dc
Author: Till Rohrmann 
Date:   2018-02-13T15:12:41Z

[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph

The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended 
but its
clean up has not been done yet. Only after all Executions have been 
canceled, the
ExecutionGraph will enter the SUSPENDED state and complete the termination 
future
accordingly.

commit 6c51ad306c90464572353168ecafdb962794747e
Author: Till Rohrmann 
Date:   

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168184825
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -363,1691 +372,1780 @@ public int getKeyGroupPrefixBytes() {
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointOptions.CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture 
snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " +
-   checkpointTimestamp + " . Returning 
null.");
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
-
-   snapshotOperation.takeSnapshot();
-
-   return new FutureTask(
-   () -> snapshotOperation.materializeSnapshot()
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
-   }
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
-   @Override
-   protected void done() {
-   
snapshotOperation.releaseResources(isCancelled());
+   try {
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else if (restoreState.iterator().next() instanceof 
IncrementalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation restoreOperation 
= new RocksDBFullRestoreOperation<>(this);
+   restoreOperation.doRestore(restoreState);
}
-   };
+   } catch (Exception ex) {
+   dispose();
+   throw ex;
+   }
}
 
-   private RunnableFuture snapshotFully(
-   final long checkpointId,
-   final long timestamp,
-   final CheckpointStreamFactory streamFactory) throws Exception {
-
-   long startTime = System.currentTimeMillis();
-   final CloseableRegistry snapshotCloseableRegistry = new 
CloseableRegistry();
-
-   final RocksDBFullSnapshotOperation snapshotOperation;
-
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at " + timestamp +
-   " . Returning null.");
-   }
+   @Override
+   public void notifyCheckpointComplete(long 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168184599
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapKeyedStateBackend.java
 ---
@@ -630,19 +506,210 @@ public int numStateEntries(Object namespace) {
return sum;
}
 
-   public  StateTable 
newStateTable(RegisteredKeyedBackendStateMetaInfo newMetaInfo) {
-   return asynchronousSnapshots ?
-   new CopyOnWriteStateTable<>(this, newMetaInfo) :
-   new NestedMapsStateTable<>(this, newMetaInfo);
-   }
-
@Override
public boolean supportsAsynchronousSnapshots() {
-   return asynchronousSnapshots;
+   return snapshotStrategy.isAsynchronous();
}
 
@VisibleForTesting
public FsStateBackend.LocalRecoveryConfig getLocalRecoveryConfig() {
return localRecoveryConfig;
}
+
+   /**
+* Base class for the snapshots of the heap backend that outlines the 
algorithm and offers some hooks to realize
+* the concrete strategies.
+*/
+   private abstract class HeapSnapshotStrategy implements 
SnapshotStrategy {
+
+   @Override
+   public RunnableFuture 
performSnapshot(
+   long checkpointId,
+   long timestamp,
+   CheckpointStreamFactory streamFactory,
+   CheckpointOptions checkpointOptions) throws Exception {
+
+   if (!hasRegisteredState()) {
+   return DoneFuture.nullValue();
+   }
+
+   long syncStartTime = System.currentTimeMillis();
+
+   Preconditions.checkState(stateTables.size() <= 
Short.MAX_VALUE,
+   "Too many KV-States: " + stateTables.size() +
+   ". Currently at most " + 
Short.MAX_VALUE + " states are supported");
+
+   List metaInfoSnapshots =
+   new ArrayList<>(stateTables.size());
+
+   final Map kVStateToId = new 
HashMap<>(stateTables.size());
+
+   final Map, StateTableSnapshot> 
cowStateStableSnapshots =
+   new HashedMap(stateTables.size());
+
+   for (Map.Entry> kvState : 
stateTables.entrySet()) {
+   kVStateToId.put(kvState.getKey(), 
kVStateToId.size());
+   StateTable stateTable = 
kvState.getValue();
+   if (null != stateTable) {
+   
metaInfoSnapshots.add(stateTable.getMetaInfo().snapshot());
+   cowStateStableSnapshots.put(stateTable, 
stateTable.createSnapshot());
+   }
+   }
+
+   final KeyedBackendSerializationProxy 
serializationProxy =
+   new KeyedBackendSerializationProxy<>(
+   keySerializer,
+   metaInfoSnapshots,
+   
!Objects.equals(UncompressedStreamCompressionDecorator.INSTANCE, 
keyGroupCompressionDecorator));
+
+   //--- 
this becomes the end of sync part
+
+   // implementation of the async IO operation, based on 
FutureTask
+   final 
AbstractAsyncCallableWithResources ioCallable 
=
+   new 
AbstractAsyncCallableWithResources() {
+
+   
CheckpointStreamFactory.CheckpointStateOutputStream stream = null;
+
+   @Override
+   protected void acquireResources() 
throws Exception {
+   stream = 
streamFactory.createCheckpointStateOutputStream(checkpointId, timestamp);
+   
cancelStreamRegistry.registerCloseable(stream);
+   }
+
+   @Override
+   protected void releaseResources() 
throws Exception {
+
+   if 
(cancelStreamRegistry.unregisterCloseable(stream)) {
+   
IOUtils.closeQuietly(stream);
+   

[GitHub] flink pull request #5436: [FLINK-8613] [flip6] [yarn] Return excess containe...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5436#discussion_r168184923
  
--- Diff: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java ---
@@ -325,26 +325,43 @@ public void 
onContainersCompleted(List list) {
@Override
public void onContainersAllocated(List containers) {
for (Container container : containers) {
-   numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
-   log.info("Received new container: {} - Remaining 
pending container requests: {}",
-   container.getId(), 
numPendingContainerRequests);
-   final String containerIdStr = 
container.getId().toString();
-   workerNodeMap.put(new ResourceID(containerIdStr),
-   new YarnWorkerNode(container));
-   try {
-   /** Context information used to start a 
TaskExecutor Java process */
-   ContainerLaunchContext 
taskExecutorLaunchContext =
-   createTaskExecutorLaunchContext(
-   
container.getResource(), containerIdStr, container.getNodeId().getHost());
-   nodeManagerClient.startContainer(container, 
taskExecutorLaunchContext);
-   }
-   catch (Throwable t) {
-   // failed to launch the container, will release 
the failed one and ask for a new one
-   log.error("Could not start TaskManager in 
container {},", container, t);
+   log.info(
+   "Received new container: {} - Remaining pending 
container requests: {}",
+   container.getId(),
+   numPendingContainerRequests);
+
+   if (numPendingContainerRequests > 0) {
+   numPendingContainerRequests = Math.max(0, 
numPendingContainerRequests - 1);
--- End diff --

Isn't it enough to write `numPendingContainerRequests--`?
If `numPendingContainerRequests` is `0`, it won't go into this branch in 
the next iteration.
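The equivalence the comment points out, in runnable form: under the `count > 0` guard, a plain decrement can never go below zero, so the `Math.max(0, ...)` clamp is redundant.

```java
public class GuardedDecrementDemo {

    // Decrement a pending-request counter once per event, never below zero.
    static int drain(int count, int events) {
        for (int i = 0; i < events; i++) {
            if (count > 0) {
                count--; // same effect as Math.max(0, count - 1) under the guard
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println(drain(2, 5)); // more events than pending requests
        System.out.println(drain(3, 1));
    }
}
```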


---


[jira] [Commented] (FLINK-8649) Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8649?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363644#comment-16363644
 ] 

ASF GitHub Bot commented on FLINK-8649:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5479
  
Thank you @ggevay. I will look at this soon.


> Scala StreamExecutionEnvironment.createInput should pass on the TypeInfo
> 
>
> Key: FLINK-8649
> URL: https://issues.apache.org/jira/browse/FLINK-8649
> Project: Flink
>  Issue Type: Bug
>  Components: Scala API
>Affects Versions: 1.4.0
>Reporter: Gabor Gevay
>Assignee: Gabor Gevay
>Priority: Trivial
> Fix For: 1.5.0
>
>
> This is {{StreamExecutionEnvironment.createInput}} in the Scala API:
> {code}
> def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
>   asScalaStream(javaEnv.createInput(inputFormat))
> {code}
> It should pass on the implicitly got {{TypeInformation}} to Java like this:
> {code}
> def createInput[T: TypeInformation](inputFormat: InputFormat[T, _]): DataStream[T] =
>   asScalaStream(javaEnv.createInput(inputFormat, implicitly[TypeInformation[T]]))
> {code}
> The current situation creates a problem, for example, when we have generics 
> in the type like in the following code, where the Java API can't deduce the 
> {{TypeInformation}} on its own:
> {code}
>  
> StreamExecutionEnvironment.getExecutionEnvironment.createInput[Tuple2[Integer, Integer]](new ParallelIteratorInputFormat[Tuple2[Integer, Integer]](null))
> {code}
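The root cause is Java type erasure: at runtime two differently parameterized collections share the same class, so the Java API cannot recover the element type from the value alone, which is why the Scala implicit `TypeInformation` must be forwarded explicitly. A minimal, framework-free illustration:

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {
    public static void main(String[] args) {
        List<Integer> ints = new ArrayList<>();
        List<String> strings = new ArrayList<>();
        // Both lists share the same runtime class: the element type is erased,
        // so a framework needs explicit type information (e.g. TypeInformation)
        // supplied from the call site.
        System.out.println(ints.getClass() == strings.getClass()); // true
    }
}
```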



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5479: [FLINK-8649] [scala api] Pass on TypeInfo in StreamExecut...

2018-02-14 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5479
  
Thank you @ggevay. I will look at this soon.


---


[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-14 Thread tarun razdan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tarun razdan updated FLINK-7756:

Attachment: (was: taskmanager.log)

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, taskmanager.log
>
>
> When I try to use RocksDBStateBackend on my staging cluster (which uses HDFS 
> as its file system) it crashes, but when I use FsStateBackend on the same 
> staging cluster it works fine.
> Locally, with the local file system, both backends work fine.
> Please check the attached logs. I have around 20-25 tasks in my app.
> {code:java}
> 2017-09-29 14:21:31,639 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=0).
> 2017-09-29 14:21:31,640 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,020 INFO  
> org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink  - No state 
> to restore for the BucketingSink (taskIdx=1).
> 2017-09-29 14:21:32,022 INFO  
> org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend  - 
> Initializing RocksDB keyed state backend from snapshot.
> 2017-09-29 14:21:32,078 INFO  com.datastax.driver.core.NettyUtil  
>   - Found Netty's native epoll transport in the classpath, using 
> it
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (1/2) 
> (b879f192c4e8aae6671cdafb3a24c00a).
> 2017-09-29 14:21:34,177 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Map (2/2) 
> (1ea5aef6ccc7031edc6b37da2912d90b).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Attempting to fail task externally Co-Flat Map (2/2) 
> (4bac8e764c67520d418a4c755be23d4d).
> 2017-09-29 14:21:34,178 INFO  org.apache.flink.runtime.taskmanager.Task   
>   - Co-Flat Map (1/2) (b879f192c4e8aae6671cdafb3a24c00a) switched 
> from RUNNING to FAILED.
> AsynchronousException{java.lang.Exception: Could not materialize checkpoint 2 
> for operator Co-Flat Map (1/2).}
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:970)
>   at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> Caused by: java.lang.Exception: Could not materialize checkpoint 2 for 
> operator Co-Flat Map (1/2).
>   ... 6 more
> Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:897)
>   ... 5 more
>   Suppressed: java.lang.Exception: Could not properly cancel managed 
> keyed state future.
>   at 
> org.apache.flink.streaming.api.operators.OperatorSnapshotResult.cancel(OperatorSnapshotResult.java:90)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.cleanup(StreamTask.java:1023)
>   at 
> org.apache.flink.streaming.runtime.tasks.StreamTask$AsyncCheckpointRunnable.run(StreamTask.java:961)
>   ... 5 more
>   Caused by: java.util.concurrent.ExecutionException: 
> java.lang.IllegalStateException
>   at java.util.concurrent.FutureTask.report(FutureTask.java:122)
>   at java.util.concurrent.FutureTask.get(FutureTask.java:192)
>   at 
> org.apache.flink.util.FutureUtil.runIfNotDoneAndGet(FutureUtil.java:43)
>   at 
> 

[jira] [Updated] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-14 Thread tarun razdan (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

tarun razdan updated FLINK-7756:

Attachment: (was: jobmanager.log)

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, taskmanager.log
>
>

[jira] [Commented] (FLINK-8479) Implement time-bounded inner join of streams as a TwoInputStreamOperator

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363732#comment-16363732
 ] 

ASF GitHub Bot commented on FLINK-8479:
---

GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/5482

[Flink-8480][DataStream] Add Java API for timebounded stream join

## What is the purpose of the change

* Add a JavaAPI to the DataStream API to join two streams based on 
user-defined time boundaries
* Design doc can be found here 
https://docs.google.com/document/d/16GMH5VM6JJiWj_N0W8y3PtQ1aoJFxsKoOTSYOfqlsRE/edit#heading=h.bgl260hr56g6

## Brief change log
* Add option `.between(Time, Time)` to streams that are already joined and have their key selectors `where` and `equalTo` defined
* Add new inner class `TimeBounded` to `JoinedStreams`, which exposes 
`process(TimeBoundedJoinFunction)` as well as optional 
`upperBoundExclusive(boolean)` and `lowerBoundExclusive(boolean)` to the user
* Add new integration test `TimeboundedJoinITCase`
* **Depends on [FLINK-8479] to be merged**

Full example usage:

```java
streamOne
.join(streamTwo)
.where(new MyKeySelector())
.equalTo(new MyKeySelector())
.between(Time.milliseconds(-1), Time.milliseconds(1))
.process(new UdfTimeBoundedJoinFunction())
.addSink(new ResultSink());
```

## Verifying this change
This change added tests and can be verified as follows: 
- Added integration tests in `TimeboundedJoinITCase` that validate 
parameter translation and execution

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: yes
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? JavaDocs


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
flink-8480-timebounded-join-java-api

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5482.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5482


commit 34451540116d8bdd284fd01016a4cc74d8564d37
Author: Florian Schmidt 
Date:   2018-01-18T14:47:14Z

[FLINK-8479] Implement TimeBoundedStreamJoinOperator

This operator is the basis for performing an inner join on two
streams using a time criteria defined as a lower and upper bound

commit fe65b1ead0511b0df5d640c728f5ce9e273d7ed5
Author: Florian Schmidt 
Date:   2018-02-13T14:48:40Z

[FLINK-8480][DataStream] Add java api for timebounded stream joins

This commit adds a java implementation for timebounded stream joins.
The usage looks roughly like the following:

```java
streamOne
.join(streamTwo)
.where(new Tuple2KeyExtractor())
.equalTo(new Tuple2KeyExtractor())
.between(Time.milliseconds(0), Time.milliseconds(1))
.process(new CombineToStringJoinFunction())
.addSink(new ResultSink());
```

This change adds the functionality in JoinedStreams.java and adds
integration tests in TimeboundedJoinITCase.java




> Implement time-bounded inner join of streams as a TwoInputStreamOperator
> 
>
> Key: FLINK-8479
> URL: https://issues.apache.org/jira/browse/FLINK-8479
> Project: Flink
>  Issue Type: Sub-task
>Reporter: Florian Schmidt
>Assignee: Florian Schmidt
>Priority: Major
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8576) Log message for QueryableState loading failure too verbose

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363766#comment-16363766
 ] 

ASF GitHub Bot commented on FLINK-8576:
---

Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5420#discussion_r168137274
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---
@@ -132,9 +134,7 @@ public static KvStateServer createKvStateServer(
				KvStateRequestStats.class);
			return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats);
		} catch (ClassNotFoundException e) {
-			final String msg = "Could not load Queryable State Server. " +
-				"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
-				"Please put the corresponding jar from the opt to the lib folder.";
+			final String msg = "Could not load Queryable State Server. " + ERROR_MESSAGE_ON_LOAD_FAILURE;
			if (LOG.isDebugEnabled()) {
				LOG.debug(msg, e);
--- End diff --

The `debug` message should change as well as above, right?


> Log message for QueryableState loading failure too verbose
> --
>
> Key: FLINK-8576
> URL: https://issues.apache.org/jira/browse/FLINK-8576
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.5.0
>
>
> Whenever a job- or taskmanager is started, it attempts to load the queryable 
> state classes via reflection. If this fails because the classes are not on the 
> classpath (which is common and the default path), we log the full stack trace 
> at DEBUG level.
> We should reduce this to a single line, as it gets really verbose when 
> sifting through debug logs.
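As a rough illustration of the single-line approach (a hypothetical helper, not the actual Flink code; the message text is taken from the quoted diff), the log line can carry the exception class and message without the stack trace:

```java
public class CompactLogMessage {
    static final String ERROR_MESSAGE_ON_LOAD_FAILURE =
        "Probable reason: flink-queryable-state-runtime is not in the classpath. "
        + "Please put the corresponding jar from the opt to the lib folder.";

    /** Build a single-line message instead of logging the full stack trace. */
    static String compact(Throwable t) {
        return "Could not load Queryable State Server. "
            + ERROR_MESSAGE_ON_LOAD_FAILURE
            + " Cause: " + t.getClass().getName() + ": " + t.getMessage();
    }

    public static void main(String[] args) {
        // A debug-level logger would emit this single line instead of a trace.
        System.out.println(compact(new ClassNotFoundException("KvStateServerImpl")));
    }
}
```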



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8533) Support MasterTriggerRestoreHook state reinitialization

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8533?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363764#comment-16363764
 ] 

ASF GitHub Bot commented on FLINK-8533:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/5427
  
@EronWright, you're right that on initial submission we don't call 
`restoreLatestCheckpointedState` in the old code. With Flip-6 this will be the 
case. See #5444.

The underlying assumption to make this work, though, is that a user won't 
submit a new job with the same job id to a cluster with the same cluster id for 
which ZooKeeper already contains persisted checkpoints from a previous run. So 
either the cluster id or the job id must be different.

I think so far, when using the Flink client this should be the case. 
However, when generating the `JobGraph` yourself and keeping it around to 
submit it to a standalone cluster, then this assumption will break because both 
the `JobID` and the cluster id will be the same.
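The uniqueness assumption can be illustrated with a toy keying scheme (hypothetical; the actual ZooKeeper layout differs): when both the cluster id and the job id repeat, a fresh submission silently observes checkpoints persisted by the previous run.

```java
import java.util.HashMap;
import java.util.Map;

public class CheckpointStoreKeying {
    // Hypothetical: completed checkpoints keyed by (clusterId, jobId).
    private final Map<String, Long> latestCheckpoint = new HashMap<>();

    private static String key(String clusterId, String jobId) {
        return clusterId + "/" + jobId;
    }

    void persist(String clusterId, String jobId, long checkpointId) {
        latestCheckpoint.put(key(clusterId, jobId), checkpointId);
    }

    /** A "new" job reusing both ids sees checkpoints from the previous run. */
    Long lookup(String clusterId, String jobId) {
        return latestCheckpoint.get(key(clusterId, jobId));
    }

    public static void main(String[] args) {
        CheckpointStoreKeying store = new CheckpointStoreKeying();
        store.persist("cluster-1", "job-A", 17L);   // previous run
        // resubmitting the same JobGraph to the same cluster:
        System.out.println(store.lookup("cluster-1", "job-A")); // prints 17
    }
}
```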


> Support MasterTriggerRestoreHook state reinitialization
> ---
>
> Key: FLINK-8533
> URL: https://issues.apache.org/jira/browse/FLINK-8533
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Affects Versions: 1.3.0
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>Priority: Major
>
> {{MasterTriggerRestoreHook}} enables coordination with an external system for 
> taking or restoring checkpoints. When execution is restarted from a 
> checkpoint, {{restoreCheckpoint}} is called to restore or reinitialize the 
> external system state. There's an edge case where the external state is not 
> adequately reinitialized, that is when execution fails _before the first 
> checkpoint_. In that case, the hook is not invoked and has no opportunity to 
> restore the external state to initial conditions.
> The impact is a loss of exactly-once semantics in this case. For example, in 
> the Pravega source function, the reader group state (e.g. stream position 
> data) is stored externally. In the normal restore case, the reader group 
> state is forcibly rewound to the checkpointed position. In the edge case 
> where no checkpoint has yet been successful, the reader group state is not 
> rewound and consequently some amount of stream data is not reprocessed.
> A possible fix would be to introduce an {{initializeState}} method on the 
> hook interface. Similar to {{CheckpointedFunction::initializeState}}, this 
> method would be invoked unconditionally upon hook initialization. The Pravega 
> hook would, for example, initialize or forcibly reinitialize the reader group 
> state.    
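The proposed fix could look roughly like the following sketch (illustrative names only, not the actual `MasterTriggerRestoreHook` interface): an unconditional `initializeState` resets external state even when no checkpoint has completed yet.

```java
import java.util.concurrent.CompletableFuture;

interface RestoreHookSketch<T> {
    // Proposed addition, mirroring CheckpointedFunction#initializeState:
    // invoked unconditionally on hook initialization.
    void initializeState() throws Exception;

    CompletableFuture<T> triggerCheckpoint(long checkpointId) throws Exception;

    void restoreCheckpoint(long checkpointId, T checkpointData) throws Exception;
}

public class PositionHook implements RestoreHookSketch<Long> {
    private long externalPosition = 42L; // stands in for reader-group state

    @Override
    public void initializeState() {
        externalPosition = 0L; // forcibly rewind to initial conditions
    }

    @Override
    public CompletableFuture<Long> triggerCheckpoint(long checkpointId) {
        return CompletableFuture.completedFuture(externalPosition);
    }

    @Override
    public void restoreCheckpoint(long checkpointId, Long checkpointData) {
        externalPosition = checkpointData;
    }

    long position() {
        return externalPosition;
    }

    public static void main(String[] args) {
        PositionHook hook = new PositionHook();
        hook.initializeState();               // runs even with no checkpoint yet
        System.out.println(hook.position());  // 0
    }
}
```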



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5420: [FLINK-8576][QS] Reduce verbosity when classes can...

2018-02-14 Thread kl0u
Github user kl0u commented on a diff in the pull request:

https://github.com/apache/flink/pull/5420#discussion_r168137274
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/query/QueryableStateUtils.java ---
@@ -132,9 +134,7 @@ public static KvStateServer createKvStateServer(
				KvStateRequestStats.class);
			return constructor.newInstance(address, ports, eventLoopThreads, queryThreads, kvStateRegistry, stats);
		} catch (ClassNotFoundException e) {
-			final String msg = "Could not load Queryable State Server. " +
-				"Probable reason: flink-queryable-state-runtime is not in the classpath. " +
-				"Please put the corresponding jar from the opt to the lib folder.";
+			final String msg = "Could not load Queryable State Server. " + ERROR_MESSAGE_ON_LOAD_FAILURE;
			if (LOG.isDebugEnabled()) {
				LOG.debug(msg, e);
--- End diff --

The `debug` message should change as well as above, right?


---


[jira] [Commented] (FLINK-8475) Move remaining sections to generated tables

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8475?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363814#comment-16363814
 ] 

ASF GitHub Bot commented on FLINK-8475:
---

Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5469
  
Ok, sounds fine to me then.  


> Move remaining sections to generated tables
> ---
>
> Key: FLINK-8475
> URL: https://issues.apache.org/jira/browse/FLINK-8475
> Project: Flink
>  Issue Type: Sub-task
>  Components: Configuration
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5469: [FLINK-8475][config][docs] Integrate Core options

2018-02-14 Thread aljoscha
Github user aljoscha commented on the issue:

https://github.com/apache/flink/pull/5469
  
Ok, sounds fine to me then. 👍 


---


[jira] [Commented] (FLINK-7756) RocksDB state backend Checkpointing (Async and Incremental) is not working with CEP.

2018-02-14 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7756?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363826#comment-16363826
 ] 

Aljoscha Krettek commented on FLINK-7756:
-

Also, did you ever try running this without any CEP operator but with other 
state in RocksDB?

> RocksDB state backend Checkpointing (Async and Incremental)  is not working 
> with CEP.
> -
>
> Key: FLINK-7756
> URL: https://issues.apache.org/jira/browse/FLINK-7756
> Project: Flink
>  Issue Type: Sub-task
>  Components: CEP, State Backends, Checkpointing, Streaming
>Affects Versions: 1.4.0, 1.3.2
> Environment: Flink 1.3.2, Yarn, HDFS, RocksDB backend
>Reporter: Shashank Agarwal
>Assignee: Aljoscha Krettek
>Priority: Blocker
> Fix For: 1.5.0, 1.4.1
>
> Attachments: jobmanager.log, jobmanager_without_cassandra.log, 
> taskmanager.log, taskmanager_without_cassandra.log
>
>

[GitHub] flink pull request #5484: [FLINK-8593][metrics] Update latency metric docs

2018-02-14 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5484

[FLINK-8593][metrics] Update latency metric docs

This PR updates the documentation of the latency metric to be in line with 
the recent changes in 
[FLINK-7608](https://issues.apache.org/jira/browse/FLINK-7608).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8593

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5484


commit 93e95312e63db9d0955fd4c75b9e9df3b004eb78
Author: zentol 
Date:   2018-02-14T11:45:17Z

[FLINK-8593][metrics] Update latency metric docs




---


[jira] [Commented] (FLINK-8593) Latency metric docs are outdated

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8593?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363846#comment-16363846
 ] 

ASF GitHub Bot commented on FLINK-8593:
---

GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/5484

[FLINK-8593][metrics] Update latency metric docs

This PR updates the documentation of the latency metric to be in line with 
the recent changes in 
[FLINK-7608](https://issues.apache.org/jira/browse/FLINK-7608).

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8593

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5484.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5484


commit 93e95312e63db9d0955fd4c75b9e9df3b004eb78
Author: zentol 
Date:   2018-02-14T11:45:17Z

[FLINK-8593][metrics] Update latency metric docs




> Latency metric docs are outdated
> 
>
> Key: FLINK-8593
> URL: https://issues.apache.org/jira/browse/FLINK-8593
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> I missed to update the latency metric documentation while working on 
> FLINK-7608. The docs should be updated to contain the new naming scheme and 
> that it is a job-level metric.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8605) Enable job cancellation from the web UI

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363922#comment-16363922
 ] 

ASF GitHub Bot commented on FLINK-8605:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168148700
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java ---
@@ -169,22 +192,26 @@ public static Builder newBuilder() {
	private String address = LOCALHOST;
	private String hostname = LOCALHOST;
	private String restAddress = LOCALHOST;
+	private Function cancelJobFunction;
+	private Function stopJobFunction;
	private Function requestJobFunction;
	private Function requestJobStatusFunction;
	private Supplier requestMultipleJobDetailsSupplier;
	private Supplier requestClusterOverviewSupplier;
	private Supplier requestMetricQueryServicePathsSupplier;
	private Supplier requestOeratorBackPressureStatsFunction;
+	private BiFunction requestOperatorBackPressureStatsFunction;
--- End diff --

nice


> Enable job cancellation from the web UI
> ---
>
> Key: FLINK-8605
> URL: https://issues.apache.org/jira/browse/FLINK-8605
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to enable the job cancellation from the web UI (including YARN) we 
> have to register the {{JobTerminationHandler}} under 
> {{/jobs/:jobId/yarn-cancel}} and {{/jobs/:jobid/yarn-stop}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5430: [FLINK-8605] [rest] Enable job cancellation from t...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168148700
  
--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/webmonitor/TestingRestfulGateway.java ---
@@ -169,22 +192,26 @@ public static Builder newBuilder() {
	private String address = LOCALHOST;
	private String hostname = LOCALHOST;
	private String restAddress = LOCALHOST;
+	private Function cancelJobFunction;
+	private Function stopJobFunction;
	private Function requestJobFunction;
	private Function requestJobStatusFunction;
	private Supplier requestMultipleJobDetailsSupplier;
	private Supplier requestClusterOverviewSupplier;
	private Supplier requestMetricQueryServicePathsSupplier;
	private Supplier requestOeratorBackPressureStatsFunction;
+	private BiFunction requestOperatorBackPressureStatsFunction;
--- End diff --

nice


---


[GitHub] flink pull request #5430: [FLINK-8605] [rest] Enable job cancellation from t...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168149188
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationHeaders.java
 ---
@@ -32,7 +32,7 @@
 
private static final JobTerminationHeaders INSTANCE = new 
JobTerminationHeaders();
 
-   private JobTerminationHeaders() {}
+   protected JobTerminationHeaders() {}
--- End diff --

I don't think this is needed.


---
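The pattern under discussion is the standard eagerly initialized singleton. A minimal sketch follows; the class and member names are illustrative stand-ins, not Flink's actual `JobTerminationHeaders`:

```java
public class Main {

    // Illustrative stand-in for the headers class in the diff above.
    static class TerminationHeaders {
        private static final TerminationHeaders INSTANCE = new TerminationHeaders();

        // A private constructor is sufficient for the singleton pattern;
        // widening it to protected only matters if subclassing is intended.
        private TerminationHeaders() {}

        static TerminationHeaders getInstance() {
            return INSTANCE;
        }
    }

    public static void main(String[] args) {
        // Every access yields the same instance.
        System.out.println(TerminationHeaders.getInstance() == TerminationHeaders.getInstance());
    }
}
```

Since no subclass exists in the diff, keeping the constructor private preserves the singleton guarantee, which is the point of the review comment.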


[jira] [Commented] (FLINK-8605) Enable job cancellation from the web UI

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363921#comment-16363921
 ] 

ASF GitHub Bot commented on FLINK-8605:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168149188
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/messages/JobTerminationHeaders.java
 ---
@@ -32,7 +32,7 @@
 
private static final JobTerminationHeaders INSTANCE = new 
JobTerminationHeaders();
 
-   private JobTerminationHeaders() {}
+   protected JobTerminationHeaders() {}
--- End diff --

I don't think this is needed.


> Enable job cancellation from the web UI
> ---
>
> Key: FLINK-8605
> URL: https://issues.apache.org/jira/browse/FLINK-8605
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to enable the job cancellation from the web UI (including YARN) we 
> have to register the {{JobTerminationHandler}} under 
> {{/jobs/:jobId/yarn-cancel}} and {{/jobs/:jobid/yarn-stop}}.





[jira] [Commented] (FLINK-8605) Enable job cancellation from the web UI

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363920#comment-16363920
 ] 

ASF GitHub Bot commented on FLINK-8605:
---

Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168165205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -593,6 +617,8 @@ public void shutdown(Time timeout) {
} catch (Exception e) {
log.warn("Error while stopping leaderElectionService", 
e);
}
+
+   super.shutdown(timeout);
--- End diff --

Why is it better to move it to the end?


> Enable job cancellation from the web UI
> ---
>
> Key: FLINK-8605
> URL: https://issues.apache.org/jira/browse/FLINK-8605
> Project: Flink
>  Issue Type: Improvement
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to enable the job cancellation from the web UI (including YARN) we 
> have to register the {{JobTerminationHandler}} under 
> {{/jobs/:jobId/yarn-cancel}} and {{/jobs/:jobid/yarn-stop}}.





[GitHub] flink pull request #5430: [FLINK-8605] [rest] Enable job cancellation from t...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5430#discussion_r168165205
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorEndpoint.java
 ---
@@ -593,6 +617,8 @@ public void shutdown(Time timeout) {
} catch (Exception e) {
log.warn("Error while stopping leaderElectionService", 
e);
}
+
+   super.shutdown(timeout);
--- End diff --

Why is it better to move it to the end?


---
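The ordering question above concerns stopping the endpoint's own services before delegating to `super.shutdown(timeout)`. A toy hierarchy can make the resulting order visible; the class names are stand-ins, not the real `WebMonitorEndpoint`:

```java
import java.util.ArrayList;
import java.util.List;

public class Main {

    static final List<String> order = new ArrayList<>();

    // Stand-in for the base REST endpoint.
    static class RestEndpoint {
        void shutdown() {
            order.add("rest-server");
        }
    }

    // Stand-in subclass: stops its own services first, then the base endpoint,
    // mirroring the diff that moves super.shutdown(timeout) to the end.
    static class WebMonitorEndpoint extends RestEndpoint {
        @Override
        void shutdown() {
            order.add("leader-election");
            super.shutdown();
        }
    }

    public static void main(String[] args) {
        new WebMonitorEndpoint().shutdown();
        System.out.println(order);
    }
}
```

With the call at the end, endpoint-specific services are torn down while the underlying server is still up; whether that ordering is actually required here is exactly what the reviewer is asking.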


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363953#comment-16363953
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168418
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---
@@ -316,6 +316,11 @@
 */
public static final String TASK_MANAGER_REFUSED_REGISTRATION_PAUSE = 
"taskmanager.refused-registration-pause";
 
+   /**
+* The config parameter defining the root directories for storing 
file-based state for local recovery.
+*/
+   public static final String TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY = 
"taskmanager.local-state-root.dir";
--- End diff --

 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.





[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168634
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ---
@@ -104,10 +107,8 @@
@Nullable
private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-* Null if not yet set, in which case the configuration values will be 
used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
+   private TernaryBoolean enableIncrementalCheckpointing;
--- End diff --

I think that, by convention, the `is` should not be part of a boolean field name; 
it should only replace the `get` in the getter.


---
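The naming convention mentioned in this comment can be shown with a small sketch; the config class and its members are invented for illustration, not Flink's API:

```java
public class Main {

    // Illustrative config holder demonstrating the boolean naming convention.
    static class BackendConfig {
        // The field name carries no "is" prefix...
        private final boolean incrementalCheckpointingEnabled;

        BackendConfig(boolean incrementalCheckpointingEnabled) {
            this.incrementalCheckpointingEnabled = incrementalCheckpointingEnabled;
        }

        // ...the "is" only replaces the "get" of an ordinary getter.
        boolean isIncrementalCheckpointingEnabled() {
            return incrementalCheckpointingEnabled;
        }
    }

    public static void main(String[] args) {
        System.out.println(new BackendConfig(true).isIncrementalCheckpointingEnabled());
    }
}
```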


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168770
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/util/MethodForwardingTestUtil.java ---
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.util;
+
+import org.mockito.Mockito;
+
+import java.lang.reflect.Array;
+import java.lang.reflect.Method;
+import java.util.Collections;
+import java.util.Set;
+import java.util.function.Function;
+
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.spy;
+
+/**
+ * Helper class with a method that attempt to automatically test method 
forwarding between a delegate and a wrapper.
+ */
+public class MethodForwardingTestUtil {
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. This ignores methods that are 
inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory) {
+   testMethodForwarding(delegateClass, wrapperFactory, 
Collections.emptySet());
+   }
+
+   /**
+* This is a best effort automatic test for method forwarding between a 
delegate and its wrapper, where the wrapper
+* class is a subtype of the delegate. Methods can be remapped in case 
that the implementation does not call the
+* original method. Remapping to null skips the method. This ignores 
methods that are inherited from Object.
+*
+* @param delegateClass the class for the delegate.
+* @param wrapperFactory factory that produces a wrapper from a 
delegate.
+* @param skipMethodSet set of methods to ignore.
+* @param  type of the delegate
+* @param  type of the wrapper
+*/
+   public static  void testMethodForwarding(
+   Class delegateClass,
+   Function wrapperFactory,
+   Set skipMethodSet) {
+
+   Preconditions.checkNotNull(delegateClass);
+   Preconditions.checkNotNull(wrapperFactory);
+   Preconditions.checkNotNull(skipMethodSet);
+
+   D delegate = spy(delegateClass);
+   W wrapper = wrapperFactory.apply(delegate);
+
+   // ensure that wrapper is a subtype of delegate
+   
Preconditions.checkArgument(delegateClass.isAssignableFrom(wrapper.getClass()));
+
+   for (Method delegateMethod : delegateClass.getMethods()) {
+
+   if (checkSkipMethodForwardCheck(delegateMethod, 
skipMethodSet)) {
+   continue;
+   }
+
+   try {
+   // find the correct method to substitute the 
bridge for erased generic types.
+   // if this doesn't work, the user need to 
exclude the method and write an additional test.
+   Method wrapperMethod = 
wrapper.getClass().getDeclaredMethod(
+   delegateMethod.getName(),
+   delegateMethod.getParameterTypes());
+
+   // things get a bit fuzzy here, best effort to 
find a match but this might end up with a wrong method.
+   if (wrapperMethod.isBridge()) {
+   for (Method method : 
wrapper.getClass().getDeclaredMethods()) {
+   if (!method.isBridge()
+   
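The utility quoted above spies on the delegate and reflectively invokes every public method on the wrapper, checking that each call is forwarded. A self-contained sketch of the same idea, using a recording `java.lang.reflect.Proxy` in place of a Mockito spy; the `Store` interface and wrapper are invented for illustration:

```java
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.HashSet;
import java.util.Set;

public class Main {

    interface Store {
        void store(String key);
        int size();
    }

    // Wrapper whose method forwarding we want to verify automatically.
    static class ForwardingStore implements Store {
        private final Store delegate;
        ForwardingStore(Store delegate) { this.delegate = delegate; }
        @Override public void store(String key) { delegate.store(key); }
        @Override public int size() { return delegate.size(); }
    }

    // Best-effort forwarding check in the spirit of the utility above.
    static Set<String> checkForwarding() {
        final Set<String> called = new HashSet<>();
        // Recording delegate: notes every method name it receives.
        Store delegate = (Store) Proxy.newProxyInstance(
                Store.class.getClassLoader(),
                new Class<?>[] {Store.class},
                (proxy, method, args) -> {
                    called.add(method.getName());
                    // Return a type-appropriate default value.
                    return method.getReturnType() == int.class ? 0 : null;
                });
        Store wrapper = new ForwardingStore(delegate);
        try {
            for (Method m : Store.class.getMethods()) {
                Object[] args = new Object[m.getParameterCount()];
                for (int i = 0; i < args.length; i++) {
                    args[i] = m.getParameterTypes()[i] == String.class ? "key" : null;
                }
                m.invoke(wrapper, args);
                if (!called.contains(m.getName())) {
                    throw new AssertionError("not forwarded: " + m.getName());
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new RuntimeException(e);
        }
        return called;
    }

    public static void main(String[] args) {
        System.out.println("forwarded: " + checkForwarding());
    }
}
```

The real utility additionally handles bridge methods for erased generics, which is the "fuzzy" best-effort matching discussed in the diff.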

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168705
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
+
+   /**
+* This tests the forwarding of jm and tm-local state from the futures 
reported by the backends, through the
+* async checkpointing thread to the {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+*/
+   @Test
+   public void testForwardingFromSnapshotToTaskStateManager() throws 
Exception {
+
+   TestTaskStateManager taskStateManager = new 
TestTaskStateManager();
+
+   StreamMockEnvironment streamMockEnvironment = new 
StreamMockEnvironment(
+   new Configuration(),
+   new Configuration(),
+   new ExecutionConfig(),
+   1024*1024,
+   new MockInputSplitProvider(),
+   0,
+   taskStateManager);
+
+   StreamTask testStreamTask = new 
StreamTaskTest.NoOpStreamTask(streamMockEnvironment);
+   CheckpointMetaData checkpointMetaData = new 
CheckpointMetaData(0L, 0L);
+   CheckpointMetrics checkpointMetrics = new CheckpointMetrics();
+
+   Map snapshots = new 
HashMap<>(1);
+   OperatorSnapshotFutures osFuture = new 
OperatorSnapshotFutures();
+
+   
osFuture.setKeyedStateManagedFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setKeyedStateRawFuture(createSnapshotResult(KeyedStateHandle.class));
+   
osFuture.setOperatorStateManagedFuture(createSnapshotResult(OperatorStateHandle.class));
+   
osFuture.setOperatorStateRawFuture(createSnapshotResult(OperatorStateHandle.class));
+
+   OperatorID operatorID = 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168472
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
initField.setAccessible(true);
initField.setBoolean(null, false);
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
--- End diff --

This is changed in later commits.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168810
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StateObjectCollectionTest.java
 ---
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.checkpoint;
+
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.util.MethodForwardingTestUtil;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.function.Function;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Tests for {@link StateObjectCollection}.
+ */
+public class StateObjectCollectionTest {
--- End diff --

👍 


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363952#comment-16363952
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168381
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskLocalStateStore.java
 ---
@@ -46,26 +52,63 @@
/** */
private final int subtaskIndex;
 
+   /** */
+   private final Map 
storedTaskStateByCheckpointID;
+
+   /** This is the base directory for all local state of the subtask that 
owns this {@link TaskLocalStateStore}. */
+   private final File subtaskLocalStateBaseDirectory;
+
public TaskLocalStateStore(
JobID jobID,
JobVertexID jobVertexID,
-   int subtaskIndex) {
+   int subtaskIndex,
+   File localStateRootDir) {
 
this.jobID = jobID;
this.jobVertexID = jobVertexID;
this.subtaskIndex = subtaskIndex;
+   this.storedTaskStateByCheckpointID = new HashMap<>();
+   this.subtaskLocalStateBaseDirectory =
+   new File(localStateRootDir, createSubtaskPath(jobID, 
jobVertexID, subtaskIndex));
+   }
+
+   static String createSubtaskPath(JobID jobID, JobVertexID jobVertexID, 
int subtaskIndex) {
+   return "jid-" + jobID + "_vtx-" + jobVertexID + "_sti-" + 
subtaskIndex;
}
 
public void storeLocalState(
@Nonnull CheckpointMetaData checkpointMetaData,
@Nullable TaskStateSnapshot localState) {
 
-   if (localState != null) {
-   throw new UnsupportedOperationException("Implement this 
before actually providing local state!");
+   TaskStateSnapshot previous =
+   
storedTaskStateByCheckpointID.put(checkpointMetaData.getCheckpointId(), 
localState);
+
+   if (previous != null) {
+   throw new IllegalStateException("Found previously 
registered local state for checkpoint with id " +
+   checkpointMetaData.getCheckpointId() + "! This 
indicated a problem.");
}
}
 
-   public void dispose() {
-   //TODO
+   public void dispose() throws Exception {
+
+   Exception collectedException = null;
+
+   for (TaskStateSnapshot snapshot : 
storedTaskStateByCheckpointID.values()) {
+   try {
+   snapshot.discardState();
+   } catch (Exception discardEx) {
+   collectedException = 
ExceptionUtils.firstOrSuppressed(discardEx, collectedException);
+   }
+   }
+
+   if (collectedException != null) {
+   throw collectedException;
+   }
+
+   
FileUtils.deleteDirectoryQuietly(subtaskLocalStateBaseDirectory);
--- End diff --

This already works differently in later commits.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.




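The `dispose()` method in the diff above attempts every snapshot discard before failing, folding failures together with `ExceptionUtils.firstOrSuppressed`. A minimal stand-alone sketch of that pattern; the helper here is a simplified re-implementation for illustration, not Flink's actual `ExceptionUtils`:

```java
import java.util.Arrays;
import java.util.List;

public class Main {

    // Keep the first failure as the primary exception and attach
    // later failures to it as suppressed exceptions.
    static Exception firstOrSuppressed(Exception newException, Exception previous) {
        if (previous == null) {
            return newException;
        }
        previous.addSuppressed(newException);
        return previous;
    }

    // Attempt every discard before failing, as dispose() does above.
    static Exception discardAll(List<Runnable> discards) {
        Exception collected = null;
        for (Runnable discard : discards) {
            try {
                discard.run();
            } catch (RuntimeException e) {
                collected = firstOrSuppressed(e, collected);
            }
        }
        return collected;
    }

    public static void main(String[] args) {
        Exception result = discardAll(Arrays.asList(
                () -> { throw new IllegalStateException("first"); },
                () -> { /* this snapshot discards cleanly */ },
                () -> { throw new IllegalStateException("second"); }));
        if (result != null) {
            System.out.println(result.getMessage()
                    + " (+" + result.getSuppressed().length + " suppressed)");
        }
    }
}
```

This way a single failing discard cannot leak the remaining snapshots, yet no failure is silently dropped.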

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168726
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ---
@@ -104,10 +107,8 @@
@Nullable
private OptionsFactory optionsFactory;
 
-   /** True if incremental checkpointing is enabled.
-* Null if not yet set, in which case the configuration values will be 
used. */
-   @Nullable
-   private Boolean enableIncrementalCheckpointing;
+   /** True if incremental checkpointing is enabled. */
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168796
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/LocalStateForwardingTest.java
 ---
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.tasks;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
+import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.checkpoint.StateObjectCollection;
+import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.runtime.state.DoneFuture;
+import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.SnapshotResult;
+import org.apache.flink.runtime.state.StateObject;
+import org.apache.flink.runtime.state.TaskLocalStateStore;
+import org.apache.flink.runtime.state.TaskStateManagerImpl;
+import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
+import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Future;
+import java.util.concurrent.RunnableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import static org.mockito.Mockito.mock;
+
+/**
+ * Test for forwarding of state reporting to and from {@link 
org.apache.flink.runtime.state.TaskStateManager}.
+ */
+public class LocalStateForwardingTest {
--- End diff --

👍 


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363954#comment-16363954
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168168472
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
 ---
@@ -686,4 +714,53 @@ static void resetRocksDBLoadedFlag() throws Exception {
initField.setAccessible(true);
initField.setBoolean(null, false);
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
--- End diff --

This is changed in later commits.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.





[GitHub] flink pull request #5432: [FLINK-8609] [flip6] Enable Flip-6 job mode in Cli...

2018-02-14 Thread GJL
Github user GJL commented on a diff in the pull request:

https://github.com/apache/flink/pull/5432#discussion_r168173010
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -210,51 +225,72 @@ protected void run(String[] args) throws Exception {
 
final ClusterClient client;
 
-   if (clusterId != null) {
-   client = clusterDescriptor.retrieve(clusterId);
-   } else {
-   final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
-   client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
-   }
+   // directly deploy the job if the cluster is started in 
job mode and detached
+   if (flip6 && clusterId == null && 
runOptions.getDetachedMode()) {
+   int parallelism = runOptions.getParallelism() 
== -1 ? defaultParallelism : runOptions.getParallelism();
 
-   try {
-   
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
-   
client.setDetached(runOptions.getDetachedMode());
-   LOG.debug("Client slots is set to {}", 
client.getMaxSlots());
-
-   
LOG.debug(runOptions.getSavepointRestoreSettings().toString());
-
-   int userParallelism = 
runOptions.getParallelism();
-   LOG.debug("User parallelism is set to {}", 
userParallelism);
-   if (client.getMaxSlots() != -1 && 
userParallelism == -1) {
-   logAndSysout("Using the parallelism 
provided by the remote cluster ("
-   + client.getMaxSlots() + "). "
-   + "To use another parallelism, 
set it at the ./bin/flink client.");
-   userParallelism = client.getMaxSlots();
-   } else if (ExecutionConfig.PARALLELISM_DEFAULT 
== userParallelism) {
-   userParallelism = defaultParallelism;
-   }
+   final JobGraph jobGraph = 
createJobGraph(configuration, program, parallelism);
 
-   executeProgram(program, client, 
userParallelism);
-   } finally {
-   if (clusterId == null && !client.isDetached()) {
-   // terminate the cluster only if we 
have started it before and if it's not detached
-   try {
-   
clusterDescriptor.terminateCluster(client.getClusterId());
-   } catch (FlinkException e) {
-   LOG.info("Could not properly 
terminate the Flink cluster.", e);
-   }
-   }
+   final ClusterSpecification clusterSpecification 
= customCommandLine.getClusterSpecification(commandLine);
+   client = clusterDescriptor.deployJobCluster(
+   clusterSpecification,
+   jobGraph,
+   runOptions.getDetachedMode());
+
+   logAndSysout("Job has been submitted with JobID 
" + jobGraph.getJobID());
 
try {
client.shutdown();
} catch (Exception e) {
LOG.info("Could not properly shut down 
the client.", e);
}
+   } else {
+   if (clusterId != null) {
+   client = 
clusterDescriptor.retrieve(clusterId);
+   } else {
+   // also in job mode we have to deploy a 
session cluster because the job
+   // might consist of multiple parts 
(e.g. when using collect)
+   final ClusterSpecification 
clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
+   client = 
clusterDescriptor.deploySessionCluster(clusterSpecification);
+   }
+
+   try {
+   
client.setPrintStatusDuringExecution(runOptions.getStdoutLogging());
+
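The branching in the `CliFrontend` diff above can be condensed into a small decision function. This is a hedged sketch: the enum and method are invented for illustration, and the real code does far more (job-graph creation, client setup, cluster teardown):

```java
public class Main {

    // Illustrative deployment outcomes, condensed from the diff above.
    enum DeployMode { JOB_CLUSTER, SESSION_CLUSTER, ATTACH_EXISTING }

    // A per-job cluster is deployed only for flip6 + detached runs without an
    // existing cluster id; everything else attaches to or starts a session
    // cluster (which job mode may also need, e.g. for multi-part jobs using collect()).
    static DeployMode chooseDeployMode(boolean flip6, String clusterId, boolean detached) {
        if (flip6 && clusterId == null && detached) {
            return DeployMode.JOB_CLUSTER;
        }
        return clusterId != null ? DeployMode.ATTACH_EXISTING : DeployMode.SESSION_CLUSTER;
    }

    public static void main(String[] args) {
        System.out.println(chooseDeployMode(true, null, true));
        System.out.println(chooseDeployMode(true, null, false));
        System.out.println(chooseDeployMode(false, "yarn-cluster-id", true));
    }
}
```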

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364041#comment-16364041
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168176031
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 ---
@@ -60,4 +62,9 @@ void reportTaskStateSnapshots(
 * @return previous state for the operator. Null if no previous state 
exists.
 */
OperatorSubtaskState operatorStates(OperatorID operatorID);
+
+   /**
+* Returns the base directory for all file-based local state of the 
owning subtask.
+*/
+   File getSubtaskLocalStateBaseDirectory();
--- End diff --

This is the manager, not the state objects. So for local recovery that is 
not based on local files, the backend will simply not care about directories and 
not invoke this method.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.
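The restore preference described in the issue can be sketched in a few lines. This is a hypothetical, self-contained illustration of the idea only, not Flink's actual TaskStateManager API; the `StateHandle` interface and `restore` method are invented names for the example.

```java
import java.util.Optional;

/**
 * Sketch of task-local state recovery: prefer the secondary local copy
 * when it is available, fall back to the primary copy in DFS otherwise.
 * Names are illustrative, not Flink's real classes.
 */
class LocalRecoverySketch {

    interface StateHandle {
        byte[] read();
    }

    static byte[] restore(Optional<StateHandle> localCopy, StateHandle primaryDfsCopy) {
        // Reading the local copy saves network bandwidth; the DFS copy
        // remains the authoritative fallback reported to the coordinator.
        return localCopy.map(StateHandle::read).orElseGet(primaryDfsCopy::read);
    }

    public static void main(String[] args) {
        StateHandle dfs = () -> "dfs-state".getBytes();
        StateHandle local = () -> "local-state".getBytes();
        System.out.println(new String(restore(Optional.of(local), dfs)));
        System.out.println(new String(restore(Optional.empty(), dfs)));
    }
}
```

This also makes clear why sticky task-to-slot assignment matters: the local copy only exists on the machine that ran the task before.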



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168176306
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
 ---
@@ -501,4 +529,53 @@ public String toString() {
"', asynchronous: " + asynchronousSnapshots +
", fileStateThreshold: " + fileStateThreshold + 
")";
}
+
+   /**
+* This enum represents the different modes for local recovery.
+*/
+   public enum LocalRecoveryMode {
+   DISABLED, ENABLE_FILE_BASED, ENABLE_HEAP_BASED
+   }
+
+   /**
+* This class encapsulates the configuration for local recovery of this 
backend.
+*/
+   public static final class LocalRecoveryConfig implements Serializable {
+
+   private static final long serialVersionUID = 1L;
+   private static final LocalRecoveryConfig DISABLED_SINGLETON =
+   new LocalRecoveryConfig(LocalRecoveryMode.DISABLED, 
null);
+
+   private final LocalRecoveryMode localRecoveryMode;
+   private final File localStateDirectory;
+
+   LocalRecoveryConfig(LocalRecoveryMode localRecoveryMode, File 
localStateDirectory) {
+   this.localRecoveryMode = 
Preconditions.checkNotNull(localRecoveryMode);
+   this.localStateDirectory = localStateDirectory;
+   if 
(LocalRecoveryMode.ENABLE_FILE_BASED.equals(localRecoveryMode) && 
localStateDirectory == null) {
+   throw new IllegalStateException("Local state 
directory must be specified if local recovery mode is " +
+   LocalRecoveryMode.ENABLE_FILE_BASED);
+   }
+   }
+
+   public LocalRecoveryMode getLocalRecoveryMode() {
+   return localRecoveryMode;
+   }
+
+   public File getLocalStateDirectory() {
+   return localStateDirectory;
--- End diff --

All this is changed a bit later, but in essence the backend simply does not 
ask for the directory if nothing file-based is used.


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168176395
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraRowOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+import org.apache.flink.types.Row;
+
+/**
+ * OutputFormat to write Flink {@link Row}s into a Cassandra cluster.
+ *
+ * @param <OUT> Type of elements to write; it must extend {@link Row}.
+ */
+public class CassandraRowOutputFormat<OUT extends Row> extends 
CassandraOutputFormat<OUT> {
--- End diff --

I don't think we have to handle subclasses of `Row`. Change `<OUT extends Row>` to `<Row>`?


---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364056#comment-16364056
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168177813
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorLocalStateStoresManagerTest.java
 ---
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.taskexecutor.TaskManagerServices;
+import 
org.apache.flink.runtime.taskexecutor.TaskManagerServicesConfiguration;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.net.InetAddress;
+
+public class TaskExecutorLocalStateStoresManagerTest {
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly creates the local state root directory
+* for the {@link TaskExecutorLocalStateStoresManager} with the 
configured root directory.
+*/
+   @Test
+   public void testCreationFromConfig() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final String rootDirString = "localStateRoot";
+   
config.setString(ConfigConstants.TASK_MANAGER_LOCAL_STATE_ROOT_DIR_KEY, 
rootDirString);
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new File(rootDirString, 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+
+   Assert.assertEquals("localState", 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT);
+   }
+
+   /**
+* This tests that the creation of {@link TaskManagerServices} 
correctly falls back to the first tmp directory of
+* the IOManager as default for the local state root directory.
+*/
+   @Test
+   public void testCreationFromConfigDefault() throws Exception {
+
+   final Configuration config = new Configuration();
+
+   final ResourceID tmResourceID = ResourceID.generate();
+
+   TaskManagerServicesConfiguration 
taskManagerServicesConfiguration =
+   
TaskManagerServicesConfiguration.fromConfiguration(config, 
InetAddress.getLocalHost(), true);
+
+   TaskManagerServices taskManagerServices =
+   
TaskManagerServices.fromConfiguration(taskManagerServicesConfiguration, 
tmResourceID);
+
+   TaskExecutorLocalStateStoresManager taskStateManager = 
taskManagerServices.getTaskStateManager();
+
+   Assert.assertEquals(
+   new 
File(taskManagerServicesConfiguration.getTmpDirPaths()[0], 
TaskManagerServices.LOCAL_STATE_SUB_DIRECTORY_ROOT),
+   taskStateManager.getLocalStateRootDirectory());
+   }
+
+   /**
+* This tests that the {@link TaskExecutorLocalStateStoresManager} 

[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364057#comment-16364057
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168177861
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskStateManagerImplTest.java
 ---
@@ -25,17 +25,24 @@
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.IOException;
 
 import static org.mockito.Mockito.mock;
 
 public class TaskStateManagerImplTest {
--- End diff --

 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168175822
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -95,15 +94,14 @@ public void writeRecord(OUT record) throws IOException {
throw new IOException("write record failed", exception);
}
 
-   Object[] fields = new Object[record.getArity()];
-   for (int i = 0; i < record.getArity(); i++) {
-   fields[i] = record.getField(i);
-   }
+   Object[] fields = extractFields(record);
ResultSetFuture result = 
session.executeAsync(prepared.bind(fields));
Futures.addCallback(result, callback);
}
 
-   /**
+   protected abstract Object[] extractFields(OUT record);
--- End diff --

Add an `Object[] fields` parameter that can be reused across invocations of 
`extractFields()`.
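The reuse pattern suggested in the comment can be sketched as follows. This is an illustrative, standalone simplification (the class and method names are invented, not the connector's actual API): the caller allocates the fields array once and hands it to every `extractFields()` call instead of allocating a fresh `Object[]` per record.

```java
/**
 * Sketch of reusing a fields array across extractFields() invocations,
 * avoiding one array allocation per record. Illustrative names only.
 */
abstract class RecordWriterSketch<OUT> {

    private Object[] reusedFields;

    /** Subclasses fill the given array rather than allocating their own. */
    protected abstract Object[] extractFields(OUT record, Object[] reuse);

    public Object[] write(OUT record, int arity) {
        // Lazily allocate (or re-allocate on arity change), then reuse.
        if (reusedFields == null || reusedFields.length != arity) {
            reusedFields = new Object[arity];
        }
        return extractFields(record, reusedFields);
    }
}
```

For a fixed-arity format the array is allocated exactly once, which is the point of the review suggestion.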


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168169354
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormat.java
 ---
@@ -37,11 +36,11 @@
 import java.io.IOException;
 
 /**
- * OutputFormat to write {@link org.apache.flink.api.java.tuple.Tuple} 
into Apache Cassandra.
+ * CassandraOutputFormat is the common abstract class for writing into 
Apache Cassandra.
  *
- * @param <OUT> type of Tuple
+ * @param <OUT> Type of the elements to write.
  */
-public class CassandraOutputFormat<OUT extends Tuple> extends 
RichOutputFormat<OUT> {
+public abstract class CassandraOutputFormat<OUT> extends 
RichOutputFormat<OUT> {
--- End diff --

Rename to `CassandraOutputFormatBase`


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168169778
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraTupleOutputFormat.java
 ---
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.connectors.cassandra;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder;
+
+/**
+ * OutputFormat to write Flink {@link Tuple}s into a Cassandra cluster.
+ *
+ * @param <OUT> Type of elements to write; it must extend {@link Tuple}.
+ */
+public class CassandraTupleOutputFormat<OUT extends Tuple> extends 
CassandraOutputFormat<OUT> {
--- End diff --

Create another class `CassandraOutputFormat` that extends this class and 
doesn't override anything.
This class is for backwards compatibility and should be deprecated.
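The backwards-compatibility pattern being suggested looks roughly like this. The class names below are simplified stand-ins for the connector's real classes; the point is only the shape: a deprecated subclass that adds and overrides nothing, kept so existing user code keeps compiling.

```java
/**
 * Sketch of keeping an old class name alive after a rename.
 * Illustrative names, not the actual Cassandra connector classes.
 */
class CompatSketch {

    /** The new class carrying the actual implementation. */
    static class TupleOutputFormat<T> {
        public String name() {
            return "tuple-format";
        }
    }

    /**
     * Old name, retained purely for backwards compatibility.
     * It overrides nothing and simply inherits all behavior.
     */
    @Deprecated
    static class OutputFormat<T> extends TupleOutputFormat<T> { }
}
```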



---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168177338
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -457,8 +458,8 @@ public void testCassandraTableSink() throws Exception {
}
 
@Test
-   public void testCassandraBatchFormats() throws Exception {
-   OutputFormat<Tuple3<String, Integer, Integer>> sink = new 
CassandraOutputFormat<>(injectTableName(INSERT_DATA_QUERY), builder);
+   public void testCassandraBatchTupleFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchTupleFormat` (-s)


---


[GitHub] flink pull request #5272: [Flink-8397][Connectors]Support Row type for Cassa...

2018-02-14 Thread fhueske
Github user fhueske commented on a diff in the pull request:

https://github.com/apache/flink/pull/5272#discussion_r168177375
  
--- Diff: 
flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java
 ---
@@ -482,6 +483,23 @@ public void testCassandraBatchFormats() throws 
Exception {
Assert.assertEquals(20, result.size());
}
 
+   @Test
+   public void testCassandraBatchRowFormats() throws Exception {
--- End diff --

rename to `testCassandraBatchRowFormat` (-s)


---


[GitHub] flink pull request #5487: [FLINK-8656] [flip6] Add modify CLI command to res...

2018-02-14 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5487

[FLINK-8656] [flip6] Add modify CLI command to rescale Flink jobs

## What is the purpose of the change

Jobs can now be rescaled by calling flink modify <jobId> -p <newParallelism>.
Internally, the CliFrontend will send the corresponding REST call and poll
for status updates.

This PR is based on #5454.

## Brief change log

- Add `modify` call to `CliFrontend`
- Add `ClusterClient#rescaleJob` method with default implementation
- Implement `RestClusterClient#rescaleJob` method to trigger an asynchronous 
rescale operation via REST and poll for its status updates
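The trigger-then-poll flow described above can be sketched generically. This is a hypothetical, dependency-free illustration (the enum and method names are invented, and the real client goes through Flink's REST API): trigger an asynchronous operation, then poll its status endpoint until it leaves the in-progress state.

```java
import java.util.function.Supplier;

/**
 * Sketch of polling an asynchronous rescale operation for status updates.
 * Illustrative only; not the actual RestClusterClient implementation.
 */
class RescalePollSketch {

    enum Status { IN_PROGRESS, COMPLETED, FAILED }

    static Status pollUntilDone(Supplier<Status> statusEndpoint, int maxPolls)
            throws InterruptedException {
        for (int i = 0; i < maxPolls; i++) {
            Status s = statusEndpoint.get();
            if (s != Status.IN_PROGRESS) {
                return s; // terminal state reached
            }
            Thread.sleep(10); // back off between status requests
        }
        throw new IllegalStateException("rescale did not finish in time");
    }
}
```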

## Verifying this change

- Tested manually
- Added `CliFrontendModifyTest`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (yes)
  - If yes, how is the feature documented? (JavaDocs + stdout help)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink rescaleCommand

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5487.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5487


commit d9159228091cae9ebbd1bb718b69e6cf452881e1
Author: Till Rohrmann 
Date:   2018-02-13T11:41:44Z

[FLINK-8643] [flip6] Use JobManagerOptions#SLOT_REQUEST_TIMEOUT in 
ExecutionGraph

This commit changes the initialization of the ExecutionGraph to use the
JobManagerOptions#SLOT_REQUEST_TIMEOUT for the slot allocation. Furthermore,
it changes the behaviour of the SlotPool#ProviderAndOwner implementation 
such
that the timeout is given to it via the SlotProvider#allocateSlot call.

commit 19780c9d284914ec51e92231536315299a3c2da3
Author: Till Rohrmann 
Date:   2018-02-13T12:18:01Z

[hotfix] [flip6] Remove unnecessary timeout from SlotPool

commit 9924776c92a378cef144c0767f1ff18b799d52e9
Author: Till Rohrmann 
Date:   2018-02-13T14:33:11Z

[FLINK-8647] [flip6] Introduce JobMasterConfiguration

This commit introduces a JobMasterConfiguration which contains JobMaster 
specific
configuration settings.

commit fde75841de2e27cb7380f3a28066a99e2c1a690d
Author: zentol 
Date:   2018-01-23T12:50:32Z

[FLINK-8475][config][docs] Integrate HA-ZK options

This closes #5462.

commit 788a17fdbd4aaf3429ead4491ede197fc775b1f0
Author: zentol 
Date:   2018-01-23T13:04:36Z

[FLINK-8475][config][docs] Integrate YARN options

This closes #5463.

commit fcd783358c282e61bf12e0c18298c237c85a6695
Author: Till Rohrmann 
Date:   2018-02-13T15:08:38Z

[hotfix] [tests] Simplify JobMasterTest

commit 8206e6f13809c0b60bfaf776bc386088f535e723
Author: Till Rohrmann 
Date:   2018-02-13T15:10:09Z

[FLINK-8546] [flip6] Respect savepoints and restore from latest checkpoints

Let the JobMaster respect checkpoints and savepoints. The JobMaster will 
always
try to restore the latest checkpoint if there is one available. Next it 
will check
whether savepoint restore settings have been set. If so, then it will try 
to restore
the savepoint. Only if these settings are not set, the job will be started 
from
scratch.
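The restore priority in that commit message reduces to a three-way decision. The sketch below is an invented, self-contained illustration of that ordering only (latest checkpoint first, then configured savepoint, otherwise start from scratch), not the JobMaster's actual code.

```java
import java.util.Optional;

/**
 * Sketch of the JobMaster restore ordering described above.
 * Illustrative names and string results, not Flink's real API.
 */
class RestorePrioritySketch {

    static String decideRestore(Optional<String> latestCheckpoint,
                                Optional<String> savepointSetting) {
        if (latestCheckpoint.isPresent()) {
            // A checkpoint always wins over savepoint restore settings.
            return "checkpoint:" + latestCheckpoint.get();
        }
        if (savepointSetting.isPresent()) {
            return "savepoint:" + savepointSetting.get();
        }
        return "scratch";
    }
}
```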

commit 057a95b7328b1cca7b78bf1dd25e8d048df70410
Author: Till Rohrmann 
Date:   2018-02-13T15:11:37Z

[hotfix] Fix checkstyle violations in ExecutionGraph

commit 9930b0991320bcff268ca82db6378df8976560dc
Author: Till Rohrmann 
Date:   2018-02-13T15:12:41Z

[FLINK-8627] Introduce new JobStatus#SUSPENDING to ExecutionGraph

The new JobStatus#SUSPENDING says that an ExecutionGraph has been suspended 
but its
clean up has not been done yet. Only after all Executions have been 
canceled, the
ExecutionGraph will enter the SUSPENDED state and complete the termination 
future
accordingly.

commit 6c51ad306c90464572353168ecafdb962794747e
Author: Till Rohrmann 
Date:   2018-02-13T15:14:41Z

[FLINK-8629] [flip6] Allow JobMaster to rescale jobs

This commit adds the functionality to rescale a job or parts of it to
the JobMaster. In order to rescale a job, the JobMaster does the following:
1. Take a 

[jira] [Updated] (FLINK-8655) Add a default keyspace to CassandraSink

2018-02-14 Thread Christopher Hughes (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8655?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Christopher Hughes updated FLINK-8655:
--
Description: 
Currently, to use the CassandraPojoSink, it is necessary for a user to provide 
keyspace information on the desired POJOs using datastax annotations.  This 
allows various POJOs to be written to multiple keyspaces while sinking 
messages, but prevents runtime flexibility.

For many developers, non-production environments may all share a single 
Cassandra instance differentiated by keyspace names.  I propose adding a 
`defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
definitive keyspace would attempt to be loaded to the provided default.

  was:
Currently, to use the CassandraPojoSink, it is necessary for a user to provide 
keyspace information on the desired POJOs using datastax annotations.  This 
allows various POJOs to be written to multiple keyspaces while sinking 
messages, but prevents runtime flexibility.

For many developers, non-production environments may all share a single 
Cassandra instance differentiated by keyspace names.  I propose adding a 
`defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs that lack a 
keyspace would be piped to the default. 


> Add a default keyspace to CassandraSink
> ---
>
> Key: FLINK-8655
> URL: https://issues.apache.org/jira/browse/FLINK-8655
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.4.0
>Reporter: Christopher Hughes
>Priority: Minor
>  Labels: features
> Fix For: 1.4.1
>
>
> Currently, to use the CassandraPojoSink, it is necessary for a user to 
> provide keyspace information on the desired POJOs using datastax annotations. 
>  This allows various POJOs to be written to multiple keyspaces while sinking 
> messages, but prevents runtime flexibility.
> For many developers, non-production environments may all share a single 
> Cassandra instance differentiated by keyspace names.  I propose adding a 
> `defaultKeyspace(String keyspace)` to the ClusterBuilder.  POJOs lacking a 
> definitive keyspace would attempt to be loaded to the provided default.
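The proposed fallback is a simple resolution rule, sketched below. This is a hypothetical standalone illustration (the class and method are invented; the actual proposal targets the ClusterBuilder): a POJO's annotated keyspace wins, otherwise the configured default is used, and the absence of both is an error.

```java
/**
 * Sketch of the proposed default-keyspace fallback.
 * Illustrative names, not the Cassandra connector's real API.
 */
class KeyspaceSketch {

    static String resolveKeyspace(String annotatedKeyspace, String defaultKeyspace) {
        if (annotatedKeyspace != null && !annotatedKeyspace.isEmpty()) {
            // Datastax-annotated keyspace on the POJO takes precedence.
            return annotatedKeyspace;
        }
        if (defaultKeyspace == null) {
            throw new IllegalStateException(
                "POJO has no keyspace and no default keyspace was configured");
        }
        return defaultKeyspace;
    }
}
```

This keeps per-POJO keyspaces working while letting shared non-production clusters differ only in the configured default.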



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-14 Thread Xingcan Cui (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363802#comment-16363802
 ] 

Xingcan Cui commented on FLINK-8538:


Hi [~twalthr], I think Friday will be fine. Though it looks awkward, I've 
transformed the Scala code to Java. Looking forward to your PR : )

> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion if we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8553) switch flink-metrics-datadog to async mode

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8553?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363832#comment-16363832
 ] 

ASF GitHub Bot commented on FLINK-8553:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5418
  
merging.


> switch flink-metrics-datadog to async mode
> --
>
> Key: FLINK-8553
> URL: https://issues.apache.org/jira/browse/FLINK-8553
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.4.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> Even though currently flink-metrics-datadog is designed as `fire-and-forget`, 
> it's still using sync calls which may block or slow down the core. Need to switch 
> it to async mode.
> cc  [~Zentol]
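The sync-to-async switch amounts to handing the HTTP post off to a background thread. The sketch below is an invented, dependency-free illustration of that pattern (not the actual Datadog reporter code): submitting to a single-threaded executor returns immediately, so a slow endpoint cannot block the reporting path in the core.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * Sketch of fire-and-forget async reporting via an executor.
 * Illustrative only; not flink-metrics-datadog's real implementation.
 */
class AsyncReportSketch {

    private final ExecutorService sender = Executors.newSingleThreadExecutor();

    /** Fire-and-forget: submit the (possibly slow) post and return at once. */
    void report(Runnable httpPost) {
        sender.submit(httpPost);
    }

    void shutdown() throws InterruptedException {
        sender.shutdown();
        sender.awaitTermination(1, TimeUnit.SECONDS);
    }
}
```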



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-7286) Flink Dashboard fails to display bytes/records received by sources

2018-02-14 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7286?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-7286:

Component/s: Metrics

> Flink Dashboard fails to display bytes/records received by sources
> --
>
> Key: FLINK-7286
> URL: https://issues.apache.org/jira/browse/FLINK-7286
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics, Webfrontend
>Affects Versions: 1.3.1
>Reporter: Elias Levy
>Assignee: Hai Zhou UTC+8
>Priority: Major
> Fix For: 1.5.0
>
>
> It appears Flink can't measure the number of bytes read or records produced 
> by a source (e.g. Kafka source). This is particularly problematic for simple 
> jobs where the job pipeline is chained, and in which there are no 
> measurements between operators. Thus, in the UI it appears that the job is 
> not consuming any data.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5485: [FLINK-8411] Don't allow null in ListState.add()/a...

2018-02-14 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/5485

[FLINK-8411] Don't allow null in ListState.add()/addAll()

R: @StefanRRichter, @bowenli86 

It turns out that this is a bit trickier than assumed earlier: 
`ListState.addAll()` was not considered and also had inconsistent behaviour 
between state backends before.
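The contract the PR enforces can be sketched with a plain list wrapper. This is a simplified stand-in, not Flink's actual ListState implementation: both `add()` and `addAll()` reject null consistently, rather than each state backend behaving differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
 * Sketch of a ListState-like container that disallows null in
 * add() and addAll(). Illustrative only.
 */
class NullRejectingListState<T> {

    private final List<T> elements = new ArrayList<>();

    public void add(T value) {
        elements.add(Objects.requireNonNull(value, "value must not be null"));
    }

    public void addAll(List<T> values) {
        Objects.requireNonNull(values, "values must not be null");
        for (T v : values) {
            add(v); // rejects null elements the same way add() does
        }
    }

    public List<T> get() {
        return elements;
    }
}
```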

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-8411-fix-list-add

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5485.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5485


commit b39aa20d60df9effb536cc96c308645b9688113d
Author: Aljoscha Krettek 
Date:   2018-02-14T11:04:20Z

[FLINK-8411] Don't allow null in ListState.add()/addAll()




---


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363984#comment-16363984
 ] 

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169266
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
 ---
@@ -3404,12 +3405,16 @@ public String fold(String acc, Integer value) 
throws Exception {
}
}
 
-   protected KeyedStateHandle runSnapshot(RunnableFuture<KeyedStateHandle> 
snapshotRunnableFuture) throws Exception {
+   protected KeyedStateHandle runSnapshot(
+   RunnableFuture<SnapshotResult<KeyedStateHandle>> 
snapshotRunnableFuture) throws Exception {
+
if(!snapshotRunnableFuture.isDone()) {
Thread runner = new Thread(snapshotRunnableFuture);
runner.start();
}
-   return snapshotRunnableFuture.get();
+
+   SnapshotResult<KeyedStateHandle> snapshotResult = 
snapshotRunnableFuture.get();
--- End diff --

 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169295
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java
 ---
@@ -47,10 +47,11 @@
 * @param checkpointMetrics task level metrics for the checkpoint.
 * @param acknowledgedState the reported states from the owning task.
--- End diff --

👍 


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169149
  
--- Diff: 
flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
 ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-   // wait until handle async exception has been called to 
proceed with the termination of the
-   // StreamTask
-   HANDLE_ASYNC_EXCEPTION_LATCH.await();
--- End diff --

Already done in a later commit.


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169168
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -916,57 +916,78 @@ private void handleExecutionException(Exception e) {
				CheckpointingOperation.AsynCheckpointState.COMPLETED,
				CheckpointingOperation.AsynCheckpointState.RUNNING);

-			try {
-				cleanup();
-			} catch (Exception cleanupException) {
-				e.addSuppressed(cleanupException);
-			}
+			if (asyncCheckpointState.compareAndSet(
+				CheckpointingOperation.AsynCheckpointState.RUNNING,
+				CheckpointingOperation.AsynCheckpointState.DISCARDED)) {
--- End diff --

👍 


---
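The 👍 above acknowledges the RUNNING → DISCARDED transition added in the diff. As a minimal, self-contained sketch (a hypothetical class, not Flink's actual code), the lifecycle is an atomic state machine in which only the thread that wins the compare-and-set performs completion or cleanup:

```java
import java.util.concurrent.atomic.AtomicReference;

// Hedged sketch (hypothetical class, not Flink's implementation): the async
// checkpoint part moves RUNNING -> COMPLETED on success or RUNNING -> DISCARDED
// on cancellation; a CAS guarantees exactly one of the two transitions wins.
public class AsyncCheckpointLifecycle {

    enum State { RUNNING, COMPLETED, DISCARDED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    /** Returns true iff this caller completed the checkpoint. */
    boolean tryComplete() {
        return state.compareAndSet(State.RUNNING, State.COMPLETED);
    }

    /** Returns true iff this caller is responsible for discard/cleanup. */
    boolean tryDiscard() {
        return state.compareAndSet(State.RUNNING, State.DISCARDED);
    }

    State current() {
        return state.get();
    }

    public static void main(String[] args) {
        AsyncCheckpointLifecycle lc = new AsyncCheckpointLifecycle();
        boolean completed = lc.tryComplete(); // wins: RUNNING -> COMPLETED
        boolean discarded = lc.tryDiscard();  // loses: state is no longer RUNNING
        System.out.println(completed + " " + discarded + " " + lc.current());
    }
}
```

Because both transitions race on the same `AtomicReference`, cleanup code guarded by `tryDiscard()` can never run concurrently with the completion path.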


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363980#comment-16363980 ]

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169149
  
--- Diff: flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java ---
@@ -227,21 +229,13 @@ protected void cleanup() throws Exception {
// has been stopped
CLEANUP_LATCH.trigger();
 
-	// wait until handle async exception has been called to proceed with the termination of the
-	// StreamTask
-	HANDLE_ASYNC_EXCEPTION_LATCH.await();
--- End diff --

Already done in a later commit.


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
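The recovery idea described in the issue — a secondary local copy alongside the primary copy in DFS — amounts to a local-first restore with a remote fallback. A hedged sketch with hypothetical types (not Flink's API):

```java
import java.util.Optional;

// Hedged sketch (hypothetical types, not Flink's API): prefer the task-local
// copy of checkpointed state when one exists; otherwise read the primary
// copy that was reported to the checkpoint coordinator and stored in DFS.
public class LocalFirstRestore {

    interface StateHandle {
        byte[] read();
    }

    // Reading the local copy saves network bandwidth; this only pays off when
    // the task-to-slot assignment stays sticky across restarts, so the local
    // files are actually reachable from the restarted task.
    static byte[] restore(Optional<StateHandle> localCopy, StateHandle dfsPrimary) {
        return localCopy.map(StateHandle::read).orElseGet(dfsPrimary::read);
    }

    public static void main(String[] args) {
        StateHandle dfs = () -> "dfs".getBytes();
        StateHandle local = () -> "local".getBytes();
        System.out.println(new String(restore(Optional.of(local), dfs))); // local copy wins
        System.out.println(new String(restore(Optional.empty(), dfs)));   // falls back to DFS
    }
}
```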


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363983#comment-16363983 ]

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169207
  
--- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---
@@ -859,56 +861,77 @@ public void run() {
			if (asyncCheckpointState.compareAndSet(CheckpointingOperation.AsynCheckpointState.RUNNING,
				CheckpointingOperation.AsynCheckpointState.COMPLETED)) {

-				TaskStateSnapshot acknowledgedState = hasState ? taskOperatorSubtaskStates : null;
-
-				TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
-
-				// we signal stateless tasks by reporting null, so that there are no attempts to assign empty state
-				// to stateless tasks on restore. This enables simple job modifications that only concern
-				// stateless without the need to assign them uids to match their (always empty) states.
-				taskStateManager.reportTaskStateSnapshot(
-					checkpointMetaData,
-					checkpointMetrics,
-					acknowledgedState);
-
-				LOG.debug("{} - finished asynchronous part of checkpoint {}. Asynchronous duration: {} ms",
-					owner.getName(), checkpointMetaData.getCheckpointId(), asyncDurationMillis);
-
-				LOG.trace("{} - reported the following states in snapshot for checkpoint {}: {}.",
-					owner.getName(), checkpointMetaData.getCheckpointId(), acknowledgedState);
+				reportCompletedSnapshotStates(
+					jobManagerTaskOperatorSubtaskStates,
+					localTaskOperatorSubtaskStates,
+					asyncDurationMillis);

			} else {
				LOG.debug("{} - asynchronous part of checkpoint {} could not be completed because it was closed before.",
					owner.getName(),
					checkpointMetaData.getCheckpointId());
			}
		} catch (Exception e) {
-			// the state is completed if an exception occurred in the acknowledgeCheckpoint call
-			// in order to clean up, we have to set it to RUNNING again.
-			asyncCheckpointState.compareAndSet(
-				CheckpointingOperation.AsynCheckpointState.COMPLETED,
-				CheckpointingOperation.AsynCheckpointState.RUNNING);
-
-			try {
-				cleanup();
-			} catch (Exception cleanupException) {
-				e.addSuppressed(cleanupException);
-			}
-
-			Exception checkpointException = new Exception(
-				"Could not materialize checkpoint " + checkpointId + " for operator " +
-					owner.getName() + '.',
-				e);
-
-			owner.asynchronousCheckpointExceptionHandler.tryHandleCheckpointException(
-				checkpointMetaData,
-				checkpointException);
+			handleExecutionException(e);
		} finally {
			owner.cancelables.unregisterCloseable(this);
			FileSystemSafetyNet.closeSafetyNetAndGuardedResourcesForThread();
		}
	}

+	private void reportCompletedSnapshotStates(
+		TaskStateSnapshot acknowledgedTaskStateSnapshot,
+		TaskStateSnapshot localTaskStateSnapshot,
+		long asyncDurationMillis) {
+
+		TaskStateManager taskStateManager = owner.getEnvironment().getTaskStateManager();
+
+		boolean hasAckState = 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169451
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
@@ -109,14 +109,20 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
		return operatorStateCheckpointOutputStream;
	}

-	public RunnableFuture getKeyedStateStreamFuture() throws IOException {
-		KeyGroupsStateHandle keyGroupsStateHandle = closeAndUnregisterStreamToObtainStateHandle(keyedStateCheckpointOutputStream);
-		return new DoneFuture(keyGroupsStateHandle);
+	public RunnableFuture getKeyedStateStreamFuture() throws IOException {
+		return getGenericStateStreamFuture(keyedStateCheckpointOutputStream);
	}

-	public RunnableFuture getOperatorStateStreamFuture() throws IOException {
-		OperatorStateHandle operatorStateHandle = closeAndUnregisterStreamToObtainStateHandle(operatorStateCheckpointOutputStream);
-		return new DoneFuture<>(operatorStateHandle);
+	public RunnableFuture getOperatorStateStreamFuture() throws IOException {
+		return getGenericStateStreamFuture(operatorStateCheckpointOutputStream);
+	}
+
+	private  RunnableFuture getGenericStateStreamFuture(
+		NonClosingCheckpointOutputStream stream) throws IOException {
+		T operatorStateHandle = (T) closeAndUnregisterStreamToObtainStateHandle(stream);
--- End diff --

I solved it slightly differently. The change you proposed would cascade into other methods and fields.


---
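For context, the pattern under discussion — collapsing two near-identical methods into one generic helper whose unchecked cast keeps the callers' signatures stable — can be sketched with hypothetical types (not Flink's classes); parameterizing the stream type itself would, as the reply notes, cascade through other methods and fields:

```java
import java.io.IOException;
import java.util.concurrent.FutureTask;
import java.util.concurrent.RunnableFuture;

// Hedged sketch (hypothetical types): two state-handle-producing methods
// deduplicated into one generic helper with an unchecked cast.
public class GenericStreamFuture {

    interface StateObject {}
    static class KeyedStateHandle implements StateObject {}
    static class OperatorStateHandle implements StateObject {}

    // Stand-in for the checkpoint output stream being closed and unregistered.
    static class CheckpointOutputStream {
        StateObject closeAndGetHandle(boolean keyed) {
            return keyed ? new KeyedStateHandle() : new OperatorStateHandle();
        }
    }

    // The cast is unchecked, but each caller supplies a stream that actually
    // produces the handle type it asks for, so it never fails at runtime here.
    @SuppressWarnings("unchecked")
    static <T extends StateObject> RunnableFuture<T> genericStateStreamFuture(
            CheckpointOutputStream stream, boolean keyed) throws IOException {
        T handle = (T) stream.closeAndGetHandle(keyed);
        return new FutureTask<>(() -> handle); // future over an already-materialized result
    }

    public static void main(String[] args) throws Exception {
        RunnableFuture<KeyedStateHandle> f =
            genericStateStreamFuture(new CheckpointOutputStream(), true);
        f.run();
        System.out.println(f.get().getClass().getSimpleName());
    }
}
```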


[jira] [Commented] (FLINK-8360) Implement task-local state recovery

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-8360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16363985#comment-16363985 ]

ASF GitHub Bot commented on FLINK-8360:
---

Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169295
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/TaskStateManager.java ---
@@ -47,10 +47,11 @@
 * @param checkpointMetrics task level metrics for the checkpoint.
 * @param acknowledgedState the reported states from the owning task.
--- End diff --

👍 


> Implement task-local state recovery
> ---
>
> Key: FLINK-8360
> URL: https://issues.apache.org/jira/browse/FLINK-8360
> Project: Flink
>  Issue Type: New Feature
>  Components: State Backends, Checkpointing
>Reporter: Stefan Richter
>Assignee: Stefan Richter
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue tracks the development of recovery from task-local state. The main 
> idea is to have a secondary, local copy of the checkpointed state, while 
> there is still a primary copy in DFS that we report to the checkpoint 
> coordinator.
> Recovery can attempt to restore from the secondary local copy, if available, 
> to save network bandwidth. This requires that the assignment from tasks to 
> slots is as sticky as possible.
> For starters, we will implement this feature for all managed keyed states and 
> can easily enhance it to all other state types (e.g. operator state) later.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-14 Thread StefanRRichter
Github user StefanRRichter commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168169484
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/state/StateSnapshotContextSynchronousImpl.java ---
@@ -130,7 +136,7 @@ public OperatorStateCheckpointOutputStream getRawOperatorStateOutput() throws Ex
	}

	private  void closeAndUnregisterStream(
-		NonClosingCheckpointOutputStream stream) throws IOException {
+		NonClosingCheckpointOutputStream stream) throws IOException {
--- End diff --

👍 


---


[GitHub] flink pull request #5478: [FLINK-8647] [flip6] Introduce JobMasterConfigurat...

2018-02-14 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5478#discussion_r168171127
  
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java ---
@@ -698,7 +707,7 @@ else if (numConsumers == 0) {
executor);
 
// double check to resolve race conditions
-			if(consumerVertex.getExecutionState() == RUNNING){
+			if (consumerVertex.getExecutionState() == RUNNING){
consumerVertex.sendPartitionInfos();
--- End diff --

Good catch @zhangminglei. Will fix it.


---


[GitHub] flink pull request #5282: [FLINK-6352] [kafka] Timestamp-based offset config...

2018-02-14 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r167941419
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
	 */
	public FlinkKafkaConsumerBase setStartFromLatest() {
		this.startupMode = StartupMode.LATEST;
+		this.startupOffsetsTimestamp = null;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the consumer to start reading partitions from a specified timestamp.
+	 * The specified timestamp must be before the current timestamp.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * The consumer will look up the earliest offset whose timestamp is greater than or equal
+	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+	 * latest offset to read data from kafka.
+	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
+	 * from a checkpoint or savepoint. When the consumer is restored from a checkpoint or
+	 * savepoint, only the offsets in the restored state will be used.
+	 *
+	 * @return The consumer object, to allow function chaining.
+	 */
+	// NOTE -
+	// This method is implemented in the base class because this is where the startup logging and verifications live.
+	// However, it is not publicly exposed since only newer Kafka versions support the functionality.
+	// Version-specific subclasses which can expose the functionality should override and allow public access.
+	protected FlinkKafkaConsumerBase setStartFromTimestamp(long startupOffsetsTimestamp) {
+		checkNotNull(startupOffsetsTimestamp, "startupOffsetsTimestamp");
--- End diff --

I think the error message might not be helpful.


---
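The review point is well taken: `checkNotNull` on a primitive `long` can never fire (the value is autoboxed and always non-null), so the message a user would actually see needs to come from a range check. A hedged sketch of such validation (hypothetical helper, not the PR's final code):

```java
// Hedged sketch (hypothetical helper, not the PR's final code): validate a
// start-from-timestamp argument with messages that tell the user what was
// wrong, instead of a null check that cannot fail for a primitive long.
public class StartupTimestampCheck {

    static long validateStartupTimestamp(long startupOffsetsTimestamp, long nowMillis) {
        if (startupOffsetsTimestamp < 0) {
            throw new IllegalArgumentException(
                "The startup offsets timestamp must not be negative; found: "
                    + startupOffsetsTimestamp);
        }
        if (startupOffsetsTimestamp > nowMillis) {
            // the javadoc requires the timestamp to be before the current time
            throw new IllegalArgumentException(
                "Startup time " + startupOffsetsTimestamp
                    + " must be before the current time " + nowMillis + '.');
        }
        return startupOffsetsTimestamp;
    }

    public static void main(String[] args) {
        System.out.println(validateStartupTimestamp(1_000L, 2_000L));
        try {
            validateStartupTimestamp(5_000L, 2_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```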


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364022#comment-16364022 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r167941085
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -343,6 +348,39 @@ public FlinkKafkaConsumerBase(
	 */
	public FlinkKafkaConsumerBase setStartFromLatest() {
		this.startupMode = StartupMode.LATEST;
+		this.startupOffsetsTimestamp = null;
+		this.specificStartupOffsets = null;
+		return this;
+	}
+
+	/**
+	 * Specifies the consumer to start reading partitions from a specified timestamp.
+	 * The specified timestamp must be before the current timestamp.
+	 * This lets the consumer ignore any committed group offsets in Zookeeper / Kafka brokers.
+	 *
+	 * The consumer will look up the earliest offset whose timestamp is greater than or equal
+	 * to the specific timestamp from Kafka. If there's no such offset, the consumer will use the
+	 * latest offset to read data from kafka.
+	 *
+	 * This method does not effect where partitions are read from when the consumer is restored
--- End diff --

typo: effect -> affect


> FlinkKafkaConsumer should support to use timestamp to set up start offset
> -
>
> Key: FLINK-6352
> URL: https://issues.apache.org/jira/browse/FLINK-6352
> Project: Flink
>  Issue Type: Improvement
>  Components: Kafka Connector
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Blocker
> Fix For: 1.5.0
>
>
> Currently "auto.offset.reset" is used to initialize the start offset of 
> FlinkKafkaConsumer, and the value should be earliest/latest/none. This method 
> can only let the job consume the beginning or the most recent data, but can 
> not specify the specific offset of Kafka began to consume. 
> So, there should be a configuration item (such as 
> "flink.source.start.time" and the format is "-MM-dd HH:mm:ss") that 
> allows user to configure the initial offset of Kafka. The action of 
> "flink.source.start.time" is as follows:
> 1) job start from checkpoint / savepoint
>   a> offset of partition can be restored from checkpoint/savepoint,  
> "flink.source.start.time" will be ignored.
>   b> there's no checkpoint/savepoint for the partition (For example, this 
> partition is newly increased), the "flink.kafka.start.time" will be used to 
> initialize the offset of the partition
> 2) job has no checkpoint / savepoint, the "flink.source.start.time" is used 
> to initialize the offset of the kafka
>   a> the "flink.source.start.time" is valid, use it to set the offset of kafka
>   b> the "flink.source.start.time" is out-of-range, the same as it does 
> currently with no initial offset, get kafka's current offset and start reading



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
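The precedence spelled out in the issue — a restored checkpoint/savepoint offset always wins; otherwise the configured start time is used to look up an offset; and an out-of-range time falls back to the broker's current offset — can be sketched as follows (hypothetical names, not the connector's API):

```java
import java.util.OptionalLong;

// Hedged sketch (hypothetical names): resolve a partition's start offset
// following the precedence described in FLINK-6352.
public class StartOffsetResolution {

    static long resolveStartOffset(
            OptionalLong restoredOffset,     // case 1a: from checkpoint/savepoint, if any
            OptionalLong offsetForTimestamp, // broker lookup for the start time; empty if out-of-range
            long currentBrokerOffset) {      // case 2b fallback
        if (restoredOffset.isPresent()) {
            return restoredOffset.getAsLong(); // restored state ignores the configured time
        }
        return offsetForTimestamp.orElse(currentBrokerOffset);
    }

    public static void main(String[] args) {
        System.out.println(resolveStartOffset(OptionalLong.of(42), OptionalLong.of(7), 99));
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.of(7), 99));
        System.out.println(resolveStartOffset(OptionalLong.empty(), OptionalLong.empty(), 99));
    }
}
```

The first call models a job restored from a checkpoint (offset 42 wins), the second a fresh start with a valid timestamp lookup (7), and the third an out-of-range timestamp falling back to the current offset (99).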


[jira] [Commented] (FLINK-6352) FlinkKafkaConsumer should support to use timestamp to set up start offset

2018-02-14 Thread ASF GitHub Bot (JIRA)

[ https://issues.apache.org/jira/browse/FLINK-6352?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16364025#comment-16364025 ]

ASF GitHub Bot commented on FLINK-6352:
---

Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/5282#discussion_r168169777
  
--- Diff: flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBase.java ---
@@ -441,28 +481,57 @@ public void open(Configuration configuration) throws Exception {
			getRuntimeContext().getIndexOfThisSubtask(), subscribedPartitionsToStartOffsets.size(), subscribedPartitionsToStartOffsets);
		} else {
			// use the partition discoverer to fetch the initial seed partitions,
-			// and set their initial offsets depending on the startup mode
-			for (KafkaTopicPartition seedPartition : allPartitions) {
-				if (startupMode != StartupMode.SPECIFIC_OFFSETS) {
-					subscribedPartitionsToStartOffsets.put(seedPartition, startupMode.getStateSentinel());
-				} else {
+			// and set their initial offsets depending on the startup mode.
+			// for SPECIFIC_OFFSETS and TIMESTAMP modes, we set the specific offsets now;
+			// for other modes (EARLIEST, LATEST, and GROUP_OFFSETS), the offset is lazily determined
+			// when the partition is actually read.
+			switch (startupMode) {
+				case SPECIFIC_OFFSETS:
					if (specificStartupOffsets == null) {
						throw new IllegalArgumentException(
							"Startup mode for the consumer set to " + StartupMode.SPECIFIC_OFFSETS +
-								", but no specific offsets were specified");
+								", but no specific offsets were specified.");
					}

-					Long specificOffset = specificStartupOffsets.get(seedPartition);
-					if (specificOffset != null) {
-						// since the specified offsets represent the next record to read, we subtract
-						// it by one so that the initial state of the consumer will be correct
-						subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
-					} else {
-						// default to group offset behaviour if the user-provided specific offsets
-						// do not contain a value for this partition
-						subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+					for (KafkaTopicPartition seedPartition : allPartitions) {
+						Long specificOffset = specificStartupOffsets.get(seedPartition);
+						if (specificOffset != null) {
+							// since the specified offsets represent the next record to read, we subtract
+							// it by one so that the initial state of the consumer will be correct
+							subscribedPartitionsToStartOffsets.put(seedPartition, specificOffset - 1);
+						} else {
+							// default to group offset behaviour if the user-provided specific offsets
+							// do not contain a value for this partition
+							subscribedPartitionsToStartOffsets.put(seedPartition, KafkaTopicPartitionStateSentinel.GROUP_OFFSET);
+						}
+					}
+
+					break;
+				case TIMESTAMP:
+					if (startupOffsetsTimestamp == null) {
+						throw new IllegalArgumentException(
--- End diff --

Maybe this should be an `IllegalStateException`. The existing code also uses `IllegalArgumentException` but we're quite a bit removed from the actual point
