[jira] [Updated] (FLINK-8274) Fix Java 64K method compiling limitation for CommonCalc

2018-02-17 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8274?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler updated FLINK-8274:

Affects Version/s: 1.5.0

> Fix Java 64K method compiling limitation for CommonCalc
> ---
>
> Key: FLINK-8274
> URL: https://issues.apache.org/jira/browse/FLINK-8274
> Project: Flink
>  Issue Type: Improvement
>  Components: Table API & SQL
>Affects Versions: 1.5.0
>Reporter: Ruidong Li
>Assignee: Ruidong Li
>Priority: Major
>
> For complex SQL queries, the generated code for {{DataStreamCalc}} and 
> {{DataSetCalc}} may exceed Java's 64 KB method length limitation.
>  
> This issue will split long methods into several sub-method calls.
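The splitting strategy described above can be sketched as follows. This is an illustrative Java example, not the actual generated code: the class and method names are hypothetical, and the real code generator emits field projections rather than loops. The idea is simply that one oversized method becomes a driver that delegates to sub-methods, each safely under the 64 KB bytecode limit.

```java
// Hypothetical sketch: instead of one huge generated method (which would grow
// beyond 64 KB of bytecode), the generator emits several sub-methods and a
// small driver that chains them.
public class SplitCalcExample {

    // Driver method: stays tiny regardless of how many expressions exist.
    public static int[] process(int[] row) {
        int[] out = new int[row.length];
        processSplit0(row, out);
        processSplit1(row, out);
        return out;
    }

    // First chunk of the generated expressions.
    private static void processSplit0(int[] row, int[] out) {
        for (int i = 0; i < row.length / 2; i++) {
            out[i] = row[i] * 2;
        }
    }

    // Second chunk of the generated expressions.
    private static void processSplit1(int[] row, int[] out) {
        for (int i = row.length / 2; i < row.length; i++) {
            out[i] = row[i] * 2;
        }
    }
}
```

Since the JVM's limit applies per method body, not per class, any number of such sub-methods keeps compilation within bounds.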



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8685) Code of method "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V" of class "DataStrea

2018-02-17 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8685?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-8685.
---
   Resolution: Duplicate
Fix Version/s: 1.5.0

Thank you for reporting this issue. We already have an issue filed for this 
(see FLINK-8274), so I'm closing this as a duplicate.

> Code of method 
> "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
>  of class "DataStreamCalcRule$3069" grows beyond 64 KB
> -
>
> Key: FLINK-8685
> URL: https://issues.apache.org/jira/browse/FLINK-8685
> Project: Flink
>  Issue Type: Bug
>  Components: DataStream API, Table API & SQL
> Environment: Fedora 27
>Reporter: Jahandar Musayev
>Priority: Blocker
> Fix For: 1.5.0
>
>
> I want to use DataStream API and Table API & SQL. I want to read data from 
> Apache Kafka and transpose it using SQL. It throws the error below.
> A version of this code for DataSet API works fine.
>  
> {noformat}
> org.apache.flink.api.common.InvalidProgramException: Table program cannot be 
> compiled. This is a bug. Please file an issue.
>     at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:36)
>     at 
> org.apache.flink.table.runtime.CRowProcessRunner.compile(CRowProcessRunner.scala:35)
>     at 
> org.apache.flink.table.runtime.CRowProcessRunner.open(CRowProcessRunner.scala:49)
>     at 
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
>     at 
> org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
>     at 
> org.apache.flink.streaming.api.operators.ProcessOperator.open(ProcessOperator.java:56)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:393)
>     at 
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:254)
>     at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
>     at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Compiling "DataStreamCalcRule$3069": 
> Code of method 
> "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
>  of class "DataStreamCalcRule$3069" grows beyond 64 KB
>     at org.codehaus.janino.UnitCompiler.compileUnit(UnitCompiler.java:361)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:234)
>     at 
> org.codehaus.janino.SimpleCompiler.compileToClassLoader(SimpleCompiler.java:446)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:213)
>     at org.codehaus.janino.SimpleCompiler.cook(SimpleCompiler.java:204)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:80)
>     at org.codehaus.commons.compiler.Cookable.cook(Cookable.java:75)
>     at 
> org.apache.flink.table.codegen.Compiler$class.compile(Compiler.scala:33)
>     ... 9 more
> Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method 
> "processElement(Ljava/lang/Object;Lorg/apache/flink/streaming/api/functions/ProcessFunction$Context;Lorg/apache/flink/util/Collector;)V"
>  of class "DataStreamCalcRule$3069" grows beyond 64 KB
>     at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:974)
>     at org.codehaus.janino.CodeContext.write(CodeContext.java:867)
>     at org.codehaus.janino.UnitCompiler.writeOpcode(UnitCompiler.java:11753)
>     at org.codehaus.janino.UnitCompiler.writeLdc(UnitCompiler.java:10512)
>     at org.codehaus.janino.UnitCompiler.pushConstant(UnitCompiler.java:10280)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:5202)
>     at org.codehaus.janino.UnitCompiler.access$8400(UnitCompiler.java:212)
>     at 
> org.codehaus.janino.UnitCompiler$12.visitIntegerLiteral(UnitCompiler.java:4073)
>     at 
> org.codehaus.janino.UnitCompiler$12.visitIntegerLiteral(UnitCompiler.java:4044)
>     at org.codehaus.janino.Java$IntegerLiteral.accept(Java.java:5250)
>     at org.codehaus.janino.UnitCompiler.compileGet(UnitCompiler.java:4044)
>     at org.codehaus.janino.UnitCompiler.fakeCompile(UnitCompiler.java:3383)
>     at 
> org.codehaus.janino.UnitCompiler.compileGetValue(UnitCompiler.java:5218)
>     at org.codehaus.janino.UnitCompiler.compileGet2(UnitCompiler.java:4813)
>     at org.codehaus.janino.UnitCompiler.access$8200(UnitCompiler.java:212)
>     at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4071)
>     at 
> org.codehaus.janino.UnitCompiler$12.visitMethodInvocation(UnitCompiler.java:4044)
>

[GitHub] flink issue #5500: [FLINK-8667] expose key in KeyedBroadcastProcessFunction#...

2018-02-17 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@pnowojski @aljoscha  I updated the code. Hopefully we can make it into 
1.5.0! Thanks!


---


[jira] [Commented] (FLINK-8667) expose key in KeyedBroadcastProcessFunction#onTimer()

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8667?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368139#comment-16368139
 ] 

ASF GitHub Bot commented on FLINK-8667:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/5500
  
@pnowojski @aljoscha  I updated the code. Hopefully we can make it into 
1.5.0! Thanks!


> expose key in KeyedBroadcastProcessFunction#onTimer()
> -
>
> Key: FLINK-8667
> URL: https://issues.apache.org/jira/browse/FLINK-8667
> Project: Flink
>  Issue Type: Improvement
>Affects Versions: 1.5.0
>Reporter: Bowen Li
>Assignee: Bowen Li
>Priority: Major
> Fix For: 1.5.0
>
>
> [~aljoscha] [~pnowojski]  
> Since KeyedBroadcastProcessFunction is about to get out of the door, I think 
> it would be great to expose the timer's key in KeyedBroadcastProcessFunction 
> too. If we don't do it now, it will be much more difficult to add the feature 
> later because of user-app compatibility issues.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5501: [FLINK-6053][metrics] Add new Number-/StringGauge metric ...

2018-02-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5501
  
Yeah, I'm not happy that we now added another instanceof clause.

Ultimately we could modify `Counter` and `Meter` to implement `NumberGauge` 
(returning the count and rate, respectively), but that still leaves us with 3 
metric types (StringGauge, NumberGauge, Histogram).
(Long-term I would still like to throw out StringGauges, or force reporters 
to implement a dedicated interface.)

We could also hide histograms behind a series of `NumberGauges`, but it is 
difficult to ensure that all gauges refer to the same `HistogramStatistics`, 
i.e. are consistent as a whole, as they are currently.

I didn't do anything in that regard in this PR as it would change the behavior 
of some reporters that try to map our types to whatever types the backend uses. 
But this is very problematic anyway, as our metric types are mostly syntactic 
sugar; the choice between counter/gauge is rather arbitrary.
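The "Counter and Meter implement NumberGauge" idea mentioned above can be sketched like this. All names here (`NumberGauge`, the nested classes) are illustrative only, not actual Flink metric APIs; the point is that a reporter could then consume every numeric metric through one interface instead of instanceof branches.

```java
// Hypothetical sketch: a common NumberGauge interface that Counter and Meter
// implement by returning their count and rate, respectively.
public class NumberGaugeSketch {

    /** Hypothetical common interface reporters could consume uniformly. */
    public interface NumberGauge {
        Number getNumberValue();
    }

    /** A counter exposing its count as the gauge value. */
    public static class Counter implements NumberGauge {
        private long count;

        public void inc() {
            count++;
        }

        @Override
        public Number getNumberValue() {
            return count;
        }
    }

    /** A meter exposing its current rate as the gauge value. */
    public static class Meter implements NumberGauge {
        private double rate;

        public void setRate(double rate) {
            this.rate = rate;
        }

        @Override
        public Number getNumberValue() {
            return rate;
        }
    }
}
```

A reporter would then only need to distinguish `NumberGauge`, `StringGauge`, and `Histogram`, as the comment notes.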



---


[jira] [Commented] (FLINK-6053) Gauge should only take subclasses of Number, rather than everything

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6053?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368140#comment-16368140
 ] 

ASF GitHub Bot commented on FLINK-6053:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5501
  
Yeah, I'm not happy that we now added another instanceof clause.

Ultimately we could modify `Counter` and `Meter` to implement `NumberGauge` 
(returning the count and rate, respectively), but that still leaves us with 3 
metric types (StringGauge, NumberGauge, Histogram).
(Long-term I would still like to throw out StringGauges, or force reporters 
to implement a dedicated interface.)

We could also hide histograms behind a series of `NumberGauges`, but it is 
difficult to ensure that all gauges refer to the same `HistogramStatistics`, 
i.e. are consistent as a whole, as they are currently.

I didn't do anything in that regard in this PR as it would change the behavior 
of some reporters that try to map our types to whatever types the backend uses. 
But this is very problematic anyway, as our metric types are mostly syntactic 
sugar; the choice between counter/gauge is rather arbitrary.



> Gauge should only take subclasses of Number, rather than everything
> --
>
> Key: FLINK-6053
> URL: https://issues.apache.org/jira/browse/FLINK-6053
> Project: Flink
>  Issue Type: Improvement
>  Components: Metrics
>Affects Versions: 1.2.0
>Reporter: Bowen Li
>Assignee: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, Flink's Gauge is defined as 
> ```java
> public interface Gauge<T> extends Metric {
>   T getValue();
> }
> ```
> But it doesn't make sense to have Gauge take generic types other than Number. 
> And it blocks me from finishing FLINK-6013, because I cannot assume Gauge is 
> only about Number. So the interface should be
> ```java
> public interface Gauge<T extends Number> extends Metric {
>   T getValue();
> }
> ```



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368141#comment-16368141
 ] 

ASF GitHub Bot commented on FLINK-8607:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5441
  
Thank you @fhueske. I will merge this now and open some follow-up issues.


> Add a basic embedded SQL CLI client
> ---
>
> Key: FLINK-8607
> URL: https://issues.apache.org/jira/browse/FLINK-8607
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
>
> This issue describes the Implementation Plan 1 of FLIP-24.
> Goal: Add the basic features to play around with Flink's streaming SQL.
> {code}
> - Add CLI component that reads the configuration files
> - "Pre-registered table sources"
> - "Job parameters"
> - Add executor for retrieving pre-flight information and corresponding CLI 
> SQL parser
> - SHOW TABLES
> - DESCRIBE TABLE
> - EXPLAIN
> - Add streaming append query submission to executor
> - Submit jars and run SELECT query using the ClusterClient
> - Collect results on heap and serve them on the CLI side (Internal Mode with 
> SELECT)
> - SOURCE (for executing a SQL statement stored in a local file)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI client

2018-02-17 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5441
  
Thank you @fhueske. I will merge this now and open some follow-up issues.


---


[GitHub] flink pull request #5441: [FLINK-8607] [table] Add a basic embedded SQL CLI ...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5441


---


[jira] [Resolved] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-17 Thread Timo Walther (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8607?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Timo Walther resolved FLINK-8607.
-
   Resolution: Fixed
Fix Version/s: 1.5.0

Fixed in 1.5.0: 035053cfe1319fb1be7a0b5163d1e786d4815a61

> Add a basic embedded SQL CLI client
> ---
>
> Key: FLINK-8607
> URL: https://issues.apache.org/jira/browse/FLINK-8607
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes the Implementation Plan 1 of FLIP-24.
> Goal: Add the basic features to play around with Flink's streaming SQL.
> {code}
> - Add CLI component that reads the configuration files
> - "Pre-registered table sources"
> - "Job parameters"
> - Add executor for retrieving pre-flight information and corresponding CLI 
> SQL parser
> - SHOW TABLES
> - DESCRIBE TABLE
> - EXPLAIN
> - Add streaming append query submission to executor
> - Submit jars and run SELECT query using the ClusterClient
> - Collect results on heap and serve them on the CLI side (Internal Mode with 
> SELECT)
> - SOURCE (for executing a SQL statement stored in a local file)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8607) Add a basic embedded SQL CLI client

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8607?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368144#comment-16368144
 ] 

ASF GitHub Bot commented on FLINK-8607:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5441


> Add a basic embedded SQL CLI client
> ---
>
> Key: FLINK-8607
> URL: https://issues.apache.org/jira/browse/FLINK-8607
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Timo Walther
>Priority: Major
> Fix For: 1.5.0
>
>
> This issue describes the Implementation Plan 1 of FLIP-24.
> Goal: Add the basic features to play around with Flink's streaming SQL.
> {code}
> - Add CLI component that reads the configuration files
> - "Pre-registered table sources"
> - "Job parameters"
> - Add executor for retrieving pre-flight information and corresponding CLI 
> SQL parser
> - SHOW TABLES
> - DESCRIBE TABLE
> - EXPLAIN
> - Add streaming append query submission to executor
> - Submit jars and run SELECT query using the ClusterClient
> - Collect results on heap and serve them on the CLI side (Internal Mode with 
> SELECT)
> - SOURCE (for executing a SQL statement stored in a local file)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (FLINK-8686) Improve basic embedded SQL client

2018-02-17 Thread Timo Walther (JIRA)
Timo Walther created FLINK-8686:
---

 Summary: Improve basic embedded SQL client 
 Key: FLINK-8686
 URL: https://issues.apache.org/jira/browse/FLINK-8686
 Project: Flink
  Issue Type: Improvement
  Components: Table API & SQL
Reporter: Timo Walther
Assignee: Timo Walther


This issue describes follow-up issues that should be fixed in order to make the 
SQL client more stable:

- Add more tests for executor
- Configure JVM heap size
- Limit changelog and table buffers
- "The input is invalid please check it again." => add allowed range
- Load dependencies recursively
- Cache table & environments in executor
- Clean up results in result store
- Improve error message for unsupported batch queries



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5505: [FLINK-8538][table]Add a Kafka table source factory with ...

2018-02-17 Thread twalthr
Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5505
  
Thanks for the PR @xccui. You are right, we should provide more utility 
methods in `DescriptorProperties`, especially for improving Scala<->Java 
interoperability. I will fix those weaknesses next week and see if I can 
simplify the logic in your PR.


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368151#comment-16368151
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user twalthr commented on the issue:

https://github.com/apache/flink/pull/5505
  
Thanks for the PR @xccui. You are right, we should provide more utility 
methods in `DescriptorProperties`, especially for improving Scala<->Java 
interoperability. I will fix those weaknesses next week and see if I can 
simplify the logic in your PR.


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion whether we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5505: [FLINK-8538][table]Add a Kafka table source factory with ...

2018-02-17 Thread xccui
Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5505
  
Yes @twalthr, that will be great! I'll start working on the 
`KafkaAvroTableSourceFactory` and keep an eye on the API refactorings.


---


[jira] [Commented] (FLINK-8538) Add a Kafka table source factory with JSON format support

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368158#comment-16368158
 ] 

ASF GitHub Bot commented on FLINK-8538:
---

Github user xccui commented on the issue:

https://github.com/apache/flink/pull/5505
  
Yes @twalthr, that will be great! I'll start working on the 
`KafkaAvroTableSourceFactory` and keep an eye on the API refactorings.


> Add a Kafka table source factory with JSON format support
> -
>
> Key: FLINK-8538
> URL: https://issues.apache.org/jira/browse/FLINK-8538
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Reporter: Timo Walther
>Assignee: Xingcan Cui
>Priority: Major
>
> Similar to CSVTableSourceFactory, a Kafka table source factory for JSON should 
> be added. This issue includes improving the existing JSON descriptor with 
> validation that can be used for other connectors as well. It is up for 
> discussion whether we want to split the KafkaJsonTableSource into connector and 
> format such that we can reuse the format for other table sources as well.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8073) Test instability FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()

2018-02-17 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368165#comment-16368165
 ] 

Till Rohrmann commented on FLINK-8073:
--

Another instance: https://api.travis-ci.org/v3/job/342673340/log.txt

> Test instability 
> FlinkKafkaProducer011ITCase.testScaleDownBeforeFirstCheckpoint()
> -
>
> Key: FLINK-8073
> URL: https://issues.apache.org/jira/browse/FLINK-8073
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector, Tests
>Affects Versions: 1.4.0, 1.5.0
>Reporter: Kostas Kloudas
>Priority: Critical
>  Labels: test-stability
>
> Travis log: https://travis-ci.org/kl0u/flink/jobs/301985988



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8662) FutureUtilsTest#testRetryWithDelay unstable

2018-02-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8662.

Resolution: Fixed

Fixed via 1809e9ff30080f5b65987bcd684d1b5e1056693f

> FutureUtilsTest#testRetryWithDelay unstable
> ---
>
> Key: FLINK-8662
> URL: https://issues.apache.org/jira/browse/FLINK-8662
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.5.0
>
>
> The {{FutureUtilsTest#testRetryWithDelay}} is unstable on Travis:
> [https://travis-ci.org/apache/flink/jobs/341773028]
>  
> The problem seems to be that we measure the time in the test case incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Closed] (FLINK-8612) Add non-detached job mode

2018-02-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann closed FLINK-8612.

Resolution: Fixed

Fixed via 0e87e485ba14fc0c8a5f5aa96c79b45ef7fc45d1

> Add non-detached job mode
> -
>
> Key: FLINK-8612
> URL: https://issues.apache.org/jira/browse/FLINK-8612
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to support the non-detached job mode, the {{MiniDispatcher}} has to 
> wait until it has served the {{JobResult}} of a completed job at least once 
> before it terminates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5209: [FLINK-7711] Port JarListHandler to WebMonitorEndp...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5209


---


[GitHub] flink pull request #5035: [FLINK-7857][flp6] Port JobVertexDetails to REST e...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5035


---


[GitHub] flink pull request #5435: [FLINK-8612] [flip6] Enable non-detached job mode

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5435


---


[GitHub] flink pull request #5455: [FLINK-7711][flip6] Port JarListHandler

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5455


---


[GitHub] flink pull request #5493: [FLINK-7857][flip6] Port JobVertexDetails to REST ...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5493


---


[GitHub] flink pull request #5494: [FLINK-8662] [tests] Harden FutureUtilsTest#testRe...

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5494


---


[jira] [Resolved] (FLINK-7857) Port JobVertexDetails to REST endpoint

2018-02-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7857?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7857.
--
Resolution: Fixed

Fixed via 

2c027861086ee8be8cc56d6bb424b687ba3f7ffb

16451a38fdc206dea74f255f4fde6f8d8b3cba17

> Port JobVertexDetails to REST endpoint
> --
>
> Key: FLINK-7857
> URL: https://issues.apache.org/jira/browse/FLINK-7857
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port JobVertexDetails to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8612) Add non-detached job mode

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8612?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368172#comment-16368172
 ] 

ASF GitHub Bot commented on FLINK-8612:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5435


> Add non-detached job mode
> -
>
> Key: FLINK-8612
> URL: https://issues.apache.org/jira/browse/FLINK-8612
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to support the non-detached job mode, the {{MiniDispatcher}} has to 
> wait until it has served the {{JobResult}} of a completed job at least once 
> before it terminates.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368171#comment-16368171
 ] 

ASF GitHub Bot commented on FLINK-7711:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5455


> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7857) Port JobVertexDetails to REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368173#comment-16368173
 ] 

ASF GitHub Bot commented on FLINK-7857:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5035


> Port JobVertexDetails to REST endpoint
> --
>
> Key: FLINK-7857
> URL: https://issues.apache.org/jira/browse/FLINK-7857
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port JobVertexDetails to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7857) Port JobVertexDetails to REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368174#comment-16368174
 ] 

ASF GitHub Bot commented on FLINK-7857:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5493


> Port JobVertexDetails to REST endpoint
> --
>
> Key: FLINK-7857
> URL: https://issues.apache.org/jira/browse/FLINK-7857
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.5.0
>Reporter: Fang Yong
>Assignee: Fang Yong
>Priority: Major
>
> Port JobVertexDetails to REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8662) FutureUtilsTest#testRetryWithDelay unstable

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368175#comment-16368175
 ] 

ASF GitHub Bot commented on FLINK-8662:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5494


> FutureUtilsTest#testRetryWithDelay unstable
> ---
>
> Key: FLINK-8662
> URL: https://issues.apache.org/jira/browse/FLINK-8662
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Critical
> Fix For: 1.5.0
>
>
> The {{FutureUtilsTest#testRetryWithDelay}} is unstable on Travis:
> [https://travis-ci.org/apache/flink/jobs/341773028]
>  
> The problem seems to be that we measure the time in the test case incorrectly.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-17 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann resolved FLINK-7711.
--
Resolution: Fixed

Fixed via 4e6d22b2135216d1fab97477a6058faf1f84b789

> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7711) Port JarListHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7711?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368170#comment-16368170
 ] 

ASF GitHub Bot commented on FLINK-7711:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5209


> Port JarListHandler to new REST endpoint
> 
>
> Key: FLINK-7711
> URL: https://issues.apache.org/jira/browse/FLINK-7711
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarListHandler}} to new REST endpoint



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5516: [FLINK-8544] [Kafka Connector] Handle null message...

2018-02-17 Thread BillLeecn
GitHub user BillLeecn opened a pull request:

https://github.com/apache/flink/pull/5516

[FLINK-8544] [Kafka Connector] Handle null message key in 
JSONKeyValueDeserializationSc…

## What is the purpose of the change

This pull request fixes an NPE thrown in JSONKeyValueDeserializationSchema 
when the message key is null, allowing JSONKeyValueDeserializationSchema to be 
used to retrieve message metadata even if the message key is null.

The NPE is caused by deserializing the message key without a null check.
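The guard can be sketched as below. This is a simplified, illustrative version (the real schema parses the bytes as JSON via Jackson, which is omitted here, and the class/method names are hypothetical); the point is only that the key is deserialized when present and left null otherwise.

```java
import java.nio.charset.StandardCharsets;

// Minimal sketch of the null-key guard: decode the key only when the record
// actually carries key bytes.
public class NullKeyGuard {

    /** Returns the decoded key, or null when the record has no key bytes. */
    public static String deserializeKey(byte[] keyBytes) {
        if (keyBytes == null) {
            // Previously this path dereferenced keyBytes and threw an NPE.
            return null;
        }
        return new String(keyBytes, StandardCharsets.UTF_8);
    }
}
```

With this check, keyless Kafka messages still yield their value and metadata instead of failing the whole deserialization.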

## Brief change log

  - Check for null before deserializing the message key.

## Verifying this change

This change added tests and can be verified as follows:

  - Added unit tests for deserializing a message with a null key.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BillLeecn/flink flink-8544

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5516.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5516


commit 385fcb60e0f9dc58523dfd54e7b3b1a50373afcf
Author: Bill Lee 
Date:   2018-02-17T09:50:16Z

[FLINK-8544] Handle null message key in JSONKeyValueDeserializationSchema




---


[jira] [Commented] (FLINK-8544) JSONKeyValueDeserializationSchema throws NPE when message key is null

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368176#comment-16368176
 ] 

ASF GitHub Bot commented on FLINK-8544:
---

GitHub user BillLeecn opened a pull request:

https://github.com/apache/flink/pull/5516

[FLINK-8544] [Kafka Connector] Handle null message key in 
JSONKeyValueDeserializationSc…

## What is the purpose of the change

This pull request fixes an NPE thrown by JSONKeyValueDeserializationSchema 
when the message key is null, allowing JSONKeyValueDeserializationSchema to be 
used to retrieve message metadata even when the message key is null.

The NPE is caused by deserializing the message key without verification.

## Brief change log

  - Check the message key for null before deserializing it.

## Verifying this change

This change added tests and can be verified as follows:

  - Added unit tests for deserializing a message with null key.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): yes
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no
## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/BillLeecn/flink flink-8544

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5516.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5516


commit 385fcb60e0f9dc58523dfd54e7b3b1a50373afcf
Author: Bill Lee 
Date:   2018-02-17T09:50:16Z

[FLINK-8544] Handle null message key in JSONKeyValueDeserializationSchema




> JSONKeyValueDeserializationSchema throws NPE when message key is null
> -
>
> Key: FLINK-8544
> URL: https://issues.apache.org/jira/browse/FLINK-8544
> Project: Flink
>  Issue Type: Bug
>  Components: Kafka Connector
>Affects Versions: 1.4.0
>Reporter: Bill Lee
>Priority: Major
>   Original Estimate: 1h
>  Remaining Estimate: 1h
>
> JSONKeyValueDeserializationSchema calls Jackson to deserialize the message key 
> without validation.
>  If a message with key == null is read, Flink throws an NPE.
> {code:java}
>   @Override
>   public ObjectNode deserialize(byte[] messageKey, byte[] message, String 
> topic, int partition, long offset) throws IOException {
>   if (mapper == null) {
>   mapper = new ObjectMapper();
>   }
>   ObjectNode node = mapper.createObjectNode();
>   node.set("key", mapper.readValue(messageKey, JsonNode.class)); 
> // messageKey is not validated against null.
>   node.set("value", mapper.readValue(message, JsonNode.class));
> {code}
> The fix is very straightforward.
> {code:java}
>   if (messageKey == null) {
>   node.set("key", null);
>   } else {
>   node.set("key", mapper.readValue(messageKey, 
> JsonNode.class));
>   }
> {code}
> If it is appreciated, I would send a pull request.
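The null-guard described above can be sketched in isolation. This is a minimal, self-contained illustration of the pattern, not the actual Flink connector code: `parseJson` is a hypothetical stand-in for Jackson's `mapper.readValue(...)`, and the `Map` stands in for the `ObjectNode` result.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class NullKeyGuardSketch {

    // Hypothetical stand-in for mapper.readValue(bytes, JsonNode.class).
    static String parseJson(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Sketch of the fixed deserialize(): guard the key against null
    // before parsing, so metadata is still retrievable for keyless messages.
    static Map<String, String> deserialize(byte[] messageKey, byte[] message) {
        Map<String, String> node = new HashMap<>();
        if (messageKey == null) {
            node.put("key", null);
        } else {
            node.put("key", parseJson(messageKey));
        }
        node.put("value", parseJson(message));
        return node;
    }

    public static void main(String[] args) {
        byte[] value = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
        Map<String, String> node = deserialize(null, value); // no NPE for a null key
        System.out.println(node.get("key"));
        System.out.println(node.get("value"));
    }
}
```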



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5517: [FLINK-8678] [flip6] Make RpcService shut down non...

2018-02-17 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5517

[FLINK-8678] [flip6] Make RpcService shut down non blocking

## What is the purpose of the change

Changes the RpcService#stopService method to be non blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future, which is completed once the RpcService has been completely shut
down.
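
The termination-future idea described above can be sketched as follows. This is an illustrative example under stated assumptions, not Flink's actual `RpcService` implementation: the class name and fields here are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch: stopService() triggers shutdown asynchronously and returns a
// termination future instead of blocking until shutdown has finished.
public class NonBlockingStopSketch {

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final CompletableFuture<Void> terminationFuture = new CompletableFuture<>();

    public CompletableFuture<Void> stopService() {
        executor.submit(() -> {
            // ... release resources here ...
            terminationFuture.complete(null); // signal completion to owners
        });
        executor.shutdown();
        return terminationFuture; // returns immediately; caller decides when to wait
    }

    public static void main(String[] args) throws Exception {
        NonBlockingStopSketch svc = new NonBlockingStopSketch();
        svc.stopService().get(5, TimeUnit.SECONDS);
        System.out.println("terminated");
    }
}
```

The benefit, as the PR notes, is that components owning the service can compose this future with their own shutdown logic rather than blocking a thread.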

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
nonBlockingRpcServiceShutDown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5517


commit 5026fb052ba995f775ff3b6809f6cebdfd207140
Author: Till Rohrmann 
Date:   2018-02-16T17:46:42Z

[FLINK-8678] [flip6] Make RpcService shut down non blocking

Changes the RpcService#stopService method to be non blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future which is completed once the RpcService has been completely shut
down.




---


[jira] [Commented] (FLINK-8678) Make AkkaRpcService#stopService non-blocking

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8678?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368200#comment-16368200
 ] 

ASF GitHub Bot commented on FLINK-8678:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/5517

[FLINK-8678] [flip6] Make RpcService shut down non blocking

## What is the purpose of the change

Changes the RpcService#stopService method to be non blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future, which is completed once the RpcService has been completely shut
down.

## Verifying this change

This change is already covered by existing tests.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
nonBlockingRpcServiceShutDown

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5517.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5517


commit 5026fb052ba995f775ff3b6809f6cebdfd207140
Author: Till Rohrmann 
Date:   2018-02-16T17:46:42Z

[FLINK-8678] [flip6] Make RpcService shut down non blocking

Changes the RpcService#stopService method to be non blocking. Instead
of waiting until the RpcService has stopped, it returns the termination
future which is completed once the RpcService has been completely shut
down.




> Make AkkaRpcService#stopService non-blocking
> 
>
> Key: FLINK-8678
> URL: https://issues.apache.org/jira/browse/FLINK-8678
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination
>Affects Versions: 1.5.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> In order to properly shut down the {{AkkaRpcService}} in a non-blocking 
> fashion, we have to change the implementation of the 
> {{AkkaRpcService#stopService}}. This would give us the benefit to enable 
> non-blocking shut down of the components owning the {{AkkaRpcService}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-7477) Use "hadoop classpath" to augment classpath when available

2018-02-17 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7477?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368202#comment-16368202
 ] 

Aljoscha Krettek commented on FLINK-7477:
-

[~kkrugler] So on YARN, does the current setup work for you, or do you also have to 
remove the {{hadoop classpath}} parts from the scripts to make it work?

> Use "hadoop classpath" to augment classpath when available
> --
>
> Key: FLINK-7477
> URL: https://issues.apache.org/jira/browse/FLINK-7477
> Project: Flink
>  Issue Type: Bug
>  Components: Startup Shell Scripts
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
>Priority: Major
> Fix For: 1.4.0
>
>
> Currently, some cloud environments don't properly put the Hadoop jars into 
> {{HADOOP_CLASSPATH}} (or don't set {{HADOOP_CLASSPATH}}) at all. We should 
> check in {{config.sh}} if the {{hadoop}} binary is on the path and augment 
> our {{INTERNAL_HADOOP_CLASSPATHS}} with the result of {{hadoop classpath}} in 
> our scripts.
> This will improve the out-of-box experience of users that otherwise have to 
> manually set {{HADOOP_CLASSPATH}}.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-17 Thread Sihua Zhou (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sihua Zhou updated FLINK-8679:
--
Priority: Critical  (was: Major)

> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5490: [FLINK-8657][doc]Fix incorrect description in the documen...

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5490
  
@StefanRRichter If that is outdated, could you please merge this PR to fix 
it?


---


[jira] [Commented] (FLINK-8657) Fix incorrect description for external checkpoint vs savepoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8657?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368204#comment-16368204
 ] 

ASF GitHub Bot commented on FLINK-8657:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5490
  
@StefanRRichter If that is outdated, could you please merge this PR to fix 
it?


> Fix incorrect description for external checkpoint vs savepoint
> --
>
> Key: FLINK-8657
> URL: https://issues.apache.org/jira/browse/FLINK-8657
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Affects Versions: 1.4.0
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Minor
> Fix For: 1.5.0
>
>
> I checked that externalized checkpoints also support rescaling, both in the code 
> and in practice. But the doc still notes that they "do not support Flink specific 
> features like rescaling." 
> I am afraid I may have missed something; if so, please just close this 
> issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...

2018-02-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168922479
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -1126,7 +1126,7 @@ public static void 
setJobManagerAddressInConfig(Configuration config, InetSocket
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
-   private static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
+   public static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
--- End diff --

You're right, we should move it to a utility class.


---


[jira] [Commented] (FLINK-7715) Port JarRunHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368212#comment-16368212
 ] 

ASF GitHub Bot commented on FLINK-7715:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168922479
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
@@ -1126,7 +1126,7 @@ public static void 
setJobManagerAddressInConfig(Configuration config, InetSocket
 * @return JobGraph extracted from the PackagedProgram
 * @throws ProgramInvocationException if the JobGraph generation failed
 */
-   private static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
+   public static JobGraph createJobGraph(Configuration configuration, 
PackagedProgram packagedProgram, int defaultParallelism) throws 
ProgramInvocationException {
--- End diff --

You're right, we should move it to a utility class.


> Port JarRunHandler to new REST endpoint
> ---
>
> Key: FLINK-7715
> URL: https://issues.apache.org/jira/browse/FLINK-7715
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Gary Yao
>Priority: Major
>  Labels: flip-6
> Fix For: 1.5.0
>
>
> Port {{JarRunHandler}} to new REST endpoint.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...

2018-02-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168922506
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new 
RestClusterClient<>(configuration, "");
--- End diff --

I think we should move the `RestClusterClient` creation out of the handler 
into the `DispatcherEndpoint`. That way we can also enforce a proper shut down 
of it.


---


[jira] [Commented] (FLINK-7715) Port JarRunHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368213#comment-16368213
 ] 

ASF GitHub Bot commented on FLINK-7715:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168922506
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new 
RestClusterClient<>(configuration, "");
--- End diff --

I think we should move the `RestClusterClient` creation out of the handler 
into the `DispatcherEndpoint`. That way we can also enforce a proper shut down 
of it.


> Port JarRunHandler to new REST endpoint
> 

[GitHub] flink pull request #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBacken...

2018-02-17 Thread sihuazhou
GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5518

[FLINK-8679][State Backends]Fix RocksDBKeyedBackend.getKeys() bug for 
missing namespace condition.

This PR addresses issue 
[FLINK-8679](https://issues.apache.org/jira/browse/FLINK-8679). Currently, 
`RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it doesn't use the 
namespace to filter data. This leads to problems when one key corresponds to 
different namespaces. 

## Brief change log

- Modify RocksDBKeyedStateBackend.getKeys() to filter data according to 
namespace.
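
The idea of the change can be sketched as follows. This is a minimal illustration of namespace filtering over (key, namespace) pairs, not the real `RocksDBKeyedStateBackend` code: the pair list stands in for the entries the backend would see while iterating over RocksDB.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class NamespaceFilterSketch {

    // Sketch of getKeys(stateName, namespace): keep only entries whose
    // namespace matches the requested one, as the heap backend already does.
    public static List<String> getKeys(List<Map.Entry<String, String>> keyNamespacePairs,
                                       String namespace) {
        return keyNamespacePairs.stream()
            .filter(e -> e.getValue().equals(namespace)) // filter by namespace
            .map(Map.Entry::getKey)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> pairs = Arrays.asList(
            new SimpleEntry<>("k1", "ns1"),
            new SimpleEntry<>("k1", "ns2"),  // same key, different namespace
            new SimpleEntry<>("k2", "ns1"));
        System.out.println(getKeys(pairs, "ns1")); // only the ns1 entries survive
    }
}
```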

## Verifying this change

- This change can be verified by the unit test 
StateBackendTestBase.testGetKeys().

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink fix_rocksdb_getkeys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5518


commit ec861f44c7dd9fb136fdc682154992f115288f77
Author: sihuazhou 
Date:   2018-02-16T17:26:12Z

Fix getKeys() in RocksDBKeyStateBackend.

commit 7b0069421aa4e484aba2a97db2e4c6b3cd88f058
Author: sihuazhou 
Date:   2018-02-17T02:29:59Z

Fix loop bug in `getKeys()`.

commit d50cf63a7da42ebcf5e6ad08e3bbe28b462b1f7c
Author: sihuazhou 
Date:   2018-02-17T14:13:14Z

add test case for different namespace.




---


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368216#comment-16368216
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

GitHub user sihuazhou opened a pull request:

https://github.com/apache/flink/pull/5518

[FLINK-8679][State Backends]Fix RocksDBKeyedBackend.getKeys() bug for 
missing namespace condition.

This PR addresses issue 
[FLINK-8679](https://issues.apache.org/jira/browse/FLINK-8679). Currently, 
`RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it doesn't use the 
namespace to filter data. This leads to problems when one key corresponds to 
different namespaces. 

## Brief change log

- Modify RocksDBKeyedStateBackend.getKeys() to filter data according to 
namespace.

## Verifying this change

- This change can be verified by the unit test 
StateBackendTestBase.testGetKeys().

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/sihuazhou/flink fix_rocksdb_getkeys

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5518.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5518


commit ec861f44c7dd9fb136fdc682154992f115288f77
Author: sihuazhou 
Date:   2018-02-16T17:26:12Z

Fix getKeys() in RocksDBKeyStateBackend.

commit 7b0069421aa4e484aba2a97db2e4c6b3cd88f058
Author: sihuazhou 
Date:   2018-02-17T02:29:59Z

Fix loop bug in `getKeys()`.

commit d50cf63a7da42ebcf5e6ad08e3bbe28b462b1f7c
Author: sihuazhou 
Date:   2018-02-17T14:13:14Z

add test case for different namespace.




> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (FLINK-8679) RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with namespace

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8679?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368217#comment-16368217
 ] 

ASF GitHub Bot commented on FLINK-8679:
---

Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5518
  
@pnowojski Could you please have a look at this?


> RocksDBKeyedBackend.getKeys(stateName, namespace) doesn't filter data with 
> namespace
> 
>
> Key: FLINK-8679
> URL: https://issues.apache.org/jira/browse/FLINK-8679
> Project: Flink
>  Issue Type: Bug
>  Components: State Backends, Checkpointing
>Reporter: Sihua Zhou
>Assignee: Sihua Zhou
>Priority: Critical
>
> Currently, `RocksDBKeyedBackend.getKeys(stateName, namespace)` is odd: it 
> doesn't use the namespace to filter data, while 
> `HeapKeyedBackend.getKeys(stateName, namespace)` already does. I think they 
> should at least be consistent.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[GitHub] flink issue #5518: [FLINK-8679][State Backends]Fix RocksDBKeyedBackend.getKe...

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on the issue:

https://github.com/apache/flink/pull/5518
  
@pnowojski Could you please have a look at this?


---


[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923188
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture snapshot(
+   public RunnableFuture> snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
- 
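The quoted `restore()` above selects a restore operation based on the concrete type of the first state handle: incremental handles go to the incremental restore path, everything else to the full path. A minimal self-contained sketch of that dispatch pattern, using toy stand-in types rather than Flink's actual classes:

```java
import java.util.Collection;

// Toy handle hierarchy mirroring the instanceof dispatch in the quoted
// restore(): incremental handles take one restore path, all others the
// full path. Class names are illustrative stand-ins.
interface ToyStateHandle {}
class ToyIncrementalHandle implements ToyStateHandle {}
class ToyFullHandle implements ToyStateHandle {}

class ToyBackend {
    String restore(Collection<ToyStateHandle> restoreState) {
        if (restoreState == null || restoreState.isEmpty()) {
            return "fresh";       // no snapshot: start from an empty DB
        }
        ToyStateHandle first = restoreState.iterator().next();
        if (first instanceof ToyIncrementalHandle) {
            return "incremental"; // incremental restore path
        } else {
            return "full";        // full restore path
        }
    }
}
```

Dispatching on the first handle assumes all handles in one restore collection share the same kind, which is the premise of the quoted code as well.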

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923464
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture<KeyedStateHandle> snapshot(
+   public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
- 

[GitHub] flink pull request #5239: [FLINK-8360] Implement task-local state recovery

2018-02-17 Thread sihuazhou
Github user sihuazhou commented on a diff in the pull request:

https://github.com/apache/flink/pull/5239#discussion_r168923652
  
--- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 ---
@@ -336,1697 +386,1982 @@ public int getKeyGroupPrefixBytes() {
 * @param streamFactory The factory that we can use for writing our 
state to streams.
 * @param checkpointOptions Options for how to perform this checkpoint.
 * @return Future to the state handle of the snapshot data.
-* @throws Exception
+* @throws Exception indicating a problem in the synchronous part of 
the checkpoint.
 */
@Override
-   public RunnableFuture<KeyedStateHandle> snapshot(
+   public RunnableFuture<SnapshotResult<KeyedStateHandle>> snapshot(
final long checkpointId,
final long timestamp,
final CheckpointStreamFactory streamFactory,
CheckpointOptions checkpointOptions) throws Exception {
 
-   if (checkpointOptions.getCheckpointType() != 
CheckpointType.SAVEPOINT &&
-   enableIncrementalCheckpointing) {
-   return snapshotIncrementally(checkpointId, timestamp, 
streamFactory);
-   } else {
-   return snapshotFully(checkpointId, timestamp, 
streamFactory);
-   }
+   return snapshotStrategy.performSnapshot(checkpointId, 
timestamp, streamFactory, checkpointOptions);
}
 
-   private RunnableFuture<KeyedStateHandle> snapshotIncrementally(
-   final long checkpointId,
-   final long checkpointTimestamp,
-   final CheckpointStreamFactory checkpointStreamFactory) throws 
Exception {
-
-   if (db == null) {
-   throw new IOException("RocksDB closed.");
-   }
+   @Override
+   public void restore(StateObjectCollection<KeyedStateHandle> 
restoreState) throws Exception {
+   LOG.info("Initializing RocksDB keyed state backend from 
snapshot.");
 
-   if (kvStateInformation.isEmpty()) {
-   if (LOG.isDebugEnabled()) {
-   LOG.debug("Asynchronous RocksDB snapshot 
performed on empty keyed state at {}. Returning null.",
-   checkpointTimestamp);
-   }
-   return DoneFuture.nullValue();
+   if (LOG.isDebugEnabled()) {
+   LOG.debug("Restoring snapshot from state handles: {}.", 
restoreState);
}
 
-   final RocksDBIncrementalSnapshotOperation snapshotOperation =
-   new RocksDBIncrementalSnapshotOperation<>(
-   this,
-   checkpointStreamFactory,
-   checkpointId,
-   checkpointTimestamp);
+   // clear all meta data
+   kvStateInformation.clear();
+   restoredKvStateMetaInfos.clear();
 
try {
-   snapshotOperation.takeSnapshot();
-   } catch (Exception e) {
-   snapshotOperation.stop();
-   snapshotOperation.releaseResources(true);
-   throw e;
-   }
-
-   return new FutureTask(
-   new Callable() {
-   @Override
-   public KeyedStateHandle call() throws Exception 
{
-   return 
snapshotOperation.materializeSnapshot();
+   if (restoreState == null || restoreState.isEmpty()) {
+   createDB();
+   } else {
+   KeyedStateHandle firstStateHandle = 
restoreState.iterator().next();
+   if (firstStateHandle instanceof 
IncrementalKeyedStateHandle
+   || firstStateHandle instanceof 
IncrementalLocalKeyedStateHandle) {
+   RocksDBIncrementalRestoreOperation 
restoreOperation = new RocksDBIncrementalRestoreOperation<>(this);
+   restoreOperation.restore(restoreState);
+   } else {
+   RocksDBFullRestoreOperation 
restoreOperation = new RocksDBFullRestoreOperation<>(this);
+   
restoreOperation.doRestore(restoreState);
}
}
-   ) {
-   @Override
-   public boolean cancel(boolean mayInterruptIfRunning) {
-   snapshotOperation.stop();
-   return super.cancel(mayInterruptIfRunning);
- 

[GitHub] flink issue #5333: [FLINK-5886] Python API for streaming applications

2018-02-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
merging.


---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368232#comment-16368232
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/5333
  
merging.


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
>
> A work in progress to provide a Python interface for the Flink streaming APIs. 
> The core technology is based on Jython, which imposes two limitations: a. 
> user-defined functions cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0 and can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> To test it, one can use the IntelliJ IDE. Assuming IntelliJ is set up properly 
> (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}





[jira] [Closed] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-5886.
---
   Resolution: Fixed
Fix Version/s: 1.5.0

master:
00284fb8131ecb064a08a2da010d27ae95806744
9b7e429246d149d02a899071b71e7ee9d321b9f4

> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
> Fix For: 1.5.0
>
>
> A work in progress to provide a Python interface for the Flink streaming APIs. 
> The core technology is based on Jython, which imposes two limitations: a. 
> user-defined functions cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0 and can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> To test it, one can use the IntelliJ IDE. Assuming IntelliJ is set up properly 
> (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}





[GitHub] flink pull request #3838: [FLINK-5886] Python API for streaming applications

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3838


---


[GitHub] flink pull request #5333: [FLINK-5886] Python API for streaming applications

2018-02-17 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5333


---


[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368278#comment-16368278
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/5333


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
> Fix For: 1.5.0
>
>
> A work in progress to provide a Python interface for the Flink streaming APIs. 
> The core technology is based on Jython, which imposes two limitations: a. 
> user-defined functions cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0 and can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> To test it, one can use the IntelliJ IDE. Assuming IntelliJ is set up properly 
> (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}





[jira] [Commented] (FLINK-5886) Python API for streaming applications

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-5886?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368277#comment-16368277
 ] 

ASF GitHub Bot commented on FLINK-5886:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3838


> Python API for streaming applications
> -
>
> Key: FLINK-5886
> URL: https://issues.apache.org/jira/browse/FLINK-5886
> Project: Flink
>  Issue Type: New Feature
>  Components: Python API
>Reporter: Zohar Mizrahi
>Assignee: Zohar Mizrahi
>Priority: Major
> Fix For: 1.5.0
>
>
> A work in progress to provide a Python interface for the Flink streaming APIs. 
> The core technology is based on Jython, which imposes two limitations: a. 
> user-defined functions cannot use Python extensions; b. the Python version is 2.x.
> The branch is based on Flink release 1.2.0 and can be found here:
> https://github.com/zohar-pm/flink/tree/python-streaming
> To test it, one can use the IntelliJ IDE. Assuming IntelliJ is set up properly 
> (see: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/internals/ide_setup.html),
> one can run/debug {{org.apache.flink.python.api.PythonStreamBinderTest}}, 
> which in turn will execute all the tests under 
> {{/Users/zohar/dev/pm-flink/flink-libraries/flink-python/src/test/python/org/apache/flink/python/api/streaming}}





[jira] [Created] (FLINK-8687) MaterializedCollectStreamResult#retrievePage should take resultLock

2018-02-17 Thread Ted Yu (JIRA)
Ted Yu created FLINK-8687:
-

 Summary: MaterializedCollectStreamResult#retrievePage should take 
resultLock
 Key: FLINK-8687
 URL: https://issues.apache.org/jira/browse/FLINK-8687
 Project: Flink
  Issue Type: Bug
Reporter: Ted Yu


Currently, MaterializedCollectStreamResult#retrievePage checks the page range and 
calls snapshot.subList() without holding resultLock.

resultLock should be taken for both operations.
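The proposed fix can be sketched as follows. The class below is a hypothetical stand-in, not Flink's actual MaterializedCollectStreamResult: both the range check and the subList call happen while holding the same lock that the writer thread takes when mutating the snapshot, and a copy is returned so the caller never holds a view of the mutable list.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch of the proposed fix (toy stand-in class, not Flink's
// MaterializedCollectStreamResult): range check and subList() both run
// under resultLock, the same monitor the writer uses.
class ToyMaterializedResult {
    private final Object resultLock = new Object();
    private final List<String> snapshot = new ArrayList<>();

    void add(String row) {
        synchronized (resultLock) {
            snapshot.add(row);
        }
    }

    List<String> retrievePage(int page, int pageSize) {
        synchronized (resultLock) { // the fix: hold the lock for the whole read
            int from = page * pageSize;
            if (from < 0 || from > snapshot.size()) {
                throw new IllegalArgumentException("Page " + page + " is out of range.");
            }
            int to = Math.min(from + pageSize, snapshot.size());
            // Copy, so the caller never sees a subList view backed by the
            // list that the writer thread keeps mutating.
            return new ArrayList<>(snapshot.subList(from, to));
        }
    }
}
```

Returning a copy matters because `List.subList` is a view: if the backing list changes structurally after the lock is released, iterating the view can fail.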





[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...

2018-02-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168929157
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new 
RestClusterClient<>(configuration, "");
--- End diff --

Hmm I just realized that this is not so easy since we would need 
`flink-clients` as a dependency where the `JarRunHandler` is initialized...


---
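The `ARGUMENTS_TOKENIZE_PATTERN` in the quoted handler splits a program-argument string on whitespace while keeping single- or double-quoted segments together. A small self-contained demonstration of that regex; the helper class and the quote-stripping step are illustrative assumptions, not the handler's exact code:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Demonstrates the argument-tokenizing regex from the quoted JarRunHandler:
// an unquoted token, or a double-quoted segment, or a single-quoted segment,
// each followed by optional whitespace. Class name is illustrative.
class ArgTokenizer {
    private static final Pattern ARGUMENTS_TOKENIZE_PATTERN =
        Pattern.compile("([^\"']\\S*|\".+?\"|'.+?')\\s*");

    static List<String> tokenize(String programArgs) {
        List<String> tokens = new ArrayList<>();
        Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(programArgs);
        while (matcher.find()) {
            // Strip the surrounding quotes that the pattern captured.
            tokens.add(matcher.group(1).replaceAll("^[\"']|[\"']$", ""));
        }
        return tokens;
    }
}
```

So `--input "my file.txt" --parallelism 4` yields four tokens, with the quoted filename kept as a single argument.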


[jira] [Commented] (FLINK-7715) Port JarRunHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368310#comment-16368310
 ] 

ASF GitHub Bot commented on FLINK-7715:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168929157
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new 
RestClusterClient<>(configuration, "");
--- End diff --

Hmm I just realized that this is not so easy since we would need 
`flink-clients` as a dependency where the `JarRunHandler` is initialized...


> Port JarRunHandler to new REST endpoint
> ---
>
>  

[GitHub] flink pull request #5509: [FLINK-7715][flip6] Port JarRunHandler to new REST...

2018-02-17 Thread tillrohrmann
Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168930002
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import 
org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static 
org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = 
Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture localRestAddress,
+   final GatewayRetriever 
leaderRetriever,
+   final Time timeout,
+   final Map responseHeaders,
+   final MessageHeaders messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, 
responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new 
RestClusterClient<>(configuration, "");
--- End diff --

What do you think about a `DispatcherRestEndpoint` subclass which lives in 
`flink-runtime-web` and adds the web submission handler? This class would then 
be instantiated via the `WebMonitorUtils`.


---


[jira] [Commented] (FLINK-7715) Port JarRunHandler to new REST endpoint

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7715?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368326#comment-16368326
 ] 

ASF GitHub Bot commented on FLINK-7715:
---

Github user tillrohrmann commented on a diff in the pull request:

https://github.com/apache/flink/pull/5509#discussion_r168930002
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.client.cli.CliFrontend;
+import org.apache.flink.client.program.PackagedProgram;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.client.program.rest.RestClusterClient;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.rest.handler.AbstractRestHandler;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.RestHandlerException;
+import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
+import org.apache.flink.runtime.rest.messages.MessageHeaders;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+
+import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.Executor;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static java.util.Objects.requireNonNull;
+import static org.apache.flink.shaded.guava18.com.google.common.base.Strings.emptyToNull;
+
+/**
+ * Handler to submit jobs uploaded via the Web UI.
+ */
+public class JarRunHandler extends
+   AbstractRestHandler<DispatcherGateway, EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> {
+
+   private static final Pattern ARGUMENTS_TOKENIZE_PATTERN = Pattern.compile("([^\"\']\\S*|\".+?\"|\'.+?\')\\s*");
+
+   private final Path jarDir;
+
+   private final Configuration configuration;
+
+   private final Executor executor;
+
+   private final RestClusterClient<String> restClusterClient;
+
+   public JarRunHandler(
+   final CompletableFuture<String> localRestAddress,
+   final GatewayRetriever<DispatcherGateway> leaderRetriever,
+   final Time timeout,
+   final Map<String, String> responseHeaders,
+   final MessageHeaders<EmptyRequestBody, JarRunResponseBody, JarRunMessageParameters> messageHeaders,
+   final Path jarDir,
+   final Configuration configuration,
+   final Executor executor) {
+   super(localRestAddress, leaderRetriever, timeout, responseHeaders, messageHeaders);
+
+   this.jarDir = requireNonNull(jarDir);
+   this.configuration = requireNonNull(configuration);
+   this.executor = requireNonNull(executor);
+   try {
+   this.restClusterClient = new RestClusterClient<>(configuration, "");
--- End diff --

What do you think about a `DispatcherRestEndpoint` subclass which lives in 
`flink-runtime-web` and adds the web submission handler? This class would then 
be instantiated via the `WebMonitorUtils`.


> Port JarRunHandler to new REST endpoint
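
The `ARGUMENTS_TOKENIZE_PATTERN` regex in the diff above can be exercised in isolation. Here is a self-contained sketch (the class name and `tokenize` helper are mine, not from the PR) showing how the pattern splits a program-argument string while keeping quoted chunks together; the handler itself additionally strips the surrounding quotes afterwards:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ArgsTokenizer {

    // Same pattern as in JarRunHandler: a token is either an unquoted run of
    // non-whitespace characters, or a double- or single-quoted string
    // (matched lazily, quotes included in the captured group).
    private static final Pattern ARGUMENTS_TOKENIZE_PATTERN =
        Pattern.compile("([^\"']\\S*|\".+?\"|'.+?')\\s*");

    static List<String> tokenize(String args) {
        List<String> tokens = new ArrayList<>();
        Matcher matcher = ARGUMENTS_TOKENIZE_PATTERN.matcher(args);
        while (matcher.find()) {
            tokens.add(matcher.group(1));
        }
        return tokens;
    }

    public static void main(String[] args) {
        // prints [--input, "foo bar", -v]
        System.out.println(tokenize("--input \"foo bar\" -v"));
    }
}
```

Note that the quoted alternative wins only because the first alternative's `[^\"']` refuses to start a token at a quote character.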

[GitHub] flink pull request #5514: [FLINK-8599] Improve the failure behavior of the C...

2018-02-17 Thread ChengzhiZhao
Github user ChengzhiZhao closed the pull request at:

https://github.com/apache/flink/pull/5514


---


[jira] [Commented] (FLINK-8599) Improve the failure behavior of the ContinuousFileReaderOperator for bad files

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8599?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368348#comment-16368348
 ] 

ASF GitHub Bot commented on FLINK-8599:
---

Github user ChengzhiZhao closed the pull request at:

https://github.com/apache/flink/pull/5514


> Improve the failure behavior of the ContinuousFileReaderOperator for bad files
> --
>
> Key: FLINK-8599
> URL: https://issues.apache.org/jira/browse/FLINK-8599
> Project: Flink
>  Issue Type: New Feature
>  Components: DataStream API
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Chengzhi Zhao
>Priority: Major
>
> We have an S3 path that Flink is monitoring for new files.
> {code:java}
> val avroInputStream_activity = env.readFile(format, path, FileProcessingMode.PROCESS_CONTINUOUSLY, 1)
> {code}
>  
> I am doing both internal and external checkpointing. Let's say a bad file (for 
> example, one with a different schema) is dropped into this folder; Flink will 
> then do several retries. I want to move those bad files aside and let the 
> process continue. However, since the file path persists in the checkpoint, 
> resuming from an external checkpoint throws the following error because the 
> file can no longer be found.
>  
> {code:java}
> java.io.IOException: Error opening the Input Split s3a://myfile [0,904]: No 
> such file or directory: s3a://myfile{code}
>  
> As [~fhue...@gmail.com] suggested, we could check whether a path exists before 
> trying to read a file, and ignore the input split instead of throwing an 
> exception and causing a failure.
>  
> Also, I am thinking about adding an error output for bad files as an option 
> for users. If any bad files exist, we could move them to a separate path and 
> do further analysis.
>  
> Not sure how people feel about it, but I'd like to contribute it if people 
> think this would be an improvement.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
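
The guard suggested in the issue above can be sketched outside Flink. Below is an illustrative version using plain `java.nio.file` (the real operator would use Flink's `FileSystem` abstraction for S3 paths, and `SplitGuard`/`openSplit` are names I made up for the sketch):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Optional;

public class SplitGuard {

    // Sketch of the proposed behavior: before opening an input split, check
    // whether its backing file still exists. If it has disappeared (e.g. a bad
    // file that was moved aside), skip the split instead of failing the job.
    static Optional<String> openSplit(String path) {
        Path p = Paths.get(path);
        if (!Files.exists(p)) {
            // File is gone: ignore the split and let processing continue.
            return Optional.empty();
        }
        try {
            return Optional.of(new String(Files.readAllBytes(p)));
        } catch (IOException e) {
            // Read failed after the existence check; treat it the same way.
            return Optional.empty();
        }
    }
}
```

An empty `Optional` here stands in for "ignore the split"; in the actual operator the caller would simply advance to the next split rather than rethrow.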


[GitHub] flink pull request #5519: [FLINK-8684] [config] Rework MesosTaskManagerParam...

2018-02-17 Thread zhangminglei
GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5519

[FLINK-8684] [config] Rework MesosTaskManagerParameters#MESOS_RM_TASK…

## What is the purpose of the change
Rework the ```ConfigOption``` by directly using 
```TaskManagerOptions.NUM_TASK_SLOTS``` as an alias, to avoid creating two 
documentation entries for ```taskmanager.numberOfTaskSlots```.

## Brief change log
Use FLINK-8683 to test this change.


## Verifying this change
This change is a trivial rework. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8684

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5519


commit 2ea03310b29d69f56c637a2c367dad38c892c3ff
Author: zhangminglei 
Date:   2018-02-18T03:01:39Z

[FLINK-8684] [config] Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS




---


[jira] [Commented] (FLINK-8684) Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368424#comment-16368424
 ] 

ASF GitHub Bot commented on FLINK-8684:
---

GitHub user zhangminglei opened a pull request:

https://github.com/apache/flink/pull/5519

[FLINK-8684] [config] Rework MesosTaskManagerParameters#MESOS_RM_TASK…

## What is the purpose of the change
Rework the ```ConfigOption``` by directly using 
```TaskManagerOptions.NUM_TASK_SLOTS``` as an alias, to avoid creating two 
documentation entries for ```taskmanager.numberOfTaskSlots```.

## Brief change log
Use FLINK-8683 to test this change.


## Verifying this change
This change is a trivial rework. 

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhangminglei/flink flink-8684

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/5519.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #5519


commit 2ea03310b29d69f56c637a2c367dad38c892c3ff
Author: zhangminglei 
Date:   2018-02-18T03:01:39Z

[FLINK-8684] [config] Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS




> Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS
> --
>
> Key: FLINK-8684
> URL: https://issues.apache.org/jira/browse/FLINK-8684
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Mesos
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, {{MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS}} mimics 
> {{TaskManagerOptions#NUM_TASK_SLOTS}}:
> {code:java}
> public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
>   key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
>   .defaultValue(1);
> public static final ConfigOption<Integer> NUM_TASK_SLOTS =
>   key("taskmanager.numberOfTaskSlots")
>   .defaultValue(1)
>   .withDescription("...");
> {code}
> This pattern is problematic as it creates two documentation entries for 
> {{taskmanager.numberOfTaskSlots}} with different descriptions, and opens the 
> potential for different defaults. Ultimately this causes the documentation to 
> become ambiguous.
> I thus propose to either outright remove this option or turn it into an 
> actual alias:
> {code:java}
> public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
> TaskManagerOptions.NUM_TASK_SLOTS;
> {code}
> As a side-effect of FLINK-8683 we can ensure that no differing config options 
> exist for a given key.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
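
The ambiguity described in FLINK-8684 can be demonstrated with a minimal stand-in for Flink's `ConfigOption` (a simplified class of my own, not Flink's actual implementation):

```java
public class ConfigAliasDemo {

    // Minimal stand-in for a config option: a key plus a default value.
    static final class ConfigOption<T> {
        final String key;
        final T defaultValue;

        ConfigOption(String key, T defaultValue) {
            this.key = key;
            this.defaultValue = defaultValue;
        }
    }

    // The problematic pattern: two independent options constructed for the
    // same key. Nothing prevents their defaults or descriptions from drifting
    // apart over time, which is what makes the generated docs ambiguous.
    static final ConfigOption<Integer> NUM_TASK_SLOTS =
        new ConfigOption<>("taskmanager.numberOfTaskSlots", 1);
    static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS_DUPLICATED =
        new ConfigOption<>("taskmanager.numberOfTaskSlots", 1);

    // The proposed fix: a true alias is the *same* object, so the key,
    // default, and description can never diverge.
    static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS = NUM_TASK_SLOTS;
}
```

With the alias form there is only one option instance per key, so documentation generators (and default-value checks such as the one from FLINK-8683) see a single source of truth.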


[GitHub] flink issue #5519: [FLINK-8684] [config] Rework MesosTaskManagerParameters#M...

2018-02-17 Thread zhangminglei
Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5519
  
I'm not sure whether this PR is needed, as this already seems to be fixed 
by FLINK-8683.


---


[jira] [Commented] (FLINK-8684) Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS

2018-02-17 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-8684?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16368425#comment-16368425
 ] 

ASF GitHub Bot commented on FLINK-8684:
---

Github user zhangminglei commented on the issue:

https://github.com/apache/flink/pull/5519
  
I'm not sure whether this PR is needed, as this already seems to be fixed 
by FLINK-8683.


> Rework MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS
> --
>
> Key: FLINK-8684
> URL: https://issues.apache.org/jira/browse/FLINK-8684
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, Mesos
>Affects Versions: 1.5.0
>Reporter: Chesnay Schepler
>Priority: Major
> Fix For: 1.5.0
>
>
> Currently, {{MesosTaskManagerParameters#MESOS_RM_TASKS_SLOTS}} mimics 
> {{TaskManagerOptions#NUM_TASK_SLOTS}}:
> {code:java}
> public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
>   key(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS)
>   .defaultValue(1);
> public static final ConfigOption<Integer> NUM_TASK_SLOTS =
>   key("taskmanager.numberOfTaskSlots")
>   .defaultValue(1)
>   .withDescription("...");
> {code}
> This pattern is problematic as it creates two documentation entries for 
> {{taskmanager.numberOfTaskSlots}} with different descriptions, and opens the 
> potential for different defaults. Ultimately this causes the documentation to 
> become ambiguous.
> I thus propose to either outright remove this option or turn it into an 
> actual alias:
> {code:java}
> public static final ConfigOption<Integer> MESOS_RM_TASKS_SLOTS =
> TaskManagerOptions.NUM_TASK_SLOTS;
> {code}
> As a side-effect of FLINK-8683 we can ensure that no differing config options 
> exist for a given key.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)