[jira] [Comment Edited] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200786#comment-16200786
 ] 

Hai Zhou UTC+8 edited comment on FLINK-7816 at 10/12/17 5:39 AM:
-

Hi [~aljoscha], 
In the *ClosureCleaner.clean(func, checkSerializable)* method, should we first 
check whether *func.getClass* is actually a closure?

I created a ticket 
[FLINK-7819|https://issues.apache.org/jira/browse/FLINK-7819] to fix it.


was (Author: yew1eb):
Hi [~aljoscha], 
In the *ClosureCleaner.clean(func, checkSerializable)* method, should we first 
check whether *func.getClass* is actually a closure?



> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> 
>
> Key: FLINK-7816
> URL: https://issues.apache.org/jira/browse/FLINK-7816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>
> We have the same problem as Spark: SPARK-14540



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7819) Check object to clean is closure

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)
Hai Zhou UTC+8 created FLINK-7819:
-

 Summary: Check object to clean is closure
 Key: FLINK-7819
 URL: https://issues.apache.org/jira/browse/FLINK-7819
 Project: Flink
  Issue Type: Bug
  Components: Build System
Affects Versions: 1.3.2
Reporter: Hai Zhou UTC+8
Assignee: Hai Zhou UTC+8
 Fix For: 1.4.0


In the *ClosureCleaner.clean(func)* method, we should check that func is a closure.
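
For illustration, here is a minimal sketch of such a guard; the helper name and the class-name heuristics are assumptions for this sketch, not the actual Flink implementation:

{code:java}
// Hypothetical sketch only; method and heuristic names are illustrative.
public final class ClosureCheckSketch {

    // Heuristics for "does this class look like a closure?": anonymous classes,
    // JDK-synthetic lambda classes, and Scala's generated anonymous-function classes.
    static boolean looksLikeClosure(Class<?> cls) {
        String name = cls.getName();
        return cls.isAnonymousClass()
            || cls.isSynthetic()              // Java 8 lambdas are synthetic classes
            || name.contains("$anonfun$")     // Scala anonymous function classes
            || name.contains("$$Lambda$");    // JDK-generated lambda class names
    }

    // A clean() that bails out early when the object is not a closure.
    static void clean(Object func, boolean checkSerializable) {
        if (func == null || !looksLikeClosure(func.getClass())) {
            return; // nothing to clean for plain (non-closure) objects
        }
        // ... the existing outer-reference cleaning logic would go here ...
    }
}
{code}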



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7475) ListState support update

2017-10-11 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li reassigned FLINK-7475:
---

Assignee: Bowen Li

> ListState support update
> 
>
> Key: FLINK-7475
> URL: https://issues.apache.org/jira/browse/FLINK-7475
> Project: Flink
>  Issue Type: Improvement
>  Components: Core, DataStream API
>Reporter: yf
>Assignee: Bowen Li
>
> If I want to update the list, 
> I have to do two steps: 
> listState.clear() 
> for (Element e : myList) { 
> listState.add(e); 
> } 
> Why can't I update the state with: 
> listState.update(myList) ?
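
For context, here are the two patterns from the description above as a compilable sketch; {{ListState#update}} is the API proposed by this ticket, so treat that call as an assumption rather than an existing method:

{code:java}
import java.util.List;
import org.apache.flink.api.common.state.ListState;

public final class ListStateUpdateSketch {

    // Today's two-step workaround: clear the state, then re-add every element.
    static <T> void replaceByClearAndAdd(ListState<T> state, List<T> newElements) throws Exception {
        state.clear();
        for (T e : newElements) {
            state.add(e);
        }
    }

    // The proposed single call (assumed API from this ticket).
    static <T> void replaceByUpdate(ListState<T> state, List<T> newElements) throws Exception {
        state.update(newElements);
    }
}
{code}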



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7671) testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build

2017-10-11 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7671:

Description: 
{code:java}
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase
testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase)
  Time elapsed: 0.27 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: expected interface 
org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, 
found class 
org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure
at scala.Predef$.assert(Predef.scala:179)
at 
akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424)
at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224)
at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232)
at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.JavaTestKit$Within.(JavaTestKit.java:230)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108)
{code}

in https://travis-ci.org/apache/flink/jobs/278740892


  was:

{code:java}
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase
testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase)
  Time elapsed: 0.27 sec  <<< FAILURE!
java.lang.AssertionError: assertion failed: expected interface 
org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, 
found class 
org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure
at scala.Predef$.assert(Predef.scala:179)
at 
akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424)
at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224)
at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232)
at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
at akka.testkit.TestKit.within(TestKit.scala:718)
at akka.testkit.JavaTestKit$Within.(JavaTestKit.java:230)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.(JobManagerCleanupITCase.java:134)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133)
at 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108)
{code}

in https://travis-ci.org/apache/flink/jobs/278740892


Component/s: JobManager

> testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build
> 
>
> Key: FLINK-7671
> URL: https://issues.apache.org/jira/browse/FLINK-7671
> Project: Flink
>  Issue Type: Bug
>  Components: JobManager, Tests
>Affects Versions: 1.4.0
>Reporter: Bowen Li
> Fix For: 1.4.0
>
>
> {code:java}
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase
> testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase)
>   Time elapsed: 0.27 sec  <<< FAILURE!
> java.lang.AssertionError: assertion failed: expected interface 
> org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, 
> found class 
> org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424)
>   at 

[jira] [Updated] (FLINK-7671) testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build

2017-10-11 Thread Bowen Li (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bowen Li updated FLINK-7671:

Summary: testBlobServerCleanupCancelledJob in JobManagerCleanupITCase 
failed in build  (was: JobManagerCleanupITCase failed in build)

> testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build
> 
>
> Key: FLINK-7671
> URL: https://issues.apache.org/jira/browse/FLINK-7671
> Project: Flink
>  Issue Type: Bug
>  Components: Tests
>Affects Versions: 1.4.0
>Reporter: Bowen Li
> Fix For: 1.4.0
>
>
> {code:java}
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase
> testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase)
>   Time elapsed: 0.27 sec  <<< FAILURE!
> java.lang.AssertionError: assertion failed: expected interface 
> org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, 
> found class 
> org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure
>   at scala.Predef$.assert(Predef.scala:179)
>   at 
> akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424)
>   at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410)
>   at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718)
>   at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224)
>   at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232)
>   at akka.testkit.TestKitBase$class.within(TestKit.scala:296)
>   at akka.testkit.TestKit.within(TestKit.scala:718)
>   at akka.testkit.TestKitBase$class.within(TestKit.scala:310)
>   at akka.testkit.TestKit.within(TestKit.scala:718)
>   at akka.testkit.JavaTestKit$Within.(JavaTestKit.java:230)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.(JobManagerCleanupITCase.java:134)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.(JobManagerCleanupITCase.java:134)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133)
>   at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108)
> {code}
> in https://travis-ci.org/apache/flink/jobs/278740892



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201386#comment-16201386
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144186213
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null) {
+   return;
+   }
+
+   if (channel.isWritable()) {
--- End diff --

I guess you are suggesting to separate this PR into smaller ones?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
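
To make the announcement flow above concrete, here is a heavily simplified, self-contained sketch; the class and field names are placeholders and do not mirror the actual {{CreditBasedClientHandler}} code:

{code:java}
import java.util.ArrayDeque;

// Simplified model of the credit announcement loop; all names are placeholders.
final class CreditAnnouncerSketch {

    static final class ChannelStub {
        int unannouncedCredit;

        // Returns the credit to announce and resets the counter to zero.
        int getAndResetUnannouncedCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    private final ArrayDeque<ChannelStub> channelsWithCredit = new ArrayDeque<>();

    // Called when a channel's unannounced credit goes up from zero: enqueue it.
    void notifyCreditAvailable(ChannelStub channel) {
        channelsWithCredit.add(channel);
    }

    // Called whenever the Netty channel becomes writable: take the next enqueued
    // channel and send one AddCredit message carrying all credit gathered so far.
    void writeNextAddCreditIfPossible(boolean channelWritable) {
        if (!channelWritable || channelsWithCredit.isEmpty()) {
            return;
        }
        ChannelStub channel = channelsWithCredit.poll();
        int credit = channel.getAndResetUnannouncedCredit();
        System.out.println("writeAndFlush(AddCredit(" + credit + "))"); // placeholder for the real write
    }
}
{code}

Batching the announcement this way is what keeps the message rate bounded by the network capacity, as the description notes.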



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144186213
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null) {
+   return;
+   }
+
+   if (channel.isWritable()) {
--- End diff --

I guess you are suggesting to separate this PR into smaller ones?


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201368#comment-16201368
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
`notifyCreditAvailable` would be called from three places in 
`RemoteInputChannel`: `RemoteInputChannel#recycle`, 
`RemoteInputChannel#notifyBufferAvailable` and 
`RemoteInputChannel#onSenderBacklog`, which are covered in previous PRs. 
All the previous PRs are already merged into master except 
[FLINK-7406](https://github.com/apache/flink/pull/4509).

In detail, when the channel's credit increases from zero, it tries 
to notify the producer. For example:
1. Exclusive buffers are recycled after a record is processed, increasing credit.
2. The buffer pool notifies the channel of available floating buffers, 
increasing credit.
3. When receiving the backlog from the producer, the channel may request 
floating buffers from the buffer pool; credit is increased when floating 
buffers are returned from the pool.
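
As an illustration of the "increased from zero" condition, a small hypothetical sketch (field and callback names are placeholders, not the real {{RemoteInputChannel}} members):

{code:java}
// Sketch of the "notify only when credit rises from zero" rule; names are placeholders.
final class UnannouncedCreditSketch {

    private int unannouncedCredit;
    private final Runnable notifyCreditAvailable;

    UnannouncedCreditSketch(Runnable notifyCreditAvailable) {
        this.notifyCreditAvailable = notifyCreditAvailable;
    }

    // Called from the recycle / notifyBufferAvailable / onSenderBacklog paths
    // whenever buffers become available to this channel.
    void increaseCredit(int delta) {
        boolean wasZero = (unannouncedCredit == 0);
        unannouncedCredit += delta;
        if (wasZero && delta > 0) {
            // Only the 0 -> positive transition triggers a notification, so the
            // channel is enqueued at most once per announcement round.
            notifyCreditAvailable.run();
        }
    }
}
{code}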


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2017-10-11 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
`notifyCreditAvailable` would be called from three places in 
`RemoteInputChannel`: `RemoteInputChannel#recycle`, 
`RemoteInputChannel#notifyBufferAvailable` and 
`RemoteInputChannel#onSenderBacklog`, which are covered in previous PRs. 
All the previous PRs are already merged into master except 
[FLINK-7406](https://github.com/apache/flink/pull/4509).

In detail, when the channel's credit increases from zero, it tries 
to notify the producer. For example:
1. Exclusive buffers are recycled after a record is processed, increasing credit.
2. The buffer pool notifies the channel of available floating buffers, 
increasing credit.
3. When receiving the backlog from the producer, the channel may request 
floating buffers from the buffer pool; credit is increased when floating 
buffers are returned from the pool.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201353#comment-16201353
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144181936
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) {
}
}
 
+   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+   // Implement in CreditBasedClientHandler
--- End diff --

The `PartitionRequestClientHandler` keeps the current network logic, 
and all the credit-based logic is implemented in the new 
`CreditBasedClientHandler`. 
Eventually the `PartitionRequestClientHandler` will be replaced by 
`CreditBasedClientHandler`.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread zhijiangW
Github user zhijiangW commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144181936
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) {
}
}
 
+   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+   // Implement in CreditBasedClientHandler
--- End diff --

The `PartitionRequestClientHandler` keeps the current network logic, 
and all the credit-based logic is implemented in the new 
`CreditBasedClientHandler`. 
Eventually the `PartitionRequestClientHandler` will be replaced by 
`CreditBasedClientHandler`.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201350#comment-16201350
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
@pnowojski, thanks for your reviews!

I should explain the context beforehand. 

We introduce the separate `CreditBasedClientHandler` in order not to affect 
the current logic on the master branch while only part of the PRs is merged. It will 
replace the current `PartitionRequestClientHandler` once all the feature 
code is merged. 

Different from the previous 
[FLINK-7406](https://github.com/apache/flink/pull/4509), which covers the 
ingoing pipeline logic, this PR is for the outgoing pipeline logic.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are:
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each send.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...

2017-10-11 Thread zhijiangW
Github user zhijiangW commented on the issue:

https://github.com/apache/flink/pull/4533
  
@pnowojski, thanks for your reviews!

I should explain the context beforehand. 

We introduce the separate `CreditBasedClientHandler` in order not to affect 
the current logic on the master branch while only part of the PRs is merged. It will 
replace the current `PartitionRequestClientHandler` once all the feature 
code is merged. 

Different from the previous 
[FLINK-7406](https://github.com/apache/flink/pull/4509), which covers the 
ingoing pipeline logic, this PR is for the outgoing pipeline logic.


---


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201263#comment-16201263
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4807
  
wonderful! +1


> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4.x

2017-10-11 Thread bowenli86
Github user bowenli86 commented on the issue:

https://github.com/apache/flink/pull/4807
  
wonderful! +1


---


[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-10-11 Thread Bowen Li (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201258#comment-16201258
 ] 

Bowen Li commented on FLINK-6951:
-

[~aljoscha] I believe so, as I've seen a couple of email threads complaining about 
the same problem. What do you think, Gordon? [~tzulitai]

On my side, I have made a hacky workaround, so this problem hasn't blocked me.

I'm traveling now; I can take another closer look at it in about three weeks.

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201187#comment-16201187
 ] 

ASF GitHub Bot commented on FLINK-7540:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4812

[FLINK-7540] Apply consistent hostname normalization

## What is the purpose of the change

The hostname normalization is now applied when generating the remote akka 
config.
That way it should be ensured that all ActorSystems are bound to a 
normalized
hostname.

## Brief change log

- Add hostname normalization to `AkkaUtils#getAkkaConfig`
- Replace manual ActorSystem instantiation with 
`BootstrapTools#startActorSystem`

## Verifying this change

- Added `AkkaUtilsTest#getAkkaConfig`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) It affects how 
`ActorSystem`s are instantiated.

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixHostnameNormalization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4812


commit 00876ead7a4a7492d643f6cba3e784044c54669e
Author: Till Rohrmann 
Date:   2017-10-11T23:17:23Z

[FLINK-7540] Apply consistent hostname normalization

The hostname normalization is now applied when generating the remote akka 
config.
That way it should be ensured that all ActorSystems are bound to a 
normalized
hostname.




> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails; starting the yarn-session behaves the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager

[GitHub] flink pull request #4812: [FLINK-7540] Apply consistent hostname normalizati...

2017-10-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4812

[FLINK-7540] Apply consistent hostname normalization

## What is the purpose of the change

The hostname normalization is now applied when generating the remote akka 
config.
That way it should be ensured that all ActorSystems are bound to a 
normalized
hostname.

## Brief change log

- Add hostname normalization to `AkkaUtils#getAkkaConfig`
- Replace manual ActorSystem instantiation with 
`BootstrapTools#startActorSystem`

## Verifying this change

- Added `AkkaUtilsTest#getAkkaConfig`

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) It affects how 
`ActorSystem`s are instantiated.

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixHostnameNormalization

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4812.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4812


commit 00876ead7a4a7492d643f6cba3e784044c54669e
Author: Till Rohrmann 
Date:   2017-10-11T23:17:23Z

[FLINK-7540] Apply consistent hostname normalization

The hostname normalization is now applied when generating the remote akka 
config.
That way it should be ensured that all ActorSystems are bound to a 
normalized
hostname.




---


[jira] [Created] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7818:


 Summary: Synchronize MetricStore access in the TaskManagersHandler
 Key: FLINK-7818
 URL: https://issues.apache.org/jira/browse/FLINK-7818
 Project: Flink
  Issue Type: Bug
  Components: Metrics, REST
Affects Versions: 1.3.2, 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann


The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
single {{TaskManager}} are requested. The access is not synchronized, which can 
be problematic because the {{MetricStore}} is not thread safe.

I propose to add synchronization.
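
A minimal sketch of the proposed fix, with the store modelled as a plain map; the member names are illustrative, not the actual {{TaskManagersHandler}} fields:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Illustrative only: every read of the (non-thread-safe) store happens under the
// same lock that the metric fetcher would use for its writes.
final class TaskManagerDetailsSketch {

    private final Object metricStoreLock = new Object();
    private final Map<String, String> metricStore = new HashMap<>();

    String getTaskManagerMetric(String taskManagerId, String metricName) {
        synchronized (metricStoreLock) {
            // Concurrent updates from the fetcher cannot interleave with this read.
            return metricStore.get(taskManagerId + "." + metricName);
        }
    }
}
{code}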



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201103#comment-16201103
 ] 

ASF GitHub Bot commented on FLINK-7806:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
Not sure why they changed, @zentol. I just executed the build process as it 
was described in the README.


> Move CurrentJobsOverviewHandler to jobs/overview
> 
>
> Key: FLINK-7806
> URL: https://issues.apache.org/jira/browse/FLINK-7806
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{CurrentJobsOverviewHandler}} is currently registered under 
> {{/joboverview}}. I think it would be more idiomatic to register it under 
> {{/jobs/overview}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4805: [FLINK-7806] [flip6] Register CurrentJobsOverviewHandler ...

2017-10-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4805
  
Not sure why they changed, @zentol. I just executed the build process as it 
was described in the README.


---


[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201096#comment-16201096
 ] 

Till Rohrmann commented on FLINK-7540:
--

This is indeed a big problem for setups where you have lower and upper case 
hostnames or where you use IPv6 addresses. The underlying problem is, as Aljoscha 
pointed out, that we don't apply the hostname normalisation consistently. For 
example, the {{StandaloneHaServices}} assume that hostnames are normalized. 
However, this is not true in the Yarn, Mesos and Flip-6 cases. For the HA mode 
this is not a problem since we distribute the hostnames via ZooKeeper.

All the affected cases have in common that they start their {{ActorSystem}} via 
the {{BootstrapTools}}. Adding the normalization to 
{{AkkaUtils#getAkkaConfig(Configuration, Option[(String, Int)])}} should solve 
the problem because all remote actor systems get their hostname configuration 
via this method.
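
A rough Java sketch of the idea (the real change lives in the Scala {{AkkaUtils}}; {{NetUtils.unresolvedHostToNormalizedString}} is the method quoted in the issue, while the config-building helper here is purely illustrative):

{code:java}
import org.apache.flink.util.NetUtils;

// Illustrative helper: normalize the hostname once, at the point where the remote
// Akka config is generated, so every ActorSystem binds to the same (lowercased) form.
final class AkkaConfigSketch {

    static String remoteHostnameConfig(String externalHostname, int port) {
        String normalized = NetUtils.unresolvedHostToNormalizedString(externalHostname);
        return "akka.remote.netty.tcp.hostname = \"" + normalized + "\"\n"
             + "akka.remote.netty.tcp.port = " + port + "\n";
    }
}
{code}

Applying the normalization in this one central place avoids having to chase every call site that currently instantiates an ActorSystem manually.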

> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Priority: Critical
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails; starting the yarn-session behaves the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
>   public static String unresolvedHostToNormalizedString(String host) {
> 
> // Return loopback interface address if host is 

[GitHub] flink pull request #4811: [FLINK-7818] Synchronize MetricStore access in Tas...

2017-10-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4811

[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

## What is the purpose of the change

Synchronize MetricStore access in TaskManagersHandler

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskManagersHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4811


commit 4291b5aa1b3687ee7ba11876ca35f91e33ad3360
Author: Till Rohrmann 
Date:   2017-10-11T17:00:16Z

[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler




---


[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201119#comment-16201119
 ] 

ASF GitHub Bot commented on FLINK-7818:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4811

[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler

## What is the purpose of the change

Synchronize MetricStore access in TaskManagersHandler

## Verifying this change

This change is a trivial rework / code cleanup without any test coverage.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink fixTaskManagersHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4811.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4811


commit 4291b5aa1b3687ee7ba11876ca35f91e33ad3360
Author: Till Rohrmann 
Date:   2017-10-11T17:00:16Z

[FLINK-7818] Synchronize MetricStore access in TaskManagersHandler




> Synchronize MetricStore access in the TaskManagersHandler
> -
>
> Key: FLINK-7818
> URL: https://issues.apache.org/jira/browse/FLINK-7818
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, REST
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
> single {{TaskManager}} are requested. The access is not synchronized, which 
> can be problematic because the {{MetricStore}} is not thread safe.
> I propose to add synchronization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7540:


Assignee: Till Rohrmann

> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Assignee: Till Rohrmann
>Priority: Critical
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails; starting the yarn-session behaves the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
> public static String unresolvedHostToNormalizedString(String host) {
> // Return loopback interface address if host is null.
> // This represents the behavior of {@code InetAddress.getByName} and RFC 3330.
> if (host == null) {
> host = InetAddress.getLoopbackAddress().getHostAddress();
> } else {
> host = host.trim().toLowerCase();
> }
> ...
> }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's 
> ActorSystem.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler

2017-10-11 Thread Till Rohrmann (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1620#comment-1620
 ] 

Till Rohrmann commented on FLINK-7818:
--

Once the {{MetricStore}} is thread safe, we can remove the synchronization.

> Synchronize MetricStore access in the TaskManagersHandler
> -
>
> Key: FLINK-7818
> URL: https://issues.apache.org/jira/browse/FLINK-7818
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics, REST
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>
> The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a 
> single {{TaskManager}} are requested. The access is not synchronized, which 
> can be problematic because the {{MetricStore}} is not thread safe.
> I propose to add synchronization.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann updated FLINK-7540:
-
Priority: Critical  (was: Blocker)

> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Priority: Critical
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails; starting the yarn-session behaves the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
> public static String unresolvedHostToNormalizedString(String host) {
> // Return loopback interface address if host is null.
> // This represents the behavior of {@code InetAddress.getByName} and RFC 3330.
> if (host == null) {
> host = InetAddress.getLoopbackAddress().getHostAddress();
> } else {
> host = host.trim().toLowerCase();
> }
> ...
> }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's 
> ActorSystem.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4804: [hotfix] [kafka] Fix the config parameter names in KafkaT...

2017-10-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4804
  
thanks for catching it, merging.


---


[jira] [Commented] (FLINK-6703) Document how to take a savepoint on YARN

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201018#comment-16201018
 ] 

ASF GitHub Bot commented on FLINK-6703:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4721
  
merging.


> Document how to take a savepoint on YARN
> 
>
> Key: FLINK-6703
> URL: https://issues.apache.org/jira/browse/FLINK-6703
> Project: Flink
>  Issue Type: Improvement
>  Components: Documentation, YARN
>Affects Versions: 1.3.0, 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Bowen Li
>
> The documentation should have a separate entry for savepoint related CLI 
> commands in combination with YARN. It is currently not documented that you 
> have to supply the application id, nor how you can pass it.
> {code}
> ./bin/flink savepoint  -m yarn-cluster (-yid|-yarnapplicationId) 
> 
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4721: [FLINK-6703][savepoint/doc] Document how to take a savepo...

2017-10-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4721
  
merging.


---


[jira] [Commented] (FLINK-7774) Deserializers are not cleaned up when closing input streams

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201015#comment-16201015
 ] 

ASF GitHub Bot commented on FLINK-7774:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4783
  
merging.


> Deserializers are not cleaned up when closing input streams
> ---
>
> Key: FLINK-7774
> URL: https://issues.apache.org/jira/browse/FLINK-7774
> Project: Flink
>  Issue Type: Bug
>  Components: Network
>Affects Versions: 1.4.0, 1.3.2
>Reporter: Nico Kruber
>Assignee: Nico Kruber
>
> On cleanup of the {{AbstractRecordReader}}, {{StreamInputProcessor}}, and 
> {{StreamTwoInputProcessor}}, the deserializers' current buffers are cleaned 
> up but not their internal {{spanningWrapper}} and {{nonSpanningWrapper}} via 
> {{RecordDeserializer#clear}}. This call should be added.
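
A compilable sketch of the intended cleanup loop; the interfaces are stand-ins for the real Flink types, with {{clear()}} being the call this ticket adds:

{code:java}
// Stand-in interfaces model the relevant parts of the real Flink types.
final class DeserializerCleanupSketch {

    interface BufferLike {
        void recycle();
    }

    interface RecordDeserializerLike {
        BufferLike getCurrentBuffer();
        void clear(); // releases the internal spanning/non-spanning wrapper state
    }

    static void cleanup(RecordDeserializerLike[] deserializers) {
        for (RecordDeserializerLike deserializer : deserializers) {
            BufferLike buffer = deserializer.getCurrentBuffer();
            if (buffer != null) {
                buffer.recycle();   // what the readers already did before this fix
            }
            deserializer.clear();   // the missing call described above
        }
    }
}
{code}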



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7807) HandlerUtils methods should log errors

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201007#comment-16201007
 ] 

ASF GitHub Bot commented on FLINK-7807:
---

Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4799


> HandlerUtils methods should log errors
> --
>
> Key: FLINK-7807
> URL: https://issues.apache.org/jira/browse/FLINK-7807
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The {{HandlerUtils}} methods for sending (error) responses send sanitized 
> responses in case of exceptions, but don't log them in any way, making 
> debugging impossible.
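
A minimal sketch of the proposed behavior, using a stand-in method rather than the real {{HandlerUtils}} signature:

{code:java}
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustrative stand-in: log the original exception server-side before replying
// with the sanitized payload, so debugging information is not lost.
final class ErrorResponseSketch {

    private static final Logger LOG = LoggerFactory.getLogger(ErrorResponseSketch.class);

    static String buildErrorResponse(Throwable error) {
        LOG.error("Error while handling REST request.", error);
        return "{\"error\": \"Internal server error.\"}"; // sanitized client payload
    }
}
{code}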



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4783: [FLINK-7774][network] fix not clearing deserializers on c...

2017-10-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4783
  
merging.


---


[GitHub] flink pull request #4799: [FLINK-7807] [REST] Log exceptions in HandlerUtils...

2017-10-11 Thread zentol
Github user zentol closed the pull request at:

https://github.com/apache/flink/pull/4799


---


[jira] [Commented] (FLINK-7808) JobDetails constructor should check length of tasksPerState array

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201002#comment-16201002
 ] 

ASF GitHub Bot commented on FLINK-7808:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4800


> JobDetails constructor should check length of tasksPerState array
> -
>
> Key: FLINK-7808
> URL: https://issues.apache.org/jira/browse/FLINK-7808
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>
> The JobDetails constructor accepts an {{int[] tasksPerState}} argument, which 
> represents the number of tasks in each {{ExecutionState}}. There is no check 
> in place to verify that the array has the correct size, which the json 
> serializer assumes to be the case.
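
A minimal sketch of such a check, using Flink's {{ExecutionState}} enum for the expected length; the constructor shape is simplified:

{code:java}
import org.apache.flink.runtime.execution.ExecutionState;

// Illustrative constructor fragment: fail fast if the array does not carry exactly
// one counter per ExecutionState, instead of letting the JSON serializer run off
// the end (or silently drop states) later.
final class JobDetailsSketch {

    private final int[] tasksPerState;

    JobDetailsSketch(int[] tasksPerState) {
        int expected = ExecutionState.values().length;
        if (tasksPerState == null || tasksPerState.length != expected) {
            throw new IllegalArgumentException(
                "tasksPerState must have exactly " + expected + " entries");
        }
        this.tasksPerState = tasksPerState;
    }
}
{code}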



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201003#comment-16201003
 ] 

ASF GitHub Bot commented on FLINK-7661:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4698


> Add credit field in PartitionRequest message
> 
>
> Key: FLINK-7661
> URL: https://issues.apache.org/jira/browse/FLINK-7661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | 
> {{queueIndex}} | {{InputChannelID}} fields.
> We will add a new {{credit}} field indicating the initial credit of the 
> {{InputChannel}}; this info can be obtained from the {{InputChannel}} directly 
> after assigning exclusive buffers to it.
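
A hedged sketch of the extended message layout (the existing field names are taken from the list above; the exact class shape is an assumption):

{code:java}
// PartitionRequest with the additional initial-credit field.
static class PartitionRequest extends NettyMessage {
    final ResultPartitionID partitionId;  // existing
    final int queueIndex;                 // existing
    final InputChannelID receiverId;      // existing
    final int credit;                     // new: initial credit of the InputChannel

    PartitionRequest(ResultPartitionID partitionId, int queueIndex, InputChannelID receiverId, int credit) {
        this.partitionId = partitionId;
        this.queueIndex = queueIndex;
        this.receiverId = receiverId;
        this.credit = credit;
    }
}
{code}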



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7661) Add credit field in PartitionRequest message

2017-10-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7661.
---
   Resolution: Fixed
Fix Version/s: 1.4.0

1.4: 891f359d710146acf3d05cd2af3bb430a8fbc99b

> Add credit field in PartitionRequest message
> 
>
> Key: FLINK-7661
> URL: https://issues.apache.org/jira/browse/FLINK-7661
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Affects Versions: 1.4.0
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | 
> {{queueIndex}} | {{InputChannelID}} fields.
> We will add a new {{credit}} field indicating the initial credit of the 
> {{InputChannel}}; this info can be obtained from the {{InputChannel}} directly 
> after assigning exclusive buffers to it.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4800: [FLINK-7808] [REST] JobDetails constructor checks ...

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4800


---


[GitHub] flink pull request #4791: [hotfix] [Javadoc] Fix typos

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4791


---


[jira] [Closed] (FLINK-7807) HandlerUtils methods should log errors

2017-10-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7807.
---
Resolution: Fixed

1.4: a9c13c7d90253cc59749c20e6ee6e7f790cb598a

> HandlerUtils methods should log errors
> --
>
> Key: FLINK-7807
> URL: https://issues.apache.org/jira/browse/FLINK-7807
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>
> The {{HandlerUtils}} methods for sending (error) responses send sanitized 
> responses in case of exceptions, but don't log them in any way, which makes 
> debugging impossible.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Closed] (FLINK-7808) JobDetails constructor should check length of tasksPerState array

2017-10-11 Thread Chesnay Schepler (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chesnay Schepler closed FLINK-7808.
---
Resolution: Fixed

1.4: e2ae45b48345cf56501530e101f3c8523448ab79

> JobDetails constructor should check length of tasksPerState array
> -
>
> Key: FLINK-7808
> URL: https://issues.apache.org/jira/browse/FLINK-7808
> Project: Flink
>  Issue Type: Bug
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
>Priority: Minor
> Fix For: 1.4.0
>
>
> The JobDetails constructor accepts an {{int[] tasksPerState}} argument, which 
> represents the number of tasks in each {{ExecutionState}}. There is no check 
> in place to verify that the array has the correct size, which the json 
> serializer assumes to be the case.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4797: [hotfix][doc] Remove outdated best-practice sugges...

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4797


---


[GitHub] flink pull request #4698: [FLINK-7661][network] Add credit field in Partitio...

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4698


---


[jira] [Commented] (FLINK-6926) Add MD5/SHA1/SHA2 supported in SQL

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200980#comment-16200980
 ] 

ASF GitHub Bot commented on FLINK-6926:
---

GitHub user genged opened a pull request:

https://github.com/apache/flink/pull/4810

[FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL

## What is the purpose of the change

This pull request implements MD5, SHA1 and SHA256 support in Flink SQL as 
discussed in FLINK-6926

## Brief change log

  - Added MD5, SHA1, SHA256 SQL functions
  - Added relevant unit tests

## Verifying this change

This change added tests and can be verified as follows:

  - Added SQL expression tests
  - Added HashFunctionsTest with testAllApis
  - Validated both correct calculation and behavior for null input

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: don't know
  - The runtime per-record code paths (performance sensitive): don't know
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/genged/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4810


commit 670ffd0c438a6d519d580a077582121b66b425f5
Author: Michael Gendelman 
Date:   2017-10-08T21:00:39Z

[FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL




> Add MD5/SHA1/SHA2 supported in SQL
> --
>
> Key: FLINK-6926
> URL: https://issues.apache.org/jira/browse/FLINK-6926
> Project: Flink
>  Issue Type: Sub-task
>  Components: Table API & SQL
>Affects Versions: 1.4.0
>Reporter: sunjincheng
>Assignee: Shaoxuan Wang
>
> MD5(str): Calculates an MD5 128-bit checksum for the string. The value is 
> returned as a string of 32 hexadecimal digits, or NULL if the argument was 
> NULL. The return value can, for example, be used as a hash key. See the notes 
> at the beginning of this section about storing hash values efficiently.
> The return value is a nonbinary string in the connection character set.
> * Example:
>  MD5('testing') -> 'ae2b1fca515949e5d54fb22b8ed95575'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha1]
> SHA1(str), SHA(str): Calculates an SHA-1 160-bit checksum for the string, as 
> described in RFC 3174 (Secure Hash Algorithm). The value is returned as a 
> string of 40 hexadecimal digits, or NULL if the argument was NULL. One of the 
> possible uses for this function is as a hash key. See the notes at the 
> beginning of this section about storing hash values efficiently. You can also 
> use SHA1() as a cryptographic function for storing passwords. SHA() is 
> synonymous with SHA1().
> The return value is a nonbinary string in the connection character set.
> * Example:
>   SHA1('abc') -> 'a9993e364706816aba3e25717850c26c9cd0d89d'
> SHA2(str, hash_length): Calculates the SHA-2 family of hash functions (SHA-224, 
> SHA-256, SHA-384, and SHA-512). The first argument is the cleartext string to 
> be hashed. The second argument indicates the desired bit length of the 
> result, which must have a value of 224, 256, 384, 512, or 0 (which is 
> equivalent to 256). If either argument is NULL or the hash length is not one 
> of the permitted values, the return value is NULL. Otherwise, the function 
> result is a hash value containing the desired number of bits. See the notes 
> at the beginning of this section about storing hash values efficiently.
> The return value is a nonbinary string in the connection character set.
> * Example:
> SHA2('abc', 224) -> '23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7'
> * See more:
> ** [MySQL| 
> https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha2]
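
For reference, the semantics described above can be reproduced with plain JDK classes; a hedged illustration of the expected results (this shows only the digest semantics, not the Table API / code-generation side of the change):

{code:java}
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;

public class HashExample {

    // Returns the lower-case hex digest of the given input for the given JDK algorithm name.
    static String hexDigest(String algorithm, String input) throws Exception {
        byte[] digest = MessageDigest.getInstance(algorithm)
            .digest(input.getBytes(StandardCharsets.UTF_8));
        StringBuilder sb = new StringBuilder(digest.length * 2);
        for (byte b : digest) {
            sb.append(String.format("%02x", b));
        }
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(hexDigest("MD5", "testing"));   // ae2b1fca515949e5d54fb22b8ed95575
        System.out.println(hexDigest("SHA-1", "abc"));     // a9993e364706816aba3e25717850c26c9cd0d89d
        System.out.println(hexDigest("SHA-256", "abc"));   // corresponds to SHA2(str, 256)
    }
}
{code}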



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4810: [FLINK-6926] [table] Add support for MD5,SHA1 and ...

2017-10-11 Thread genged
GitHub user genged opened a pull request:

https://github.com/apache/flink/pull/4810

[FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL

## What is the purpose of the change

This pull request implements MD5, SHA1 and SHA256 support in Flink SQL as 
discussed in FLINK-6926

## Brief change log

  - Added MD5, SHA1, SHA256 SQL functions
  - Added relevant unit tests

## Verifying this change

This change added tests and can be verified as follows:

  - Added SQL expression tests
  - Added HashFunctionsTest with testAllApis
  - Validated both correct calculation and behavior for null input

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: don't know
  - The runtime per-record code paths (performance sensitive): don't know
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no

## Documentation

  - Does this pull request introduce a new feature? yes
  - If yes, how is the feature documented? not documented



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/genged/flink master

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4810.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4810


commit 670ffd0c438a6d519d580a077582121b66b425f5
Author: Michael Gendelman 
Date:   2017-10-08T21:00:39Z

[FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL




---


[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200822#comment-16200822
 ] 

Aljoscha Krettek commented on FLINK-7540:
-

[~oty5081] Thanks a lot for reporting this!

The code that is starting the Akka actor system in the YARN case (pre FLIP-6) 
is here: 
https://github.com/apache/flink/blob/b4120c1e15be1c36d07dfb29080e29750d5a0955/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java#L309.
 We would probably have to use the same hostname normalisation here.

[~till.rohrmann] What do you think about this? For FLIP-6 there seem to be 
several different entry points and it's not always clear to me how the actor 
system gets its name.
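
A minimal sketch of what that could look like in {{YarnApplicationMasterRunner}} (the surrounding variable name is an assumption; only {{NetUtils.unresolvedHostToNormalizedString}} is taken from this issue):

{code:java}
// Normalise the hostname the same way before starting the actor system,
// so the JobManager actor path matches what JobManagerRetriever looks up.
String akkaHostname = NetUtils.unresolvedHostToNormalizedString(appMasterHostname);
// ... start the ActorSystem with akkaHostname instead of the raw hostname
{code}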

> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Priority: Blocker
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but 
> Akka seems to preserve the uppercase/lowercase distinctions when starting the 
> Actor. This leads to problems because other parts (for example 
> {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional 
> cluster.
> h1. Original Issue Text
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails , starting the yarn-session is the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
>   public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>       host = InetAddress.getLoopbackAddress().getHostAddress();

[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7540:

Description: 
In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, but Akka 
seems to preserve the uppercase/lowercase distinctions when starting the Actor. 
This leads to problems because other parts (for example {{JobManagerRetriever}}) 
cannot find the actor, leading to a nonfunctional cluster.

h1. Original Issue Text

Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
DSJ-signal-900G-71”
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
~/flink-1.3.1/examples/batch/WordCount.jar --input 
/user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result  
Or
./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
"flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at Command line interface:

java.lang.RuntimeException: Unable to get ClusterClient status from Application 
Client
at 
org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
leading JobManager. Please check that the JobManager is running.

h4. Then the job fails , starting the yarn-session is the same.

The exceptions of the application log:
2017-08-10 17:36:10,334 WARN  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]
…
2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager
- Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]] after [1 ms]


And I found some differences in actor System:
2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager  
- Starting JobManager at 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager  
- JobManager 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership 
with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
54921
2017-08-10 17:36:00,313 INFO  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
reachable under 
akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.


The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
The hostname of JobManagerRetriever’s actor is lowercase.


And I read source code,
Class NetUtils the unresolvedHostToNormalizedString(String host) method of line 
127:
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}


It turns the host name into lowercase.
Therefore, JobManagerRetriever certainly cannot find the JobManager's ActorSystem.
Then I removed the call to the toLowerCase() method in the source code.

Finally, I can submit a job in yarn-cluster mode and start a yarn-session.




  was:

h1. Original Issue Text

Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
DSJ-signal-900G-71”
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
~/flink-1.3.1/examples/batch/WordCount.jar --input 
/user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result  
Or
./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
"flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at Command line interface:

java.lang.RuntimeException: Unable to get ClusterClient status from Application 
Client
at 
org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
leading JobManager. Please check that the JobManager is running.

h4. Then the job 

[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7540:

Description: 

h1. Original Issue Text

Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
DSJ-signal-900G-71”
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
~/flink-1.3.1/examples/batch/WordCount.jar --input 
/user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result  
Or
./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
"flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at Command line interface:

java.lang.RuntimeException: Unable to get ClusterClient status from Application 
Client
at 
org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
leading JobManager. Please check that the JobManager is running.

h4. Then the job fails , starting the yarn-session is the same.

The exceptions of the application log:
2017-08-10 17:36:10,334 WARN  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: 
ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]
…
2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager
- Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on 
[ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
Path(/user/jobmanager)]] after [1 ms]


And I found some differences in actor System:
2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager  
- Starting JobManager at 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager  
- JobManager 
akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership 
with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO  
org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
54921
2017-08-10 17:36:00,313 INFO  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
reachable under 
akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.


The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
The hostname of JobManagerRetriever’s actor is lowercase.


And I read source code,
Class NetUtils the unresolvedHostToNormalizedString(String host) method of line 
127:
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}


It turns the host name into lowercase.
Therefore, JobManagerRetriever certainly cannot find the JobManager's ActorSystem.
Then I removed the call to the toLowerCase() method in the source code.

Finally, I can submit a job in yarn-cluster mode and start a yarn-session.




  was:
Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
DSJ-signal-900G-71”
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
~/flink-1.3.1/examples/batch/WordCount.jar --input 
/user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result  
Or
./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
"flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at Command line interface:

java.lang.RuntimeException: Unable to get ClusterClient status from Application 
Client
at 
org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
leading JobManager. Please check that the JobManager is running.

h4. Then the job fails , starting the yarn-session is the same.

The exceptions of the application log:
2017-08-10 17:36:10,334 WARN  
org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
retrieve leader gateway and port.
akka.actor.ActorNotFound: Actor not found for: 

[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently

2017-10-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7540:

Summary: Akka hostnames are not normalised consistently  (was: submit a job 
on yarn-cluster mode or start a yarn-session failed,in hadoop cluster with 
capitalized hostname)

> Akka hostnames are not normalised consistently
> --
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Priority: Blocker
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails , starting the yarn-session is the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
>   public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>       host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>       host = host.trim().toLowerCase();
>     }
>     ...
>   }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's ActorSystem.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200786#comment-16200786
 ] 

Hai Zhou UTC+8 commented on FLINK-7816:
---

Hi [~aljoscha], 
In the *ClosureCleaner.clean(func, checkSerializable)* method, should we first 
check whether *func.getClass* is a closure?
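
A hedged sketch of the kind of check that could be added (the heuristic below is an assumption, loosely modelled on Spark's closure check, not existing Flink code):

{code:java}
// Heuristic: Scala closures typically compile to "...$anonfun$..." classes,
// and Java 8 lambdas to synthetic "...$$Lambda$..." classes.
private static boolean isClosure(Class<?> cls) {
    String name = cls.getName();
    return cls.isSynthetic() || name.contains("$anonfun$") || name.contains("$$Lambda$");
}
{code}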



> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> 
>
> Key: FLINK-7816
> URL: https://issues.apache.org/jira/browse/FLINK-7816
> Project: Flink
>  Issue Type: Sub-task
>  Components: Scala API
>Reporter: Aljoscha Krettek
>
> We have the same problem as Spark: SPARK-14540



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200785#comment-16200785
 ] 

ASF GitHub Bot commented on FLINK-7738:
---

Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
Updated the description based on the latest PR.


> Create WebSocket handler (server)
> -
>
> Key: FLINK-7738
> URL: https://issues.apache.org/jira/browse/FLINK-7738
> Project: Flink
>  Issue Type: Sub-task
>  Components: Cluster Management, Mesos
>Reporter: Eron Wright 
>Assignee: Eron Wright 
>
> An abstract handler is needed to support websocket communication.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...

2017-10-11 Thread EronWright
Github user EronWright commented on the issue:

https://github.com/apache/flink/pull/4767
  
Updated the description based on the latest PR.


---


[jira] [Updated] (FLINK-7540) submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop cluster with capitalized hostname

2017-10-11 Thread Aljoscha Krettek (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Aljoscha Krettek updated FLINK-7540:

Component/s: Distributed Coordination

> submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop 
> cluster with capitalized hostname
> 
>
> Key: FLINK-7540
> URL: https://issues.apache.org/jira/browse/FLINK-7540
> Project: Flink
>  Issue Type: Bug
>  Components: Distributed Coordination, YARN
>Affects Versions: 1.3.1, 1.4.0, 1.3.2
>Reporter: Tong Yan Ou
>Priority: Blocker
>  Labels: patch
> Fix For: 1.3.3
>
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> Hostnames in my  hadoop cluster are like these: “DSJ-RTB-4T-177”,” 
> DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 
> ~/flink-1.3.1/examples/batch/WordCount.jar --input 
> /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result 
>  
> Or
> ./bin/yarn-session.sh -d -jm 6144  -tm 12288 -qu xl_trip -s 24 -n 5 -nm 
> "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at Command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from 
> Application Client
> at 
> org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the 
> leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails , starting the yarn-session is the same.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - Failed to 
> retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: 
> ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager  
>   - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on 
> [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), 
> Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in actor System:
> 2017-08-10 17:35:56,791 INFO  org.apache.flink.yarn.YarnJobManager
>   - Starting JobManager at 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO  org.apache.flink.yarn.YarnJobManager
>   - JobManager 
> akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted 
> leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend 
> listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO  
> org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with 
> JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 
> 54921
> 2017-08-10 17:36:00,313 INFO  
> org.apache.flink.runtime.webmonitor.JobManagerRetriever   - New leader 
> reachable under 
> akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is  “akka.tcp://flink@DSJ-signal-4T-248:65082” and the 
> JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of JobManagerRetriever’s actor is lowercase.
> And I read source code,
> Class NetUtils the unresolvedHostToNormalizedString(String host) method of 
> line 127:
>   public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>       host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>       host = host.trim().toLowerCase();
>     }
>     ...
>   }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's ActorSystem.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector

2017-10-11 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200762#comment-16200762
 ] 

Aljoscha Krettek commented on FLINK-6951:
-

[~phoenixjiangnan] & [~tzulitai] Is this still an issue?

> Incompatible versions of httpcomponents jars for Flink kinesis connector
> 
>
> Key: FLINK-6951
> URL: https://issues.apache.org/jira/browse/FLINK-6951
> Project: Flink
>  Issue Type: Bug
>  Components: Kinesis Connector
>Affects Versions: 1.3.0
>Reporter: Ted Yu
>Assignee: Bowen Li
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> In the following thread, Bowen reported incompatible versions of 
> httpcomponents jars for Flink kinesis connector :
> http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector
> We should find a solution such that users don't have to change dependency 
> version(s) themselves when building Flink kinesis connector.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Chesnay Schepler (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200672#comment-16200672
 ] 

Chesnay Schepler commented on FLINK-7608:
-

that's exactly what you have to implement.

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200658#comment-16200658
 ] 

ASF GitHub Bot commented on FLINK-7791:
---

Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4802
  
What do you mean when you say triggerSavepoint is not a command of the 
ClusterClient? It does have a `triggerSavepoint` method.


> Integrate LIST command into REST client
> ---
>
> Key: FLINK-7791
> URL: https://issues.apache.org/jira/browse/FLINK-7791
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

2017-10-11 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/4802
  
What do you mean when you say triggerSavepoint is not a command of the 
ClusterClient? It does have a `triggerSavepoint` method.


---


[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200655#comment-16200655
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

Github user razvan100 closed the pull request at:

https://github.com/apache/flink/pull/4808


> Update savepoint Documentation
> --
>
> Key: FLINK-7803
> URL: https://issues.apache.org/jira/browse/FLINK-7803
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Razvan
>Assignee: Razvan
>
> Can you please update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  to specify the savepoint location *MUST* always be a location accessible by 
> all hosts?
> I spent quite some time believing it's a bug and trying to find solutions; 
> see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the 
> current documentation, and others might waste time also believing it's an 
> actual issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4808: [FLINK-7803][Documentation] Add missing informatio...

2017-10-11 Thread razvan100
Github user razvan100 closed the pull request at:

https://github.com/apache/flink/pull/4808


---


[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200652#comment-16200652
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

GitHub user razvan100 opened a pull request:

https://github.com/apache/flink/pull/4809

[FLINK-7803][Documentation] Add missing savepoint information

This fixes FLINK-7803 by emphasizing the savepoint save location should be 
on a distributed file-system.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/razvan100/flink patch-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4809


commit a3fa67e0dbc3498a5a916dbe1f7702366d793867
Author: Razvan 
Date:   2017-10-11T17:38:50Z

[FLINK-7803] Add missing savepoint information

This fixes FLINK-7803 by emphasizing the savepoint save location should be 
on a distributed file-system.




> Update savepoint Documentation
> --
>
> Key: FLINK-7803
> URL: https://issues.apache.org/jira/browse/FLINK-7803
> Project: Flink
>  Issue Type: Bug
>  Components: Documentation
>Reporter: Razvan
>Assignee: Razvan
>
> Can you please update 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html
>  to specify the savepoint location *MUST* always be a location accessible by 
> all hosts?
> I spent quite some time believing it's a bug and trying to find solutions; 
> see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the 
> current documentation, and others might waste time also believing it's an 
> actual issue.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...

2017-10-11 Thread razvan100
GitHub user razvan100 opened a pull request:

https://github.com/apache/flink/pull/4809

[FLINK-7803][Documentation] Add missing savepoint information

This fixes FLINK-7803 by emphasizing the savepoint save location should be 
on a distributed file-system.



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/razvan100/flink patch-3

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4809.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4809


commit a3fa67e0dbc3498a5a916dbe1f7702366d793867
Author: Razvan 
Date:   2017-10-11T17:38:50Z

[FLINK-7803] Add missing savepoint information

This fixes FLINK-7803 by emphasizing the savepoint save location should be 
on a distributed file-system.




---


[jira] [Commented] (FLINK-7803) Update savepoint Documentation

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200626#comment-16200626
 ] 

ASF GitHub Bot commented on FLINK-7803:
---

GitHub user razvan100 opened a pull request:

https://github.com/apache/flink/pull/4808

[FLINK-7803][Documentation] Add missing information about savepoint location

FLINK-7803 FLINK-7750


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not 

[GitHub] flink pull request #4808: [FLINK-7803][Documentation] Add missing informatio...

2017-10-11 Thread razvan100
GitHub user razvan100 opened a pull request:

https://github.com/apache/flink/pull/4808

[FLINK-7803][Documentation] Add missing information about savepoint location

FLINK-7803 FLINK-7750


*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/razvan100/flink patch-2

Alternatively you can review and apply these changes as the patch at:


[jira] [Comment Edited] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585
 ] 

Hai Zhou UTC+8 edited comment on FLINK-7608 at 10/11/17 4:54 PM:
-

Before I start, I want to discuss my idea:
redesign a class that measures latency, named LatencyStatistics.
The following is the structure of this class:

1. contains a constructor
{code:java}
 LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. contains three fields
{code:java}
MetricGroup metricGroup;  // the metricGroup passed to the constructor
int windowSize;           // the histogramWindowSize passed to the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();  // keyed by "vertexID-subtaskIndex"
{code}

3. contains a method to receive latencyMarker
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.getVertexID() + "-" + marker.getSubtaskIndex();
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

The *LatencyHistogram* implements org.apache.flink.metrics.Histogram and wraps a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?


was (Author: yew1eb):
Before I start, I want discuss my idea:
redesign a class that measures latency, named LatencyStatistics.
the following is the structure of this class:

1.contains a constructor
{code:java}
 LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. contains three fields
{code:java}
MetricGroup metricGroup  // equal metricGroup  from the constructor
int windowSize  // equal histogramWindowSize from the constructor
latencyStats = HashMap
{code}

3. contains a method to receive latencyMarker
{code:java}
reportLatency(latencyMarker maker) { 
 String key = _maker.vertexID_ +"-"+_maker.subtaskIndex_ 
 LatencyHistogram sourceStats = latencyStats.get(key);
 if sourceStats == null then {
  sourceStats = new LatencyHistogram(this.windowSize)
  this.latencyStats.put(key, sourceStats)
  this.metricGroup.histogram(key, sourceStats)
  }
  this.sourceStats.addValue(System.currentTimeMillis() - 
marker.getMarkedTime());
}
{code}

the *LatencyHistogram* extends org.apache.flink.metrics.Histogram, and wrap a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what is your opinions here?

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585
 ] 

Hai Zhou UTC+8 edited comment on FLINK-7608 at 10/11/17 4:53 PM:
-

Before I start, I want to discuss my idea:
design a new class that measures latency, named LatencyStatistics.
The structure of this class is as follows:

1. It contains a constructor:
{code:java}
LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. It contains three fields:
{code:java}
MetricGroup metricGroup;   // the metricGroup passed to the constructor
int windowSize;            // the histogramWindowSize passed to the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();
{code}

3. It contains a method that receives a LatencyMarker:
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.getVertexID() + "-" + marker.getSubtaskIndex();
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

The *LatencyHistogram* implements org.apache.flink.metrics.Histogram and wraps a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?


was (Author: yew1eb):
Before I start, I want discuss my idea:
redesign a class that measures latency, named LatencyStatistics.
the following is the structure of this class:

1.contains a constructor
{code:java}
 LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. contains three fields
{code:java}
MetricGroup metricGroup  // equal metricGroup  from the constructor
int windowSize  // equal histogramWindowSize from the constructor
latencyStats = HashMap
{code}

3. contains a method to receive latencyMarker
{code:java}
reportLatency(latencyMarker maker) { 
 String key = _maker.vertexID_ +"-"+_maker.subtaskIndex_ 
 LatencyHistogram sourceStats = latencyStats.get(key);
 if sourceStats == null then
  sourceStats = new LatencyHistogram(this.windowSize)
  this.latencyStats.put(key, sourceStats)
   this.metricGroup.histogram(key, sourceStats)
this.sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

the *LatencyHistogram* extends org.apache.flink.metrics.Histogram, and wrap a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what is your opinions here?

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585
 ] 

Hai Zhou UTC+8 edited comment on FLINK-7608 at 10/11/17 4:52 PM:
-

Before I start, I want to discuss my idea:
design a new class that measures latency, named LatencyStatistics.
The structure of this class is as follows:

1. It contains a constructor:
{code:java}
LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. It contains three fields:
{code:java}
MetricGroup metricGroup;   // the metricGroup passed to the constructor
int windowSize;            // the histogramWindowSize passed to the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();
{code}

3. It contains a method that receives a LatencyMarker:
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.getVertexID() + "-" + marker.getSubtaskIndex();
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

The *LatencyHistogram* implements org.apache.flink.metrics.Histogram and wraps a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?


was (Author: yew1eb):
Before I start, I want discuss my idea:
redesign a class that measures latency, named LatencyStatistics.
the following is the structure of this class:

1.contains a constructor
{code:java}
 LatencyStatistics(MetricGroup metricGroup, int HistogramWindowSize)
{code}

2. contains three fields
{code:java}
metricGroup  // equal metricGroup  from the constructor
windowSize  // equal HistogramWindowSize from the constructor
latencyStats = HashMap;
{code}

3. contains a method to receive latencyMarker
{code:java}
reportLatency(latencyMarker maker) { 
 String key = _maker.vertexID_ +"-"+_maker.subtaskIndex_ 
 LatencyHistogram sourceStats = latencyStats.get(key);
 if sourceStats == null then
  sourceStats = new LatencyHistogram(this.windowSize)
  this.latencyStats.put(key, sourceStats)
   this.metricGroup.histogram(key, sourceStats)
this.sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

the *LatencyHistogram* extends org.apache.flink.metrics.Histogram, and wrap a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what is your opinions here?

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Comment Edited] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585
 ] 

Hai Zhou UTC+8 edited comment on FLINK-7608 at 10/11/17 4:52 PM:
-

Before I start, I want to discuss my idea:
design a new class that measures latency, named LatencyStatistics.
The structure of this class is as follows:

1. It contains a constructor:
{code:java}
LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. It contains three fields:
{code:java}
MetricGroup metricGroup;   // the metricGroup passed to the constructor
int windowSize;            // the histogramWindowSize passed to the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();
{code}

3. It contains a method that receives a LatencyMarker:
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.getVertexID() + "-" + marker.getSubtaskIndex();
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

The *LatencyHistogram* implements org.apache.flink.metrics.Histogram and wraps a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?


was (Author: yew1eb):
Before I start, I want discuss my idea:
redesign a class that measures latency, named LatencyStatistics.
the following is the structure of this class:

1.contains a constructor
{code:java}
 LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. contains three fields
{code:java}
metricGroup  // equal metricGroup  from the constructor
windowSize  // equal HistogramWindowSize from the constructor
latencyStats = HashMap;
{code}

3. contains a method to receive latencyMarker
{code:java}
reportLatency(latencyMarker maker) { 
 String key = _maker.vertexID_ +"-"+_maker.subtaskIndex_ 
 LatencyHistogram sourceStats = latencyStats.get(key);
 if sourceStats == null then
  sourceStats = new LatencyHistogram(this.windowSize)
  this.latencyStats.put(key, sourceStats)
   this.metricGroup.histogram(key, sourceStats)
this.sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

the *LatencyHistogram* extends org.apache.flink.metrics.Histogram, and wrap a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what is your opinions here?

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric

2017-10-11 Thread Hai Zhou UTC+8 (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585
 ] 

Hai Zhou UTC+8 commented on FLINK-7608:
---

Before I start, I want to discuss my idea:
design a new class that measures latency, named LatencyStatistics.
The structure of this class is as follows:

1. It contains a constructor:
{code:java}
LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}

2. It contains three fields:
{code:java}
MetricGroup metricGroup;   // the metricGroup passed to the constructor
int windowSize;            // the histogramWindowSize passed to the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();
{code}

3. It contains a method that receives a LatencyMarker:
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.getVertexID() + "-" + marker.getSubtaskIndex();
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}

The *LatencyHistogram* implements org.apache.flink.metrics.Histogram and wraps a 
*DescriptiveStatistics* internally.

[~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?

> LatencyGauge change to  histogram metric
> 
>
> Key: FLINK-7608
> URL: https://issues.apache.org/jira/browse/FLINK-7608
> Project: Flink
>  Issue Type: Bug
>  Components: Metrics
>Reporter: Hai Zhou UTC+8
>Assignee: Hai Zhou UTC+8
>Priority: Blocker
> Fix For: 1.4.0, 1.3.3
>
>
> I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831]  to 
> export metrics the log file.
> I found:
> {noformat}
> -- Gauges 
> -
> ..
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Map.0.latency:
>  value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, 
> p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}}
> zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming 
> Job.Sink- Unnamed.0.latency: 
> value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, 
> p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}}
> ..
> {noformat}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Created] (FLINK-7817) Add TaskManagerDetailsHandler

2017-10-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7817:


 Summary: Add TaskManagerDetailsHandler
 Key: FLINK-7817
 URL: https://issues.apache.org/jira/browse/FLINK-7817
 Project: Flink
  Issue Type: Sub-task
  Components: REST, Webfrontend
Affects Versions: 1.4.0
Reporter: Till Rohrmann


We should split the legacy {{TaskManagersHandler}} into a new 
{{TaskManagerHandler}} and a {{TaskManagerDetailsHandler}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint

2017-10-11 Thread Till Rohrmann (JIRA)

 [ 
https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Till Rohrmann reassigned FLINK-7648:


Assignee: Till Rohrmann

> Port TaskManagersHandler to new REST endpoint
> -
>
> Key: FLINK-7648
> URL: https://issues.apache.org/jira/browse/FLINK-7648
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST, Webfrontend
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>  Labels: flip-6
> Fix For: 1.4.0
>
>
> Port existing {{TaskManagersHandler}} to the new REST endpoint



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200564#comment-16200564
 ] 

ASF GitHub Bot commented on FLINK-7810:
---

GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4807

[FLINK-7810] Switch from custom Flakka to Akka 2.4.x 

## What is the purpose of the change

Drop support for Scala 2.10 and then update to a newer Akka version. 
Before, we were forced to stay on our custom Flakka 2.3.x version with back 
ports because newer Akka does not support Scala 2.10.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-7729-drop-210-akka-update

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4807


commit 24e70936eac780792dd00d63f833b286f4aa2aaf
Author: Aljoscha Krettek 
Date:   2017-10-06T09:42:00Z

[FLINK-7809] Remove support for Scala 2.10

commit e64135f507256130b4d352e93152b736265de9c2
Author: Aljoscha Krettek 
Date:   2017-10-06T12:33:43Z

[FLINK-7810] Switch from custom Flakka to Akka 2.4.x

We can do this now that we dropped support for Scala 2.10.




> Switch from custom Flakka to Akka 2.4.x
> ---
>
> Key: FLINK-7810
> URL: https://issues.apache.org/jira/browse/FLINK-7810
> Project: Flink
>  Issue Type: Improvement
>  Components: Distributed Coordination
>Reporter: Aljoscha Krettek
>Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4...

2017-10-11 Thread aljoscha
GitHub user aljoscha opened a pull request:

https://github.com/apache/flink/pull/4807

[FLINK-7810] Switch from custom Flakka to Akka 2.4.x 

## What is the purpose of the change

Drop support for Scala 2.10 and then update to a newer Akka version. 
Before, we were forced to stay on our custom Flakka 2.3.x version with back 
ports because newer Akka does not support Scala 2.10.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/aljoscha/flink jira-7729-drop-210-akka-update

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4807.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4807


commit 24e70936eac780792dd00d63f833b286f4aa2aaf
Author: Aljoscha Krettek 
Date:   2017-10-06T09:42:00Z

[FLINK-7809] Remove support for Scala 2.10

commit e64135f507256130b4d352e93152b736265de9c2
Author: Aljoscha Krettek 
Date:   2017-10-06T12:33:43Z

[FLINK-7810] Switch from custom Flakka to Akka 2.4.x

We can do this now that we dropped support for Scala 2.10.




---


[jira] [Created] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner

2017-10-11 Thread Aljoscha Krettek (JIRA)
Aljoscha Krettek created FLINK-7816:
---

 Summary: Support Scala 2.12 closures and Java 8 lambdas in 
ClosureCleaner
 Key: FLINK-7816
 URL: https://issues.apache.org/jira/browse/FLINK-7816
 Project: Flink
  Issue Type: Sub-task
  Components: Scala API
Reporter: Aljoscha Krettek


We have the same problem as Spark: SPARK-14540
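
For context: Scala 2.12 closures and Java 8 lambdas are compiled via invokedynamic into 
synthetic classes that capture only what they actually use, instead of anonymous inner 
classes carrying a hidden {{this$0}} outer reference, so an outer-reference-nulling 
approach has nothing to work with. A hedged illustration using Flink's MapFunction 
(the surrounding class and method names are invented for the example):
{code:java}
import org.apache.flink.api.common.functions.MapFunction;

// Hypothetical example class, only to contrast the two compilation strategies.
public class CaptureExample {

    private final int[] largeUnusedState = new int[1_000_000]; // never used below

    // Anonymous inner class: javac adds a hidden this$0 field pointing at the
    // enclosing CaptureExample, so serializing the function drags largeUnusedState
    // along. This is the shape of closure the current cleaner knows how to handle.
    public MapFunction<Integer, Integer> asInnerClass() {
        return new MapFunction<Integer, Integer>() {
            @Override
            public Integer map(Integer value) {
                return value + 1;
            }
        };
    }

    // Java 8 lambda (Scala 2.12 closures compile the same way): translated into a
    // synthetic class that captures only what it actually uses, so there is no
    // this$0 field for the cleaner to inspect or null out.
    public MapFunction<Integer, Integer> asLambda() {
        return value -> value + 1;
    }
}
{code}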



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144025139
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
+
+   // Enqueue the remote input channel
+   handler.notifyCreditAvailable(inputChannel);
+
+   ch.runPendingTasks();
+
+   // Read the enqueued msg
+   Object msg1 = ch.readOutbound();
+
+   // Should notify credit
+   assertEquals(msg1.getClass(), AddCredit.class);
+   }
+
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, but {@link AddCredit}
+* message is not sent after the input channel is released.
+*/
+   @Test
+   public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`channel`


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144052410
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) {
}
}
 
+   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+   // Implement in CreditBasedClientHandler
--- End diff --

Is that for a follow-up PR? Is it related to 
`CreditBasedClientHandler::notifyCreditAvailable`?


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144016699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -104,8 +130,8 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
final SocketAddress remoteAddr = 
ctx.channel().remoteAddress();
 
notifyAllChannelsOfErrorAndClose(new 
RemoteTransportException(
-   "Connection unexpectedly closed by 
remote task manager '" + remoteAddr + "'. "
-   + "This might indicate 
that the remote task manager was lost.",
+   "Connection unexpectedly closed by remote task 
manager '" + remoteAddr + "'. "
--- End diff --

ditto, and same applies for other changes like this?


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200523#comment-16200523
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144013921
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
--- End diff --

Shouldn't this be part of the previous commit?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.
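
Read as a protocol, the quoted description amounts to the following minimal, 
self-contained sketch; all names here (Transport, CreditChannel, the AddCredit fields) 
are invented for illustration and are not the actual Flink/Netty classes.
{code:java}
import java.util.ArrayDeque;

public class CreditAnnouncer {

    /** Stand-in for the Netty channel the handler writes to. */
    public interface Transport {
        boolean isWritable();
        void writeAndFlush(Object message);
    }

    /** Stand-in for a RemoteInputChannel tracking unannounced credit. */
    public static final class CreditChannel {
        private int unannouncedCredit;

        public void addCredit(int credit) {
            unannouncedCredit += credit;
        }

        /** The credit is reset to zero after each announcement. */
        public int getAndResetCredit() {
            int credit = unannouncedCredit;
            unannouncedCredit = 0;
            return credit;
        }
    }

    /** The message carrying the announced credit. */
    public static final class AddCredit {
        public final int credit;
        public AddCredit(int credit) {
            this.credit = credit;
        }
    }

    private final ArrayDeque<CreditChannel> channelsWithCredit = new ArrayDeque<>();
    private final Transport transport;

    public CreditAnnouncer(Transport transport) {
        this.transport = transport;
    }

    /** Called when a channel's unannounced credit goes up from zero. */
    public void notifyCreditAvailable(CreditChannel channel) {
        channelsWithCredit.add(channel);
        writeAndFlushNextMessageIfPossible();
    }

    /** Called again whenever the transport becomes writable. */
    public void writeAndFlushNextMessageIfPossible() {
        // Send as often as the transport has capacity, announcing as much credit
        // as is available for each channel at that point in time.
        while (transport.isWritable() && !channelsWithCredit.isEmpty()) {
            CreditChannel channel = channelsWithCredit.poll();
            transport.writeAndFlush(new AddCredit(channel.getAndResetCredit()));
        }
    }
}
{code}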



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144013921
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
--- End diff --

Shouldn't this be part of the previous commit?


---


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144025089
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`ch` -> `channel`


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200522#comment-16200522
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144022698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null) {
+   return;
+   }
+
+   if (channel.isWritable()) {
--- End diff --

Please invert the if condition (smaller branch first). The same applies to a later 
if.


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144022698
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel 
inputChannel, NettyMessage.B
bufferOrEvent.releaseBuffer();
}
}
+
+   private void writeAndFlushNextMessageIfPossible(Channel channel) {
+   if (channelError.get() != null) {
+   return;
+   }
+
+   if (channel.isWritable()) {
--- End diff --

Please invert the if condition (smaller branch first). The same applies to a later 
if.


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200527#comment-16200527
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144052410
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 ---
@@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) {
}
}
 
+   void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+   // Implement in CreditBasedClientHandler
--- End diff --

Is that for a follow-up PR? Is it related to 
`CreditBasedClientHandler::notifyCreditAvailable`?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...

2017-10-11 Thread pnowojski
Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144014064
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

ditto


---


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200524#comment-16200524
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144014064
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -37,18 +43,29 @@
 
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;
 
+/**
+ * Channel handler to read {@link BufferResponse} and {@link 
ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {
 
private static final Logger LOG = 
LoggerFactory.getLogger(CreditBasedClientHandler.class);
 
+   /** Channels, which already requested partitions from the producers. */
--- End diff --

ditto


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200526#comment-16200526
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144025139
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
+
+   final RemoteInputChannel inputChannel = 
mock(RemoteInputChannel.class);
+
+   // Enqueue the remote input channel
+   handler.notifyCreditAvailable(inputChannel);
+
+   ch.runPendingTasks();
+
+   // Read the enqueued msg
+   Object msg1 = ch.readOutbound();
+
+   // Should notify credit
+   assertEquals(msg1.getClass(), AddCredit.class);
+   }
+
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, but {@link AddCredit}
+* message is not sent after the input channel is released.
+*/
+   @Test
+   public void testNotifyCreditAvailableAfterReleased() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`channel`


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200528#comment-16200528
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144025089
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java
 ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
client.cancelRequestFor(inputChannel.getInputChannelId());
}
 
+   /**
+* Verifies that {@link RemoteInputChannel} is enqueued in the 
pipeline, and
+* {@link AddCredit} message is sent to the producer.
+*/
+   @Test
+   public void testNotifyCreditAvailable() throws Exception {
+   final CreditBasedClientHandler handler = new 
CreditBasedClientHandler();
+   final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`ch` -> `channel`


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200525#comment-16200525
 ] 

ASF GitHub Bot commented on FLINK-7416:
---

Github user pnowojski commented on a diff in the pull request:

https://github.com/apache/flink/pull/4533#discussion_r144016699
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java
 ---
@@ -104,8 +130,8 @@ public void channelInactive(ChannelHandlerContext ctx) 
throws Exception {
final SocketAddress remoteAddr = 
ctx.channel().remoteAddress();
 
notifyAllChannelsOfErrorAndClose(new 
RemoteTransportException(
-   "Connection unexpectedly closed by 
remote task manager '" + remoteAddr + "'. "
-   + "This might indicate 
that the remote task manager was lost.",
+   "Connection unexpectedly closed by remote task 
manager '" + remoteAddr + "'. "
--- End diff --

ditto, and same applies for other changes like this?


> Implement Netty receiver outgoing pipeline for credit-based
> ---
>
> Key: FLINK-7416
> URL: https://issues.apache.org/jira/browse/FLINK-7416
> Project: Flink
>  Issue Type: Sub-task
>  Components: Network
>Reporter: zhijiang
>Assignee: zhijiang
> Fix For: 1.4.0
>
>
> This is a part of work for credit-based network flow control.
> The related works are :
> *  We define a new message called {{AddCredit}} to notify the incremental 
> credit during data shuffle. 
> * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the 
> channel is enqueued in the pipeline.
> * Whenever the channel becomes writable, it takes the next {{InputChannel}} 
> and sends its unannounced credit. The credit is reset to zero after each sent.
> * That way, messages are sent as often as the network has capacity and 
> contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase 
> throughput.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7653) Properly implement DispatcherGateway methods on the Dispatcher

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200519#comment-16200519
 ] 

ASF GitHub Bot commented on FLINK-7653:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4793
  
Thanks for the review @zentol. Rebasing onto the latest master and merging 
after Travis gives green light.


> Properly implement DispatcherGateway methods on the Dispatcher
> --
>
> Key: FLINK-7653
> URL: https://issues.apache.org/jira/browse/FLINK-7653
> Project: Flink
>  Issue Type: Sub-task
>  Components: Distributed Coordination, REST, Webfrontend
>Affects Versions: 1.4.0
>Reporter: Tzu-Li (Gordon) Tai
>Assignee: Till Rohrmann
>
> Currently, {{DispatcherGateway}} methods such as {{listJobs}}, 
> {{requestStatusOverview}}, and probably other new methods that will be added 
> as we port more existing REST handlers to the new endpoint, have only dummy 
> placeholder implementations in the {{Dispatcher}} marked with TODOs.
> This ticket is to track that they are all eventually properly implemented.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4793: [FLINK-7653] Properly implement Dispatcher#requestCluster...

2017-10-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4793
  
Thanks for the review @zentol. Rebasing onto the latest master and merging 
after Travis gives green light.


---


[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200509#comment-16200509
 ] 

ASF GitHub Bot commented on FLINK-7791:
---

Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4802
  
But `triggerSavepoint` is not a command of the `ClusterClient` but of the 
`CliFrontend`. All other `ClusterClient` commands like `stop` and `cancel` 
behave differently.


> Integrate LIST command into REST client
> ---
>
> Key: FLINK-7791
> URL: https://issues.apache.org/jira/browse/FLINK-7791
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...

2017-10-11 Thread tillrohrmann
Github user tillrohrmann commented on the issue:

https://github.com/apache/flink/pull/4802
  
But `triggerSavepoint` is not a command of the `ClusterClient` but of the 
`CliFrontend`. All other `ClusterClient` commands like `stop` and `cancel` 
behave differently.


---


[jira] [Commented] (FLINK-7815) Remove grouping from MultipleJobsDetails

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200496#comment-16200496
 ] 

ASF GitHub Bot commented on FLINK-7815:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4806

[FLINK-7815] Remove grouping from MultipleJobsDetails

## What is the purpose of the change

With this commit the MultipleJobsDetails instance only contains a list of 
all jobs
which could be retrieved from the cluster. With this change it is the 
responsibility
of the web ui to group the jobs into running and finished jobs.
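
For illustration only, the grouping that now happens on the consumer side could be 
expressed in Java roughly as follows (the `JobSummary` type is a hypothetical, 
simplified stand-in; the web UI actually does this in the jobs.svc.coffee script):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupJobsExample {

    // Hypothetical, simplified stand-in for the job details returned by the cluster.
    static final class JobSummary {
        final String jobId;
        final boolean finished;
        JobSummary(String jobId, boolean finished) {
            this.jobId = jobId;
            this.finished = finished;
        }
    }

    public static void main(String[] args) {
        List<JobSummary> allJobs = Arrays.asList(
                new JobSummary("a1", false),
                new JobSummary("b2", true));

        // The endpoint now returns one flat list; the consumer splits it as needed.
        Map<Boolean, List<JobSummary>> grouped = allJobs.stream()
                .collect(Collectors.partitioningBy(job -> job.finished));

        List<JobSummary> running = grouped.get(false);
        List<JobSummary> finished = grouped.get(true);
        System.out.println("running=" + running.size() + ", finished=" + finished.size());
    }
}
```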


## Brief change log

- Change `MultipleJobsDetails` to contain a single list of all retrieved 
jobs
- Adapt jobs.svc.coffee script to group list of jobs into running and 
finished jobs

## Verifying this change

This change has been manually tested.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorMultipleJobsDetails

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4806






> Remove grouping from MultipleJobsDetails
> 
>
> Key: FLINK-7815
> URL: https://issues.apache.org/jira/browse/FLINK-7815
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{MultipleJobsDetails}} object should only contain a list of retrieved 
> jobs instead of pre grouping the jobs into finished and running jobs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4806: [FLINK-7815] Remove grouping from MultipleJobsDeta...

2017-10-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4806

[FLINK-7815] Remove grouping from MultipleJobsDetails

## What is the purpose of the change

With this commit the MultipleJobsDetails instance only contains a list of 
all jobs
which could be retrieved from the cluster. With this change it is the 
responsibility
of the web ui to group the jobs into running and finished jobs.


## Brief change log

- Change `MultipleJobsDetails` to contain a single list of all retrieved 
jobs
- Adapt jobs.svc.coffee script to group list of jobs into running and 
finished jobs

## Verifying this change

This change has been manually tested.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink refactorMultipleJobsDetails

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4806.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4806






---


[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200490#comment-16200490
 ] 

ASF GitHub Bot commented on FLINK-7806:
---

GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4805

[FLINK-7806] [flip6] Register CurrentJobsOverviewHandler under 
/jobs/overview

## What is the purpose of the change

This PR changes the REST endpoint URL of the `CurrentJobsOverviewHandler` 
from `/joboverview` to `/jobs/overview`. Moreover, it renames the handler to 
`JobsOverviewHandler`.

## Brief change log

- Rename CurrentJobsOverviewHandler to JobsOverviewHandler
- Change REST handler paths
- Remove joboverview/running and joboverview/completed from 
JobsOverviewHandler
- Adapt web ui files

## Verifying this change

- The change has been manually tested

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
moveCurrentJobsOverviewHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4805.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4805






> Move CurrentJobsOverviewHandler to jobs/overview
> 
>
> Key: FLINK-7806
> URL: https://issues.apache.org/jira/browse/FLINK-7806
> Project: Flink
>  Issue Type: Sub-task
>  Components: REST
>Affects Versions: 1.4.0
>Reporter: Till Rohrmann
>Assignee: Till Rohrmann
>Priority: Minor
>  Labels: flip-6
>
> The {{CurrentJobsOverviewHandler}} is currently registered under 
> {{/joboverview}}. I think it would be more idiomatic to register it under 
> {{/jobs/overview}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4805: [FLINK-7806] [flip6] Register CurrentJobsOverviewH...

2017-10-11 Thread tillrohrmann
GitHub user tillrohrmann opened a pull request:

https://github.com/apache/flink/pull/4805

[FLINK-7806] [flip6] Register CurrentJobsOverviewHandler under 
/jobs/overview

## What is the purpose of the change

This PR changes the REST endpoint URL of the `CurrentJobsOverviewHandler` 
from `/joboverview` to `/jobs/overview`. Moreover, it renames the handler to 
`JobsOverviewHandler`.

## Brief change log

- Rename CurrentJobsOverviewHandler to JobsOverviewHandler
- Change REST handler paths
- Remove joboverview/running and joboverview/completed from 
JobsOverviewHandler
- Adapt web ui files

## Verifying this change

- The change has been manually tested

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)



You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tillrohrmann/flink 
moveCurrentJobsOverviewHandler

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/4805.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #4805






---


[jira] [Created] (FLINK-7815) Remove grouping from MultipleJobsDetails

2017-10-11 Thread Till Rohrmann (JIRA)
Till Rohrmann created FLINK-7815:


 Summary: Remove grouping from MultipleJobsDetails
 Key: FLINK-7815
 URL: https://issues.apache.org/jira/browse/FLINK-7815
 Project: Flink
  Issue Type: Sub-task
  Components: REST
Affects Versions: 1.4.0
Reporter: Till Rohrmann
Assignee: Till Rohrmann
Priority: Minor


The {{MultipleJobsDetails}} object should only contain a list of retrieved jobs 
instead of pre grouping the jobs into finished and running jobs.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7770) Hide Queryable State behind a proxy.

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200486#comment-16200486
 ] 

ASF GitHub Bot commented on FLINK-7770:
---

Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4778


> Hide Queryable State behind a proxy.
> 
>
> Key: FLINK-7770
> URL: https://issues.apache.org/jira/browse/FLINK-7770
> Project: Flink
>  Issue Type: Improvement
>  Components: Queryable State
>Affects Versions: 1.4.0
>Reporter: Kostas Kloudas
>Assignee: Kostas Kloudas
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[GitHub] flink pull request #4778: [FLINK-7770][FLINK-7769][Queryable State] Refactor...

2017-10-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/4778


---


[jira] [Commented] (FLINK-7731) Trigger on GlobalWindow does not clean state completely

2017-10-11 Thread Aljoscha Krettek (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200409#comment-16200409
 ] 

Aljoscha Krettek commented on FLINK-7731:
-

Ah yes, sorry for not noticing that earlier. The combination of global windows 
(which don't have a good cleanup time), trigger firings, and empty windows is 
indeed complex. There is also FLINK-5363, which aims at changing the semantics a 
bit, but it is highly likely that this would introduce yet more problems.

I think the behaviour is OK for now, but we might need to improve the documentation 
(in the Javadocs) a bit. What do you think?

> Trigger on GlobalWindow does not clean state completely
> ---
>
> Key: FLINK-7731
> URL: https://issues.apache.org/jira/browse/FLINK-7731
> Project: Flink
>  Issue Type: Bug
>  Components: Core, DataStream API
>Affects Versions: 1.3.2
>Reporter: Gerard Garcia
>Priority: Minor
>
> I have an operator that consists of:
> CoGroup Datastream -> GlobalWindow -> CustomTrigger -> Apply function
> The custom trigger fires and purges the elements after it has received the 
> expected number of elements (or when a timeout fires) from one of the streams 
> and the apply function merges the elements with the ones received from the 
> other stream. It appears that the state of the operator grows continuously so 
> it seems it never gets completely cleaned.
> There is a discussion in 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clean-GlobalWidnow-state-td15613.html
>  that suggests that it may be a bug.
> This job reproduces the issue: 
> https://github.com/GerardGarcia/flink-global-window-growing-state



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)


[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client

2017-10-11 Thread ASF GitHub Bot (JIRA)

[ 
https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200358#comment-16200358
 ] 

ASF GitHub Bot commented on FLINK-7791:
---

Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/4802#discussion_r144027945
  
--- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -420,89 +420,72 @@ protected int list(String[] args) {
}
 
try {
-   ActorGateway jobManagerGateway = 
getJobManagerGateway(options);
-
-   LOG.info("Connecting to JobManager to retrieve list of 
jobs");
-   Future response = jobManagerGateway.ask(
-   
JobManagerMessages.getRequestRunningJobsStatus(),
-   clientTimeout);
+   CustomCommandLine activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
+   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
 
-   Object result;
+   Collection jobDetails;
try {
-   result = Await.result(response, clientTimeout);
-   }
-   catch (Exception e) {
-   throw new Exception("Could not retrieve running 
jobs from the JobManager.", e);
+   CompletableFuture 
jobDetailsFuture = client.listJobs();
--- End diff --

I can change it to that.


> Integrate LIST command into REST client
> ---
>
> Key: FLINK-7791
> URL: https://issues.apache.org/jira/browse/FLINK-7791
> Project: Flink
>  Issue Type: Sub-task
>  Components: Client, REST
>Affects Versions: 1.4.0
>Reporter: Chesnay Schepler
>Assignee: Chesnay Schepler
> Fix For: 1.4.0
>
>




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

