[jira] [Comment Edited] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/FLINK-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200786#comment-16200786 ] Hai Zhou UTC+8 edited comment on FLINK-7816 at 10/12/17 5:39 AM: - Hi [~aljoscha], in the *ClosureCleaner.clean(func, checkSerializable)* method, should we first check whether *func.getClass* is a closure? I created a ticket [FLINK-7819|https://issues.apache.org/jira/browse/FLINK-7819] to fix it. was (Author: yew1eb): Hi [~aljoscha], in the *ClosureCleaner.clean(func, checkSerializable)* method, should we first check whether *func.getClass* is a closure? > Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner > > > Key: FLINK-7816 > URL: https://issues.apache.org/jira/browse/FLINK-7816 > Project: Flink > Issue Type: Sub-task > Components: Scala API >Reporter: Aljoscha Krettek > > We have the same problem as Spark: SPARK-14540 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7819) Check object to clean is closure
Hai Zhou UTC+8 created FLINK-7819: - Summary: Check object to clean is closure Key: FLINK-7819 URL: https://issues.apache.org/jira/browse/FLINK-7819 Project: Flink Issue Type: Bug Components: Build System Affects Versions: 1.3.2 Reporter: Hai Zhou UTC+8 Assignee: Hai Zhou UTC+8 Fix For: 1.4.0 In the *ClosureCleaner.clean(func)* method, we should check that func is a closure.
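The check proposed in this ticket could look roughly like the sketch below. The helper name and the detection heuristics (synthetic `$$Lambda` classes, anonymous classes) are assumptions for illustration only, not the actual ClosureCleaner code:

```java
// Hypothetical sketch of an "is this object a closure?" check, as proposed
// in FLINK-7819. Not the actual Flink ClosureCleaner implementation.
class ClosureCheck {
    static boolean looksLikeClosure(Object func) {
        Class<?> cls = func.getClass();
        // Java 8 lambdas compile to synthetic classes whose names
        // contain "$$Lambda" on HotSpot JVMs.
        boolean isLambda = cls.isSynthetic() && cls.getName().contains("$$Lambda");
        // Scala closures and hand-written function objects typically
        // show up as anonymous inner classes.
        return isLambda || cls.isAnonymousClass();
    }
}
```

A guard such as `if (!looksLikeClosure(func)) return;` at the top of `clean()` would then skip non-closure objects.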
[jira] [Assigned] (FLINK-7475) ListState support update
[ https://issues.apache.org/jira/browse/FLINK-7475?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li reassigned FLINK-7475: --- Assignee: Bowen Li > ListState support update > > > Key: FLINK-7475 > URL: https://issues.apache.org/jira/browse/FLINK-7475 > Project: Flink > Issue Type: Improvement > Components: Core, DataStream API >Reporter: yf >Assignee: Bowen Li > > If I want to update the list, > I have to do it in two steps: > listState.clear() > for (Element e : myList) { > listState.add(e); > } > Why can't I update the state with: > listState.update(myList)?
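The requested convenience method can be sketched against a minimal in-memory stand-in for the list state (a hypothetical class, not Flink's actual ListState implementation):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal in-memory stand-in illustrating how update(list) could subsume
// the clear()+add() loop from the issue. Hypothetical, not Flink code.
class SimpleListState<T> {
    private final List<T> elements = new ArrayList<>();

    void add(T value) { elements.add(value); }
    void clear() { elements.clear(); }
    List<T> get() { return new ArrayList<>(elements); }

    // The proposed convenience: replace the whole stored list in one call.
    void update(List<T> newList) {
        clear();
        for (T e : newList) {
            add(e);
        }
    }
}
```

Exposing `update` on the state interface keeps the two-step replace atomic from the caller's point of view.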
[jira] [Updated] (FLINK-7671) testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build
[ https://issues.apache.org/jira/browse/FLINK-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7671: Description: {code:java} org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase) Time elapsed: 0.27 sec <<< FAILURE! java.lang.AssertionError: assertion failed: expected interface org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, found class org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure at scala.Predef$.assert(Predef.scala:179) at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424) at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224) at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232) at akka.testkit.TestKitBase$class.within(TestKit.scala:296) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.TestKitBase$class.within(TestKit.scala:310) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.JavaTestKit$Within.(JavaTestKit.java:230) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108) {code} in https://travis-ci.org/apache/flink/jobs/278740892 was: {code:java} org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase 
testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase) Time elapsed: 0.27 sec <<< FAILURE! java.lang.AssertionError: assertion failed: expected interface org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, found class org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure at scala.Predef$.assert(Predef.scala:179) at akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424) at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224) at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232) at akka.testkit.TestKitBase$class.within(TestKit.scala:296) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.TestKitBase$class.within(TestKit.scala:310) at akka.testkit.TestKit.within(TestKit.scala:718) at akka.testkit.JavaTestKit$Within.(JavaTestKit.java:230) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.(JobManagerCleanupITCase.java:134) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133) at org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108) {code} in https://travis-ci.org/apache/flink/jobs/278740892 Component/s: JobManager > testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build > > > Key: FLINK-7671 > URL: https://issues.apache.org/jira/browse/FLINK-7671 > Project: Flink > Issue Type: Bug > Components: JobManager, Tests >Affects Versions: 1.4.0 >Reporter: Bowen Li > Fix For: 1.4.0 > > > {code:java} > 
org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase > testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase) > Time elapsed: 0.27 sec <<< FAILURE! > java.lang.AssertionError: assertion failed: expected interface > org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, > found class > org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure > at scala.Predef$.assert(Predef.scala:179) > at > akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424) > at
[jira] [Updated] (FLINK-7671) testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build
[ https://issues.apache.org/jira/browse/FLINK-7671?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Bowen Li updated FLINK-7671: Summary: testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build (was: JobManagerCleanupITCase failed in build) > testBlobServerCleanupCancelledJob in JobManagerCleanupITCase failed in build > > > Key: FLINK-7671 > URL: https://issues.apache.org/jira/browse/FLINK-7671 > Project: Flink > Issue Type: Bug > Components: Tests >Affects Versions: 1.4.0 >Reporter: Bowen Li > Fix For: 1.4.0 > > > {code:java} > org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase > testBlobServerCleanupCancelledJob(org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase) > Time elapsed: 0.27 sec <<< FAILURE! > java.lang.AssertionError: assertion failed: expected interface > org.apache.flink.runtime.messages.JobManagerMessages$CancellationResponse, > found class > org.apache.flink.runtime.messages.JobManagerMessages$JobResultFailure > at scala.Predef$.assert(Predef.scala:179) > at > akka.testkit.TestKitBase$class.expectMsgClass_internal(TestKit.scala:424) > at akka.testkit.TestKitBase$class.expectMsgClass(TestKit.scala:410) > at akka.testkit.TestKit.expectMsgClass(TestKit.scala:718) > at akka.testkit.JavaTestKit.expectMsgClass(JavaTestKit.java:397) > at > org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.run(JobManagerCleanupITCase.java:224) > at akka.testkit.JavaTestKit$Within$1.apply(JavaTestKit.java:232) > at akka.testkit.TestKitBase$class.within(TestKit.scala:296) > at akka.testkit.TestKit.within(TestKit.scala:718) > at akka.testkit.TestKitBase$class.within(TestKit.scala:310) > at akka.testkit.TestKit.within(TestKit.scala:718) > at akka.testkit.JavaTestKit$Within.<init>(JavaTestKit.java:230) > at > org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1$1.<init>(JobManagerCleanupITCase.java:134) > at > org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase$1.<init>(JobManagerCleanupITCase.java:134) > at 
> org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanup(JobManagerCleanupITCase.java:133) > at > org.apache.flink.runtime.jobmanager.JobManagerCleanupITCase.testBlobServerCleanupCancelledJob(JobManagerCleanupITCase.java:108) > {code} > in https://travis-ci.org/apache/flink/jobs/278740892
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201386#comment-16201386 ] ASF GitHub Bot commented on FLINK-7416: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144186213 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B bufferOrEvent.releaseBuffer(); } } + + private void writeAndFlushNextMessageIfPossible(Channel channel) { + if (channelError.get() != null) { + return; + } + + if (channel.isWritable()) { --- End diff -- I guess you are suggesting to split this PR into smaller ones? > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is part of the work for credit-based network flow control. > The related work is: > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput.
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144186213 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B bufferOrEvent.releaseBuffer(); } } + + private void writeAndFlushNextMessageIfPossible(Channel channel) { + if (channelError.get() != null) { + return; + } + + if (channel.isWritable()) { --- End diff -- I guess you are suggesting to split this PR into smaller ones? ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201368#comment-16201368 ] ASF GitHub Bot commented on FLINK-7416: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4533 `notifyCreditAvailable` is called from three code paths in `RemoteInputChannel`: `RemoteInputChannel#recycle`, `RemoteInputChannel#notifyBufferAvailable` and `RemoteInputChannel#onSenderBacklog`, which are covered in previous PRs. All the previous PRs are already merged into master except [FLINK-7406](https://github.com/apache/flink/pull/4509). In detail, when the channel's credit increases from zero, it tries to notify the producer. For example: 1. Recycling the exclusive buffers increases credit after a record is processed. 2. The buffer pool notifies the channel of available floating buffers, increasing credit. 3. When receiving the backlog from the producer, the channel may request floating buffers from the buffer pool; credit is increased when those floating buffers are returned. > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is part of the work for credit-based network flow control. > The related work is: > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. 
> Otherwise, it would only add latency to the announcements and not increase > throughput.
[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4533 `notifyCreditAvailable` is called from three code paths in `RemoteInputChannel`: `RemoteInputChannel#recycle`, `RemoteInputChannel#notifyBufferAvailable` and `RemoteInputChannel#onSenderBacklog`, which are covered in previous PRs. All the previous PRs are already merged into master except [FLINK-7406](https://github.com/apache/flink/pull/4509). In detail, when the channel's credit increases from zero, it tries to notify the producer. For example: 1. Recycling the exclusive buffers increases credit after a record is processed. 2. The buffer pool notifies the channel of available floating buffers, increasing credit. 3. When receiving the backlog from the producer, the channel may request floating buffers from the buffer pool; credit is increased when those floating buffers are returned. ---
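The "notify only when credit rises from zero" behaviour described in the comment can be sketched as follows. The names are hypothetical, not the actual RemoteInputChannel code:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Sketch of announcing credit only on the 0 -> positive transition, so a
// channel is enqueued for announcement at most once per cycle.
// Hypothetical names, not Flink's actual RemoteInputChannel code.
class CreditTracker {
    private final AtomicInteger unannouncedCredit = new AtomicInteger(0);
    private int announcements = 0;

    // Called from the three paths mentioned above: recycle(),
    // notifyBufferAvailable() and onSenderBacklog().
    void increaseCredit(int delta) {
        if (unannouncedCredit.getAndAdd(delta) == 0) {
            // Transition from zero: enqueue the channel / notify the producer.
            announcements++;
        }
    }

    // Called when the channel becomes writable: send and reset the credit.
    int takeUnannouncedCredit() {
        return unannouncedCredit.getAndSet(0);
    }

    int getAnnouncements() { return announcements; }
}
```

Further increases while an announcement is pending piggyback on the pending one, which is what keeps each message carrying as much credit as is available at that point in time.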
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201353#comment-16201353 ] ASF GitHub Bot commented on FLINK-7416: --- Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144181936 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) { } } + void notifyCreditAvailable(RemoteInputChannel inputChannel) { + // Implement in CreditBasedClientHandler --- End diff -- The `PartitionRequestClientHandler` will keep the current network logic, and all the credit-based logic is implemented in the new `CreditBasedClientHandler`. Eventually the `PartitionRequestClientHandler` will be replaced by `CreditBasedClientHandler`. > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is part of the work for credit-based network flow control. > The related work is: > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput.
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user zhijiangW commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144181936 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) { } } + void notifyCreditAvailable(RemoteInputChannel inputChannel) { + // Implement in CreditBasedClientHandler --- End diff -- The `PartitionRequestClientHandler` will keep the current network logic, and all the credit-based logic is implemented in the new `CreditBasedClientHandler`. Eventually the `PartitionRequestClientHandler` will be replaced by `CreditBasedClientHandler`. ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201350#comment-16201350 ] ASF GitHub Bot commented on FLINK-7416: --- Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4533 @pnowojski , thanks for your review! Let me explain the context first. We introduced the separate `CreditBasedClientHandler` so that partially merged PRs do not affect the current logic on the master branch. It will eventually replace the current `PartitionRequestClientHandler` once all of this feature's code is merged. Unlike the previous [FLINK-7406](https://github.com/apache/flink/pull/4509), which covers the ingoing pipeline logic, this PR is for the outgoing pipeline logic. > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is part of the work for credit-based network flow control. > The related work is: > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput.
[GitHub] flink issue #4533: [FLINK-7416][network] Implement Netty receiver outgoing p...
Github user zhijiangW commented on the issue: https://github.com/apache/flink/pull/4533 @pnowojski , thanks for your review! Let me explain the context first. We introduced the separate `CreditBasedClientHandler` so that partially merged PRs do not affect the current logic on the master branch. It will eventually replace the current `PartitionRequestClientHandler` once all of this feature's code is merged. Unlike the previous [FLINK-7406](https://github.com/apache/flink/pull/4509), which covers the ingoing pipeline logic, this PR is for the outgoing pipeline logic. ---
[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x
[ https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201263#comment-16201263 ] ASF GitHub Bot commented on FLINK-7810: --- Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4807 wonderful! +1 > Switch from custom Flakka to Akka 2.4.x > --- > > Key: FLINK-7810 > URL: https://issues.apache.org/jira/browse/FLINK-7810 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > >
[GitHub] flink issue #4807: [FLINK-7810] Switch from custom Flakka to Akka 2.4.x
Github user bowenli86 commented on the issue: https://github.com/apache/flink/pull/4807 wonderful! +1 ---
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201258#comment-16201258 ] Bowen Li commented on FLINK-6951: - [~aljoscha] I believe so, as I've seen a couple of email threads complaining about the same problem. What do you think, Gordon? [~tzulitai] On my side, I have made a hacky workaround, so this problem hasn't blocked me. I'm traveling now; I can take a closer look at it again in about three weeks. > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector.
[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201187#comment-16201187 ] ASF GitHub Bot commented on FLINK-7540: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4812 [FLINK-7540] Apply consistent hostname normalization ## What is the purpose of the change The hostname normalization is now applied when generating the remote akka config. That way it should be ensured that all ActorSystems are bound to a normalized hostname. ## Brief change log - Add hostname normalization to `AkkaUtils#getAkkaConfig` - Replace manual ActorSystem instantiation with `BootstrapTools#startActorSystem` ## Verifying this change - Added `AkkaUtilsTest#getAkkaConfig` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) It affects how `ActorSystem` are instantiated. ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? 
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixHostnameNormalization Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4812.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4812 commit 00876ead7a4a7492d643f6cba3e784044c54669e Author: Till Rohrmann Date: 2017-10-11T23:17:23Z [FLINK-7540] Apply consistent hostname normalization The hostname normalization is now applied when generating the remote akka config. That way it should be ensured that all ActorSystems are bound to a normalized hostname. > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Assignee: Till Rohrmann >Priority: Critical > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, > but Akka seems to preserve the uppercase/lowercase distinctions when starting the > Actor. This leads to problems because other parts (for example > {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional > cluster. > h1. 
Original Issue Text > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. Then the job fails , starting the yarn-session is the same. > The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. > akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager
[GitHub] flink pull request #4812: [FLINK-7540] Apply consistent hostname normalizati...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4812 [FLINK-7540] Apply consistent hostname normalization ## What is the purpose of the change The hostname normalization is now applied when generating the remote akka config. That way it should be ensured that all ActorSystems are bound to a normalized hostname. ## Brief change log - Add hostname normalization to `AkkaUtils#getAkkaConfig` - Replace manual ActorSystem instantiation with `BootstrapTools#startActorSystem` ## Verifying this change - Added `AkkaUtilsTest#getAkkaConfig` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes) It affects how `ActorSystem`s are instantiated. ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixHostnameNormalization Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4812.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4812 commit 00876ead7a4a7492d643f6cba3e784044c54669e Author: Till Rohrmann Date: 2017-10-11T23:17:23Z [FLINK-7540] Apply consistent hostname normalization The hostname normalization is now applied when generating the remote akka config. That way it should be ensured that all ActorSystems are bound to a normalized hostname. ---
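The normalization this PR applies amounts to lowercasing the hostname before the ActorSystem binds to it, so that e.g. "DSJ-signal-4T-248" and "dsj-signal-4t-248" resolve to the same actor address. A rough sketch under that assumption (not the actual NetUtils/AkkaUtils code):

```java
import java.util.Locale;

// Rough sketch of the hostname normalization discussed in FLINK-7540:
// lowercase the host so actor addresses compare consistently.
// Not the actual NetUtils.unresolvedHostToNormalizedString code.
class HostNormalizer {
    static String normalize(String host) {
        // Locale.US avoids locale-specific case mappings
        // (e.g. the Turkish dotless i).
        return host.trim().toLowerCase(Locale.US);
    }
}
```

Applying this in one central place (the akka config generation) is what guarantees every remote ActorSystem sees the same normalized name.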
[jira] [Created] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler
Till Rohrmann created FLINK-7818: Summary: Synchronize MetricStore access in the TaskManagersHandler Key: FLINK-7818 URL: https://issues.apache.org/jira/browse/FLINK-7818 Project: Flink Issue Type: Bug Components: Metrics, REST Affects Versions: 1.3.2, 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a single {{TaskManager}} are requested. The access is not synchronized, which can be problematic because the {{MetricStore}} is not thread safe. I propose to add synchronization.
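The proposed fix can be sketched with a hypothetical stand-in for the non-thread-safe store; the class and method names here are illustrative only, not Flink's actual classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a non-thread-safe metric store, used to sketch
// the proposed fix: callers synchronize on the store before every access.
class UnsafeMetricStore {
    private final Map<String, String> metrics = new HashMap<>();

    void put(String name, String value) { metrics.put(name, value); }
    String get(String name) { return metrics.get(name); }
}

class TaskManagerDetailsSketch {
    // Mirrors the proposal: handler reads hold the store's monitor, and
    // writers would synchronize on the same object.
    static String readMetric(UnsafeMetricStore store, String name) {
        synchronized (store) {
            return store.get(name);
        }
    }
}
```

Synchronizing on the store object itself keeps the fix local to the call sites without changing the store's API.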
[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview
[ https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201103#comment-16201103 ] ASF GitHub Bot commented on FLINK-7806: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4805 Not sure why they changed, @zentol. I just executed the build process as described in the readme. > Move CurrentJobsOverviewHandler to jobs/overview > > > Key: FLINK-7806 > URL: https://issues.apache.org/jira/browse/FLINK-7806 > Project: Flink > Issue Type: Sub-task > Components: REST >Affects Versions: 1.4.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > Labels: flip-6 > > The {{CurrentJobsOverviewHandler}} is currently registered under > {{/joboverview}}. I think it would be more idiomatic to register it under > {{/jobs/overview}}.
[GitHub] flink issue #4805: [FLINK-7806] [flip6] Register CurrentJobsOverviewHandler ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4805 Not sure why they changed, @zentol. I just executed the build process as described in the readme. ---
[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201096#comment-16201096 ] Till Rohrmann commented on FLINK-7540: -- This is indeed a big problem for setups with mixed lower- and upper-case hostnames, or if you use IPv6 addresses. The underlying problem is, as Aljoscha pointed out, that we don't apply the hostname normalisation consistently. For example, the {{StandaloneHaServices}} assumes that hostnames are normalized. However, this is not true in the Yarn, Mesos and Flip-6 case. For the HA mode this is not a problem since we distribute the hostnames via ZooKeeper. All the affected cases have in common that they start their {{ActorSystem}} via the {{BootstrapTools}}. Adding the normalization to {{AkkaUtils#getAkkaConfig(Configuration, Option[(String, Int)])}} should solve the problem because all remote actor systems get their hostname configuration via this method. > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Priority: Critical > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, > but Akka seems to preserve the uppercase/lowercase distinctions when starting the > Actor. This leads to problems because other parts (for example > {{JobManagerRetriever}}) cannot find the actor, leading to a nonfunctional > cluster. > h1. 
Original Issue Text > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. Then the job fails , starting the yarn-session is the same. > The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. > akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager > - Starting JobManager at > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. > 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager > - JobManager > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted > leadership with leader session ID Some(----). 
> 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend > listening at 0:0:0:0:0:0:0:0:54921 > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with > JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port > 54921 > 2017-08-10 17:36:00,313 INFO > org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader > reachable under > akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----. > The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the > JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082” > The hostname of JobManagerRetriever’s actor is lowercase. > And I read source code, > Class NetUtils the unresolvedHostToNormalizedString(String host) method of > line 127: > public static String unresolvedHostToNormalizedString(String host) { > > // Return loopback interface address if host is
[GitHub] flink pull request #4811: [FLINK-7818] Synchronize MetricStore access in Tas...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4811 [FLINK-7818] Synchronize MetricStore access in TaskManagersHandler ## What is the purpose of the change Synchronize MetricStore access in TaskManagersHandler ## Verifying this change This change is a trivial rework / code cleanup without any test coverage. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink fixTaskManagersHandler Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4811.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4811 commit 4291b5aa1b3687ee7ba11876ca35f91e33ad3360 Author: Till Rohrmann Date: 2017-10-11T17:00:16Z [FLINK-7818] Synchronize MetricStore access in TaskManagersHandler ---
[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler
[ https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201119#comment-16201119 ] ASF GitHub Bot commented on FLINK-7818: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4811 [FLINK-7818] Synchronize MetricStore access in TaskManagersHandler
> Synchronize MetricStore access in the TaskManagersHandler > - > > Key: FLINK-7818 > URL: https://issues.apache.org/jira/browse/FLINK-7818 > Project: Flink > Issue Type: Bug > Components: Metrics, REST >Affects Versions: 1.4.0, 1.3.2 >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{TaskManagersHandler}} accesses the {{MetricStore}} when details for a > single {{TaskManager}} are requested. The access is not synchronized which > can be problematic because the {{MetricStore}} is not thread safe. > I propose to add synchronization. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7540: Assignee: Till Rohrmann > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Assignee: Till Rohrmann >Priority: Critical > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, > Akka seems to preserve the uppercase/lowercase distinctions when starting the > Actor. This leads to problems because other parts (for example > {{JobManagerRetriever}}) cannot find the actor leading to a nonfunctional > cluster. > h1. Original Issue Text > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. Then the job fails , starting the yarn-session is the same. 
> The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. > akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager > - Starting JobManager at > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. > 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager > - JobManager > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted > leadership with leader session ID Some(----). > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend > listening at 0:0:0:0:0:0:0:0:54921 > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with > JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port > 54921 > 2017-08-10 17:36:00,313 INFO > org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader > reachable under > akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----. > The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the > JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082” > The hostname of JobManagerRetriever’s actor is lowercase. 
> And I read the source code. In class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
> public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null.
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330.
>     if (host == null) {
>         host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>         host = host.trim().toLowerCase();
>     }
>     ...
> }
> It turns the host name into lowercase. Therefore, the JobManagerRetriever certainly cannot find the JobManager's actor system. Then I removed the call to the toLowerCase() method in the source code. Finally, I can submit a job in yarn-cluster mode and start a yarn-session. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
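The normalisation the reporter quotes can be reproduced as a small standalone sketch. This is an approximation of the quoted NetUtils method for illustration only, not Flink's actual source file; the class name is made up:

```java
import java.net.InetAddress;

class HostNormalizationSketch {
    /**
     * Normalizes a hostname the way the quoted NetUtils method does:
     * null falls back to the loopback address, everything else is
     * trimmed and lowercased, so lookups on the normalized form
     * compare equal regardless of the original casing.
     */
    static String unresolvedHostToNormalizedString(String host) {
        if (host == null) {
            // Mirrors the behavior of InetAddress.getByName(null) / RFC 3330.
            host = InetAddress.getLoopbackAddress().getHostAddress();
        } else {
            host = host.trim().toLowerCase();
        }
        return host;
    }

    public static void main(String[] args) {
        // The mixed-case YARN hostname from the report normalizes to
        // lowercase, which is what JobManagerRetriever looks up.
        System.out.println(unresolvedHostToNormalizedString("DSJ-signal-4T-248"));
    }
}
```

This also illustrates why removing toLowerCase() only moves the problem: as long as one side normalizes and the other does not, the actor paths diverge.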
[jira] [Commented] (FLINK-7818) Synchronize MetricStore access in the TaskManagersHandler
[ https://issues.apache.org/jira/browse/FLINK-7818?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=1620#comment-1620 ] Till Rohrmann commented on FLINK-7818: -- Once the {{MetricStore}} is thread safe, we can remove the synchronization. > Synchronize MetricStore access in the TaskManagersHandler > - > > Key: FLINK-7818 > URL: https://issues.apache.org/jira/browse/FLINK-7818 > Project: Flink > Issue Type: Bug > Components: Metrics, REST >Affects Versions: 1.4.0, 1.3.2 >Reporter: Till Rohrmann >Assignee: Till Rohrmann -- This message was sent by Atlassian JIRA (v6.4.14#64029)
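The synchronization proposed in FLINK-7818 amounts to guarding every read and write of the store with the same monitor. A minimal sketch, using a hypothetical stand-in for the non-thread-safe MetricStore rather than Flink's real classes:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the non-thread-safe MetricStore.
class MetricStoreStandIn {
    private final Map<String, String> metrics = new HashMap<>();
    void put(String name, String value) { metrics.put(name, value); }
    String get(String name) { return metrics.get(name); }
}

class TaskManagersHandlerSketch {
    private final MetricStoreStandIn metricStore = new MetricStoreStandIn();

    void update(String name, String value) {
        // All mutations go through the same lock...
        synchronized (metricStore) {
            metricStore.put(name, value);
        }
    }

    String fetch(String name) {
        // ...and so do the reads issued while rendering TaskManager
        // details, so a reader never observes the map mid-update.
        synchronized (metricStore) {
            return metricStore.get(name);
        }
    }

    public static void main(String[] args) {
        TaskManagersHandlerSketch handler = new TaskManagersHandlerSketch();
        handler.update("taskmanager.heap.used", "42");
        System.out.println(handler.fetch("taskmanager.heap.used"));
    }
}
```

As the comment above notes, once the store itself becomes thread safe (e.g. by using a concurrent map internally), the external synchronization can be dropped.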
[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann updated FLINK-7540: - Priority: Critical (was: Blocker) > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Priority: Critical > Labels: patch > Fix For: 1.3.3 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4804: [hotfix] [kafka] Fix the config parameter names in KafkaT...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4804 thanks for catching it, merging. ---
[jira] [Commented] (FLINK-6703) Document how to take a savepoint on YARN
[ https://issues.apache.org/jira/browse/FLINK-6703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201018#comment-16201018 ] ASF GitHub Bot commented on FLINK-6703: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4721 merging. > Document how to take a savepoint on YARN > > > Key: FLINK-6703 > URL: https://issues.apache.org/jira/browse/FLINK-6703 > Project: Flink > Issue Type: Improvement > Components: Documentation, YARN >Affects Versions: 1.3.0, 1.4.0 >Reporter: Chesnay Schepler >Assignee: Bowen Li > > The documentation should have a separate entry for savepoint related CLI > commands in combination with YARN. It is currently not documented that you > have to supply the application id, nor how you can pass it. > {code} > ./bin/flink savepoint -m yarn-cluster (-yid|-yarnapplicationId) > > {code} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4721: [FLINK-6703][savepoint/doc] Document how to take a savepo...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4721 merging. ---
[jira] [Commented] (FLINK-7774) Deserializers are not cleaned up when closing input streams
[ https://issues.apache.org/jira/browse/FLINK-7774?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201015#comment-16201015 ] ASF GitHub Bot commented on FLINK-7774: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4783 merging. > Deserializers are not cleaned up when closing input streams > --- > > Key: FLINK-7774 > URL: https://issues.apache.org/jira/browse/FLINK-7774 > Project: Flink > Issue Type: Bug > Components: Network >Affects Versions: 1.4.0, 1.3.2 >Reporter: Nico Kruber >Assignee: Nico Kruber > > On cleanup of the {{AbstractRecordReader}}, {{StreamInputProcessor}}, and > {{StreamTwoInputProcessor}}, the deserializers' current buffers are cleaned > up but not their internal {{spanningWrapper}} and {{nonSpanningWrapper}} via > {{RecordDeserializer#clear}}. This call should be added. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7807) HandlerUtils methods should log errors
[ https://issues.apache.org/jira/browse/FLINK-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201007#comment-16201007 ] ASF GitHub Bot commented on FLINK-7807: --- Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4799 > HandlerUtils methods should log errors > -- > > Key: FLINK-7807 > URL: https://issues.apache.org/jira/browse/FLINK-7807 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > > The {{HandlerUtils}} methods for sending (error) responses send sanitized > responses in case of exceptions, but don't log them in any way making > debugging impossible. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4783: [FLINK-7774][network] fix not clearing deserializers on c...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4783 merging. ---
[GitHub] flink pull request #4799: [FLINK-7807] [REST] Log exceptions in HandlerUtils...
Github user zentol closed the pull request at: https://github.com/apache/flink/pull/4799 ---
[jira] [Commented] (FLINK-7808) JobDetails constructor should check length of tasksPerState array
[ https://issues.apache.org/jira/browse/FLINK-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201002#comment-16201002 ] ASF GitHub Bot commented on FLINK-7808: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4800 > JobDetails constructor should check length of tasksPerState array > - > > Key: FLINK-7808 > URL: https://issues.apache.org/jira/browse/FLINK-7808 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 > > > The JobDetails constructor accepts an {{int[] tasksPerState}} argument, which > represents the number of tasks in each {{ExecutionState}}. There is no check > in place to verify that the array has the correct size, which the json > serializer assumes to be the case. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7661) Add credit field in PartitionRequest message
[ https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16201003#comment-16201003 ] ASF GitHub Bot commented on FLINK-7661: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4698 > Add credit field in PartitionRequest message > > > Key: FLINK-7661 > URL: https://issues.apache.org/jira/browse/FLINK-7661 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > Currently the {{PartitionRequest}} message contains {{ResultPartitionID}} | > {{queueIndex}} | {{InputChannelID}} fields. > We will add a new {{credit}} field indicating the initial credit of > {{InputChannel}}, and this info can be got from {{InputChannel}} directly > after assigning exclusive buffers to it. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7661) Add credit field in PartitionRequest message
[ https://issues.apache.org/jira/browse/FLINK-7661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7661. --- Resolution: Fixed Fix Version/s: 1.4.0 1.4: 891f359d710146acf3d05cd2af3bb430a8fbc99b > Add credit field in PartitionRequest message > > > Key: FLINK-7661 > URL: https://issues.apache.org/jira/browse/FLINK-7661 > Project: Flink > Issue Type: Sub-task > Components: Network >Affects Versions: 1.4.0 >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4800: [FLINK-7808] [REST] JobDetails constructor checks ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4800 ---
[GitHub] flink pull request #4791: [hotfix] [Javadoc] Fix typos
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4791 ---
[jira] [Closed] (FLINK-7807) HandlerUtils methods should log errors
[ https://issues.apache.org/jira/browse/FLINK-7807?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7807. --- Resolution: Fixed 1.4: a9c13c7d90253cc59749c20e6ee6e7f790cb598a > HandlerUtils methods should log errors > -- > > Key: FLINK-7807 > URL: https://issues.apache.org/jira/browse/FLINK-7807 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Closed] (FLINK-7808) JobDetails constructor should check length of tasksPerState array
[ https://issues.apache.org/jira/browse/FLINK-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chesnay Schepler closed FLINK-7808. --- Resolution: Fixed 1.4: e2ae45b48345cf56501530e101f3c8523448ab79 > JobDetails constructor should check length of tasksPerState array > - > > Key: FLINK-7808 > URL: https://issues.apache.org/jira/browse/FLINK-7808 > Project: Flink > Issue Type: Bug > Components: REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler >Priority: Minor > Fix For: 1.4.0 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
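The missing check described in FLINK-7808 is a simple constructor precondition against the number of enum constants. A sketch with an illustrative enum (Flink's real ExecutionState has more entries, and the real JobDetails constructor takes further arguments):

```java
import java.util.Arrays;

class JobDetailsSketch {
    // Illustrative subset of execution states; Flink's real enum has more entries.
    enum ExecutionState { CREATED, RUNNING, FINISHED, FAILED }

    private final int[] tasksPerState;

    JobDetailsSketch(int[] tasksPerState) {
        // Reject arrays whose length does not match the number of states,
        // instead of letting the JSON serializer fail later on the assumption.
        if (tasksPerState.length != ExecutionState.values().length) {
            throw new IllegalArgumentException(
                "Expected " + ExecutionState.values().length
                    + " entries, got " + tasksPerState.length);
        }
        this.tasksPerState = Arrays.copyOf(tasksPerState, tasksPerState.length);
    }

    public static void main(String[] args) {
        new JobDetailsSketch(new int[]{1, 2, 3, 4}); // matches the enum size: accepted
        try {
            new JobDetailsSketch(new int[]{1, 2});   // too short: rejected up front
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```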
[GitHub] flink pull request #4797: [hotfix][doc] Remove outdated best-practice sugges...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4797 ---
[GitHub] flink pull request #4698: [FLINK-7661][network] Add credit field in Partitio...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4698 ---
[jira] [Commented] (FLINK-6926) Add MD5/SHA1/SHA2 supported in SQL
[ https://issues.apache.org/jira/browse/FLINK-6926?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200980#comment-16200980 ] ASF GitHub Bot commented on FLINK-6926: --- GitHub user genged opened a pull request: https://github.com/apache/flink/pull/4810 [FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL
> Add MD5/SHA1/SHA2 supported in SQL > -- > > Key: FLINK-6926 > URL: https://issues.apache.org/jira/browse/FLINK-6926 > Project: Flink > Issue Type: Sub-task > Components: Table API & SQL >Affects Versions: 1.4.0 >Reporter: sunjincheng >Assignee: Shaoxuan Wang > > MD5(str) calculates an MD5 128-bit checksum for the string. The value is > returned as a string of 32 hexadecimal digits, or NULL if the argument was > NULL. The return value can, for example, be used as a hash key. See the notes > at the beginning of this section about storing hash values efficiently. > The return value is a nonbinary string in the connection character set. > * Example: > MD5('testing') -> 'ae2b1fca515949e5d54fb22b8ed95575' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha1] > SHA1(str), SHA(str) calculate an SHA-1 160-bit checksum for the string, as > described in RFC 3174 (Secure Hash Algorithm). The value is returned as a > string of 40 hexadecimal digits, or NULL if the argument was NULL. One of the > possible uses for this function is as a hash key. See the notes at the > beginning of this section about storing hash values efficiently. You can also > use SHA1() as a cryptographic function for storing passwords. SHA() is > synonymous with SHA1(). > The return value is a nonbinary string in the connection character set. 
> * Example: > SHA1('abc') -> 'a9993e364706816aba3e25717850c26c9cd0d89d' > SHA2(str, hash_length) calculates the SHA-2 family of hash functions (SHA-224, > SHA-256, SHA-384, and SHA-512). The first argument is the cleartext string to > be hashed. The second argument indicates the desired bit length of the > result, which must have a value of 224, 256, 384, 512, or 0 (which is > equivalent to 256). If either argument is NULL or the hash length is not one > of the permitted values, the return value is NULL. Otherwise, the function > result is a hash value containing the desired number of bits. See the notes > at the beginning of this section about storing hash values efficiently. > The return value is a nonbinary string in the connection character set. > * Example: > SHA2('abc', 224) -> '23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7' > * See more: > ** [MySQL| > https://dev.mysql.com/doc/refman/5.7/en/encryption-functions.html#function_sha2] -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4810: [FLINK-6926] [table] Add support for MD5,SHA1 and ...
GitHub user genged opened a pull request: https://github.com/apache/flink/pull/4810 [FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL ## What is the purpose of the change This pull request implements MD5, SHA1 and SHA256 support in Flink SQL as discussed in FLINK-6926 ## Brief change log - Added MD5, SHA1, SHA256 SQL functions - Added relevant unit tests ## Verifying this change This change added tests and can be verified as follows: - Added SQL expression tests - Added HashFunctionsTest with testAllApis - Validated both correct calculation and behavior for null input ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: don't know - The runtime per-record code paths (performance sensitive): don't know - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no ## Documentation - Does this pull request introduce a new feature? yes - If yes, how is the feature documented? not documented You can merge this pull request into a Git repository by running: $ git pull https://github.com/genged/flink master Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4810.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4810 commit 670ffd0c438a6d519d580a077582121b66b425f5 Author: Michael Gendelman Date: 2017-10-08T21:00:39Z [FLINK-6926] [table] Add support for MD5,SHA1 and SHA256 in SQL ---
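The semantics described in FLINK-6926 (a lowercase hex-encoded digest, with NULL input mapping to NULL output) can be sketched with the JDK's MessageDigest. This illustrates the expected behaviour using the document's own test vectors; it is not the pull request's actual code:

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

class HashFunctionsSketch {
    /**
     * Hex-encodes a digest the way MD5()/SHA1()/SHA2() are specified to:
     * lowercase hexadecimal digits, and NULL in yields NULL out.
     */
    static String hash(String algorithm, String input) throws NoSuchAlgorithmException {
        if (input == null) {
            return null; // mirrors the NULL-in, NULL-out contract
        }
        byte[] digest = MessageDigest.getInstance(algorithm)
            .digest(input.getBytes(StandardCharsets.UTF_8));
        StringBuilder hex = new StringBuilder();
        for (byte b : digest) {
            // %02x on a Byte prints the two's-complement byte as two hex digits.
            hex.append(String.format("%02x", b));
        }
        return hex.toString();
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        System.out.println(hash("MD5", "testing"));  // example from the issue text
        System.out.println(hash("SHA-1", "abc"));    // example from the issue text
    }
}
```

"MD5", "SHA-1", and "SHA-256" are the standard JDK algorithm names; SHA2's hash_length argument would select between them.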
[jira] [Commented] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200822#comment-16200822 ] Aljoscha Krettek commented on FLINK-7540: - [~oty5081] Thanks a lot for reporting this! The code that is starting the Akka actor system in the YARN case (pre FLIP-6) is here: https://github.com/apache/flink/blob/b4120c1e15be1c36d07dfb29080e29750d5a0955/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java#L309. We would probably have to use the same hostname normalisation here. [~till.rohrmann] What do you think about this? For FLIP-6 there seem to be several different entry points and it's not always clear to me how the actor system get's its name. > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Priority: Blocker > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, > Akka seems to preserve the uppercase/lowercase distinctions when starting the > Actor. This leads to problems because other parts (for example > {{JobManagerRetriever}}) cannot find the actor leading to a nonfunctional > cluster. > h1. 
Original Issue Text
> Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”, “DSJ-signal-900G-71”
> When using the following command:
> ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result
> Or
> ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
> There will be some exceptions at the command line interface:
> java.lang.RuntimeException: Unable to get ClusterClient status from Application Client
> at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
> …
> Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
> h4. Then the job fails, and starting the yarn-session fails the same way.
> The exceptions of the application log:
> 2017-08-10 17:36:10,334 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port.
> akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]
> …
> 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
> akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]] after [1 ms]
> And I found some differences in the actor system:
> 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager - Starting JobManager at akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
> 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager - JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership with leader session ID Some(----).
> 2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend listening at 0:0:0:0:0:0:0:0:54921
> 2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 54921
> 2017-08-10 17:36:00,313 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
> The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
> The hostname of the JobManagerRetriever's actor is lowercase.
> And I read the source code: class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
> public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>         host = InetAddress.getLoopbackAddress().getHostAddress();
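The mismatch described above — one side lowercasing the hostname while Akka binds with the original casing — can be reproduced in a few lines. The `normalize` helper below is a simplified stand-in for the lowercasing step of `NetUtils.unresolvedHostToNormalizedString` (the real method also handles null and IPv6), and the actor paths are constructed by hand for illustration:

```java
public class HostNormalization {

    // Simplified stand-in for the lowercasing step in
    // NetUtils.unresolvedHostToNormalizedString (real method also
    // handles null hosts, IPv6 literals, etc.).
    static String normalize(String host) {
        return host.trim().toLowerCase();
    }

    public static void main(String[] args) {
        String jmHost = "DSJ-signal-4T-248"; // casing Akka used when binding

        // Path the JobManager actually registered under vs. the path the
        // retriever builds after normalization.
        String jobManagerPath = "akka.tcp://flink@" + jmHost + ":65082/user/jobmanager";
        String retrieverPath = "akka.tcp://flink@" + normalize(jmHost) + ":65082/user/jobmanager";

        // Akka actor paths are compared literally, so a case difference
        // alone makes the lookup fail with ActorNotFound.
        System.out.println(jobManagerPath.equals(retrieverPath)); // false
    }
}
```

Normalizing in exactly one place — or applying the same normalization on both sides, as the comment suggests for `YarnApplicationMasterRunner` — removes the discrepancy.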
[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7540: Description: In {{NetUtils.unresolvedHostToNormalizedString()}} we lowercase hostnames, Akka seems to preserve the uppercase/lowercase distinctions when starting the Actor. This leads to problems because other parts (for example {{JobManagerRetriever}}) cannot find the actor leading to a nonfunctional cluster. h1. Original Issue Text Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” DSJ-signal-900G-71” When using the following command: ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result Or ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" There will be some exceptions at Command line interface: java.lang.RuntimeException: Unable to get ClusterClient status from Application Client at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) … Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running. h4. Then the job fails , starting the yarn-session is the same. The exceptions of the application log: 2017-08-10 17:36:10,334 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port. 
akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]
…
2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager
akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]] after [1 ms]
And I found some differences in the actor system:
2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager - Starting JobManager at akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager.
2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager - JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 54921
2017-08-10 17:36:00,313 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
The hostname of the JobManagerRetriever's actor is lowercase.
And I read the source code: class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}
It turns the host name into lowercase.
Therefore, JobManagerRetriever certainly cannot find the JobManager's actor system. Then I removed the call to the toLowerCase() method in the source code. Finally, I can submit a job in yarn-cluster mode and start a yarn-session.

was:
h1. Original Issue Text
Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”, “DSJ-signal-900G-71”
When using the following command:
./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result
Or
./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip"
There will be some exceptions at the command line interface:
java.lang.RuntimeException: Unable to get ClusterClient status from Application Client
at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243)
…
Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running.
h4. Then the job
[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7540: Description: h1. Original Issue Text Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” DSJ-signal-900G-71” When using the following command: ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result Or ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" There will be some exceptions at Command line interface: java.lang.RuntimeException: Unable to get ClusterClient status from Application Client at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) … Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running. h4. Then the job fails , starting the yarn-session is the same. The exceptions of the application log: 2017-08-10 17:36:10,334 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port. akka.actor.ActorNotFound: Actor not found for: ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)] … 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager - Resource manager could not register at JobManager akka.pattern.AskTimeoutException: Ask timed out on [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), Path(/user/jobmanager)]] after [1 ms] And I found some differences in actor System: 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager - Starting JobManager at akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. 
2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager - JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted leadership with leader session ID Some(----).
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend listening at 0:0:0:0:0:0:0:0:54921
2017-08-10 17:36:00,312 INFO org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port 54921
2017-08-10 17:36:00,313 INFO org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader reachable under akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----.
The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082”
The hostname of the JobManagerRetriever's actor is lowercase.
And I read the source code: class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
public static String unresolvedHostToNormalizedString(String host) {
    // Return loopback interface address if host is null
    // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
    if (host == null) {
        host = InetAddress.getLoopbackAddress().getHostAddress();
    } else {
        host = host.trim().toLowerCase();
    }
    ...
}
It turns the host name into lowercase.
Therefore, JobManagerRetriever certainly cannot find the JobManager's actor system.
Then I removed the call to the toLowerCase() method in the source code.
Finally, I can submit a job in yarn-cluster mode and start a yarn-session.
was: Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” DSJ-signal-900G-71” When using the following command: ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 ~/flink-1.3.1/examples/batch/WordCount.jar --input /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result Or ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" There will be some exceptions at Command line interface: java.lang.RuntimeException: Unable to get ClusterClient status from Application Client at org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) … Caused by: org.apache.flink.util.FlinkException: Could not connect to the leading JobManager. Please check that the JobManager is running. h4. Then the job fails , starting the yarn-session is the same. The exceptions of the application log: 2017-08-10 17:36:10,334 WARN org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to retrieve leader gateway and port. akka.actor.ActorNotFound: Actor not found for:
[jira] [Updated] (FLINK-7540) Akka hostnames are not normalised consistently
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7540: Summary: Akka hostnames are not normalised consistently (was: submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop cluster with capitalized hostname) > Akka hostnames are not normalised consistently > -- > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Priority: Blocker > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. Then the job fails , starting the yarn-session is the same. > The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. 
> akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager > - Starting JobManager at > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. > 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager > - JobManager > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted > leadership with leader session ID Some(----). > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend > listening at 0:0:0:0:0:0:0:0:54921 > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with > JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port > 54921 > 2017-08-10 17:36:00,313 INFO > org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader > reachable under > akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----. > The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the > JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082” > The hostname of JobManagerRetriever’s actor is lowercase. 
> And I read the source code: class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
> public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>         host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>         host = host.trim().toLowerCase();
>     }
>     ...
> }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's actor system.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
[ https://issues.apache.org/jira/browse/FLINK-7816?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200786#comment-16200786 ] Hai Zhou UTC+8 commented on FLINK-7816: --- Hi [~aljoscha], In the *ClosureCleaner.clean(func, checkSerializable)* method, should we first check whether *func.getClass* is a closure?
> Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
> --
>
> Key: FLINK-7816
> URL: https://issues.apache.org/jira/browse/FLINK-7816
> Project: Flink
> Issue Type: Sub-task
> Components: Scala API
> Reporter: Aljoscha Krettek
>
> We have the same problem as Spark: SPARK-14540
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
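One way such a guard could detect closure-like classes — anonymous inner classes and Java 8 lambda classes, which the JVM marks as synthetic — is sketched below. This is an assumption about a possible approach, not the actual FLINK-7819 patch, and `isClosure` is a hypothetical helper name:

```java
import java.util.function.Function;

public class ClosureCheckSketch {

    // Heuristic: Java 8 lambdas compile to synthetic classes (whose names
    // typically contain "$$Lambda$"), while Scala/Java closures written as
    // anonymous classes report isAnonymousClass().
    static boolean isClosure(Class<?> clazz) {
        return clazz.isAnonymousClass()
            || clazz.isSynthetic()
            || clazz.getName().contains("$$Lambda$");
    }

    public static void main(String[] args) {
        Function<Integer, Integer> lambda = x -> x + 1;
        System.out.println(isClosure(lambda.getClass())); // true for a lambda
        System.out.println(isClosure(String.class));      // false for a plain class
    }
}
```

A check like this at the top of `ClosureCleaner.clean()` would let the cleaner skip (or handle differently) objects that are not closures at all.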
[jira] [Commented] (FLINK-7738) Create WebSocket handler (server)
[ https://issues.apache.org/jira/browse/FLINK-7738?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200785#comment-16200785 ] ASF GitHub Bot commented on FLINK-7738: --- Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 Updated the description based on the latest PR. > Create WebSocket handler (server) > - > > Key: FLINK-7738 > URL: https://issues.apache.org/jira/browse/FLINK-7738 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management, Mesos >Reporter: Eron Wright >Assignee: Eron Wright > > An abstract handler is needed to support websocket communication. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4767: [FLINK-7738] [flip-6] Create WebSocket handler (server, c...
Github user EronWright commented on the issue: https://github.com/apache/flink/pull/4767 Updated the description based on the latest PR. ---
[jira] [Updated] (FLINK-7540) submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop cluster with capitalized hostname
[ https://issues.apache.org/jira/browse/FLINK-7540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Aljoscha Krettek updated FLINK-7540: Component/s: Distributed Coordination > submit a job on yarn-cluster mode or start a yarn-session failed,in hadoop > cluster with capitalized hostname > > > Key: FLINK-7540 > URL: https://issues.apache.org/jira/browse/FLINK-7540 > Project: Flink > Issue Type: Bug > Components: Distributed Coordination, YARN >Affects Versions: 1.3.1, 1.4.0, 1.3.2 >Reporter: Tong Yan Ou >Priority: Blocker > Labels: patch > Fix For: 1.3.3 > > Original Estimate: 336h > Remaining Estimate: 336h > > Hostnames in my hadoop cluster are like these: “DSJ-RTB-4T-177”,” > DSJ-signal-900G-71” > When using the following command: > ./bin/flink run -m yarn-cluster -yn 1 -yqu xl_trip -yjm 1024 > ~/flink-1.3.1/examples/batch/WordCount.jar --input > /user/all_trip_dev/test/testcount.txt --output /user/all_trip_dev/test/result > > Or > ./bin/yarn-session.sh -d -jm 6144 -tm 12288 -qu xl_trip -s 24 -n 5 -nm > "flink-YarnSession-jm6144-tm12288-s24-n5-xl_trip" > There will be some exceptions at Command line interface: > java.lang.RuntimeException: Unable to get ClusterClient status from > Application Client > at > org.apache.flink.yarn.YarnClusterClient.getClusterStatus(YarnClusterClient.java:243) > … > Caused by: org.apache.flink.util.FlinkException: Could not connect to the > leading JobManager. Please check that the JobManager is running. > h4. Then the job fails , starting the yarn-session is the same. > The exceptions of the application log: > 2017-08-10 17:36:10,334 WARN > org.apache.flink.runtime.webmonitor.JobManagerRetriever - Failed to > retrieve leader gateway and port. 
> akka.actor.ActorNotFound: Actor not found for: > ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)] > … > 2017-08-10 17:36:10,837 ERROR org.apache.flink.yarn.YarnFlinkResourceManager > - Resource manager could not register at JobManager > akka.pattern.AskTimeoutException: Ask timed out on > [ActorSelection[Anchor(akka.tcp://flink@dsj-signal-4t-248:65082/), > Path(/user/jobmanager)]] after [1 ms] > And I found some differences in actor System: > 2017-08-10 17:35:56,791 INFO org.apache.flink.yarn.YarnJobManager > - Starting JobManager at > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager. > 2017-08-10 17:35:56,880 INFO org.apache.flink.yarn.YarnJobManager > - JobManager > akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager was granted > leadership with leader session ID Some(----). > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Web frontend > listening at 0:0:0:0:0:0:0:0:54921 > 2017-08-10 17:36:00,312 INFO > org.apache.flink.runtime.webmonitor.WebRuntimeMonitor - Starting with > JobManager akka.tcp://flink@DSJ-signal-4T-248:65082/user/jobmanager on port > 54921 > 2017-08-10 17:36:00,313 INFO > org.apache.flink.runtime.webmonitor.JobManagerRetriever - New leader > reachable under > akka.tcp://flink@dsj-signal-4t-248:65082/user/jobmanager:----. > The JobManager is “akka.tcp://flink@DSJ-signal-4T-248:65082” and the > JobManagerRetriever is “akka.tcp://flink@dsj-signal-4t-248:65082” > The hostname of JobManagerRetriever’s actor is lowercase. 
> And I read the source code: class NetUtils, the unresolvedHostToNormalizedString(String host) method at line 127:
> public static String unresolvedHostToNormalizedString(String host) {
>     // Return loopback interface address if host is null
>     // This represents the behavior of {@code InetAddress.getByName} and RFC 3330
>     if (host == null) {
>         host = InetAddress.getLoopbackAddress().getHostAddress();
>     } else {
>         host = host.trim().toLowerCase();
>     }
>     ...
> }
> It turns the host name into lowercase.
> Therefore, JobManagerRetriever certainly cannot find the JobManager's actor system.
> Then I removed the call to the toLowerCase() method in the source code.
> Finally, I can submit a job in yarn-cluster mode and start a yarn-session.
-- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-6951) Incompatible versions of httpcomponents jars for Flink kinesis connector
[ https://issues.apache.org/jira/browse/FLINK-6951?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200762#comment-16200762 ] Aljoscha Krettek commented on FLINK-6951: - [~phoenixjiangnan] & [~tzulitai] Is this still an issue? > Incompatible versions of httpcomponents jars for Flink kinesis connector > > > Key: FLINK-6951 > URL: https://issues.apache.org/jira/browse/FLINK-6951 > Project: Flink > Issue Type: Bug > Components: Kinesis Connector >Affects Versions: 1.3.0 >Reporter: Ted Yu >Assignee: Bowen Li >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > In the following thread, Bowen reported incompatible versions of > httpcomponents jars for Flink kinesis connector : > http://search-hadoop.com/m/Flink/VkLeQN2m5EySpb1?subj=Re+Incompatible+Apache+Http+lib+in+Flink+kinesis+connector > We should find a solution such that users don't have to change dependency > version(s) themselves when building Flink kinesis connector. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200672#comment-16200672 ] Chesnay Schepler commented on FLINK-7608: - that's exactly what you have to implement. > LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
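The comment above points at replacing the map-valued latency gauge with a proper histogram metric. A rough, self-contained sketch of that idea — a ring buffer of samples with nearest-rank quantiles — is below. It is illustrative only, not Flink's actual `org.apache.flink.metrics.Histogram` implementation; the class and method names besides `update` are invented here:

```java
import java.util.Arrays;

public class LatencyHistogramSketch {

    private final long[] window; // ring buffer of recent latency samples
    private int size;
    private int next;

    LatencyHistogramSketch(int capacity) {
        this.window = new long[capacity];
    }

    // Record one latency sample, overwriting the oldest when full
    // (analogous to Histogram#update(long) in the metrics API).
    void update(long latencyMillis) {
        window[next] = latencyMillis;
        next = (next + 1) % window.length;
        if (size < window.length) {
            size++;
        }
    }

    // Nearest-rank percentile over the current window, e.g. quantile(0.99).
    double quantile(double q) {
        if (size == 0) {
            return 0.0;
        }
        long[] sorted = Arrays.copyOf(window, size);
        Arrays.sort(sorted);
        int index = (int) Math.ceil(q * size) - 1;
        return sorted[Math.max(index, 0)];
    }

    public static void main(String[] args) {
        LatencyHistogramSketch h = new LatencyHistogramSketch(128);
        for (long v = 1; v <= 100; v++) {
            h.update(v);
        }
        System.out.println(h.quantile(0.5));  // 50.0
        System.out.println(h.quantile(0.99)); // 99.0
    }
}
```

Registering something like this as a histogram lets reporters render p50/p95/p99 natively instead of flattening them into a single gauge string as in the log excerpt above.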
[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200658#comment-16200658 ] ASF GitHub Bot commented on FLINK-7791: --- Github user zentol commented on the issue: https://github.com/apache/flink/pull/4802 What do you mean when you say triggerSavepoint is not a command of the ClusterClient? It does have a `triggerSavepoint` method. > Integrate LIST command into REST client > --- > > Key: FLINK-7791 > URL: https://issues.apache.org/jira/browse/FLINK-7791 > Project: Flink > Issue Type: Sub-task > Components: Client, REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink issue #4802: [FLINK-7791] [REST][client] Integrate LIST command into R...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/4802 What do you mean when you say triggerSavepoint is not a command of the ClusterClient? It does have a `triggerSavepoint` method. ---
[jira] [Commented] (FLINK-7803) Update savepoint Documentation
[ https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200655#comment-16200655 ] ASF GitHub Bot commented on FLINK-7803: --- Github user razvan100 closed the pull request at: https://github.com/apache/flink/pull/4808 > Update savepoint Documentation > -- > > Key: FLINK-7803 > URL: https://issues.apache.org/jira/browse/FLINK-7803 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Razvan >Assignee: Razvan > > Can you please update > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html > to specify the savepoint location *MUST* always be a location accessible by > all hosts? > I spent quite some time believing it's a bug and trying to find solutions, > see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the > current documentation and others might waste time also believing it's an > actual issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4808: [FLINK-7803][Documentation] Add missing informatio...
Github user razvan100 closed the pull request at: https://github.com/apache/flink/pull/4808 ---
[jira] [Commented] (FLINK-7803) Update savepoint Documentation
[ https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200652#comment-16200652 ] ASF GitHub Bot commented on FLINK-7803: --- GitHub user razvan100 opened a pull request: https://github.com/apache/flink/pull/4809 [FLINK-7803][Documentation] Add missing savepoint information This fixes FLINK-7803 by emphasizing the savepoint save location should be on a distributed file-system. You can merge this pull request into a Git repository by running: $ git pull https://github.com/razvan100/flink patch-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4809.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4809 commit a3fa67e0dbc3498a5a916dbe1f7702366d793867 Author: Razvan Date: 2017-10-11T17:38:50Z [FLINK-7803] Add missing savepoint information This fixes FLINK-7803 by emphasizing the savepoint save location should be on a distributed file-system. > Update savepoint Documentation > -- > > Key: FLINK-7803 > URL: https://issues.apache.org/jira/browse/FLINK-7803 > Project: Flink > Issue Type: Bug > Components: Documentation >Reporter: Razvan >Assignee: Razvan > > Can you please update > https://ci.apache.org/projects/flink/flink-docs-release-1.3/setup/savepoints.html > to specify the savepoint location *MUST* always be a location accessible by > all hosts? > I spent quite some time believing it's a bug and trying to find solutions, > see https://issues.apache.org/jira/browse/FLINK-7750. It's not obvious in the > current documentation and others might waste time also believing it's an > actual issue. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4809: [FLINK-7803][Documentation] Add missing savepoint ...
GitHub user razvan100 opened a pull request: https://github.com/apache/flink/pull/4809 [FLINK-7803][Documentation] Add missing savepoint information This fixes FLINK-7803 by emphasizing the savepoint save location should be on a distributed file-system. You can merge this pull request into a Git repository by running: $ git pull https://github.com/razvan100/flink patch-3 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4809.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4809 commit a3fa67e0dbc3498a5a916dbe1f7702366d793867 Author: Razvan Date: 2017-10-11T17:38:50Z [FLINK-7803] Add missing savepoint information This fixes FLINK-7803 by emphasizing the savepoint save location should be on a distributed file-system. ---
[jira] [Commented] (FLINK-7803) Update savepoint Documentation
[ https://issues.apache.org/jira/browse/FLINK-7803?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200626#comment-16200626 ] ASF GitHub Bot commented on FLINK-7803: --- GitHub user razvan100 opened a pull request: https://github.com/apache/flink/pull/4808 [FLINK-7803][Documentation] Add missing information about savepoint location FLINK-7803 FLINK-7750 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. 
That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). - Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/razvan100/flink patch-2
[jira] [Comment Edited] (FLINK-7608) LatencyGauge change to histogram metric
[ https://issues.apache.org/jira/browse/FLINK-7608?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200585#comment-16200585 ] Hai Zhou UTC+8 edited comment on FLINK-7608 at 10/11/17 4:54 PM: - Before I start, I want to discuss my idea: redesign the latency measurement into a new class named LatencyStatistics. The structure of this class is as follows:
1. It has a constructor:
{code:java}
LatencyStatistics(MetricGroup metricGroup, int histogramWindowSize)
{code}
2. It has three fields:
{code:java}
MetricGroup metricGroup;  // the metricGroup from the constructor
int windowSize;           // the histogramWindowSize from the constructor
Map<String, LatencyHistogram> latencyStats = new HashMap<>();
{code}
3. It has a method that receives a LatencyMarker:
{code:java}
void reportLatency(LatencyMarker marker) {
    String key = marker.vertexID + "-" + marker.subtaskIndex;
    LatencyHistogram sourceStats = this.latencyStats.get(key);
    if (sourceStats == null) {
        sourceStats = new LatencyHistogram(this.windowSize);
        this.latencyStats.put(key, sourceStats);
        this.metricGroup.histogram(key, sourceStats);
    }
    sourceStats.addValue(System.currentTimeMillis() - marker.getMarkedTime());
}
{code}
The *LatencyHistogram* extends org.apache.flink.metrics.Histogram and wraps a *DescriptiveStatistics* internally. [~Zentol] [~rmetzger] [~aljoscha], what are your opinions here?
> LatencyGauge change to histogram metric > > > Key: FLINK-7608 > URL: https://issues.apache.org/jira/browse/FLINK-7608 > Project: Flink > Issue Type: Bug > Components: Metrics >Reporter: Hai Zhou UTC+8 >Assignee: Hai Zhou UTC+8 >Priority: Blocker > Fix For: 1.4.0, 1.3.3 > > > I used slf4jReporter[https://issues.apache.org/jira/browse/FLINK-4831] to > export metrics to the log file. > I found: > {noformat} > -- Gauges > - > .. > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Map.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=-1}={p99=116.0, > p50=59.5, min=11.0, max=116.0, p95=116.0, mean=61.836}} > zhouhai-mbp.taskmanager.f3fd3a269c8c3da4e8319c8f6a201a57.Flink Streaming > Job.Sink- Unnamed.0.latency: > value={LatencySourceDescriptor{vertexID=1, subtaskIndex=0}={p99=195.0, > p50=163.5, min=115.0, max=195.0, p95=195.0, mean=161.0}} > .. > {noformat} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
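To make the proposed design concrete, here is a minimal, self-contained sketch of the LatencyStatistics idea described above. Flink's MetricGroup and the DescriptiveStatistics-backed Histogram are replaced by a plain fixed-window histogram so the example runs standalone; all class and method names here are illustrative stand-ins, not Flink's actual API.

```java
import java.util.HashMap;
import java.util.Map;

// Standalone sketch of the proposed per-source latency statistics.
class LatencyStatsSketch {

    /** Fixed-size sliding-window histogram, standing in for the proposed LatencyHistogram. */
    static final class LatencyHistogram {
        private final long[] window;
        private int next;
        private int count;

        LatencyHistogram(int windowSize) {
            this.window = new long[windowSize];
        }

        void addValue(long latencyMillis) {
            window[next] = latencyMillis;          // overwrite the oldest value
            next = (next + 1) % window.length;
            if (count < window.length) {
                count++;
            }
        }

        double mean() {
            long sum = 0;
            for (int i = 0; i < count; i++) {
                sum += window[i];
            }
            return count == 0 ? 0.0 : (double) sum / count;
        }
    }

    private final Map<String, LatencyHistogram> latencyStats = new HashMap<>();
    private final int windowSize;

    LatencyStatsSketch(int windowSize) {
        this.windowSize = windowSize;
    }

    /** One histogram per "vertexID-subtaskIndex" source, created lazily on first report. */
    LatencyHistogram reportLatency(int vertexId, int subtaskIndex, long markedTime, long now) {
        String key = vertexId + "-" + subtaskIndex;
        LatencyHistogram stats = latencyStats.get(key);
        if (stats == null) {
            stats = new LatencyHistogram(windowSize);
            latencyStats.put(key, stats);
            // in the real design this would also register: metricGroup.histogram(key, stats)
        }
        stats.addValue(now - markedTime);
        return stats;
    }
}
```

Keeping one lazily created histogram per source is what would let a reporter expose per-source latency percentiles instead of the single flattened gauge shown in the issue description.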
[jira] [Created] (FLINK-7817) Add TaskManagerDetailsHandler
Till Rohrmann created FLINK-7817: Summary: Add TaskManagerDetailsHandler Key: FLINK-7817 URL: https://issues.apache.org/jira/browse/FLINK-7817 Project: Flink Issue Type: Sub-task Components: REST, Webfrontend Affects Versions: 1.4.0 Reporter: Till Rohrmann We should split the legacy {{TaskManagersHandler}} into a new {{TaskManagerHandler}} and a {{TaskManagerDetailsHandler}}. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Assigned] (FLINK-7648) Port TaskManagersHandler to new REST endpoint
[ https://issues.apache.org/jira/browse/FLINK-7648?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Till Rohrmann reassigned FLINK-7648: Assignee: Till Rohrmann > Port TaskManagersHandler to new REST endpoint > - > > Key: FLINK-7648 > URL: https://issues.apache.org/jira/browse/FLINK-7648 > Project: Flink > Issue Type: Sub-task > Components: REST, Webfrontend >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > Labels: flip-6 > Fix For: 1.4.0 > > > Port existing {{TaskManagersHandler}} to the new REST endpoint -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7810) Switch from custom Flakka to Akka 2.4.x
[ https://issues.apache.org/jira/browse/FLINK-7810?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200564#comment-16200564 ] ASF GitHub Bot commented on FLINK-7810: --- GitHub user aljoscha opened a pull request: https://github.com/apache/flink/pull/4807 [FLINK-7810] Switch from custom Flakka to Akka 2.4.x ## What is the purpose of the change Drop support for Scala 2.10 and then update to a newer Akka version. Before, we were forced to stay on our custom Flakka 2.3.x version with back ports because newer Akka does not support Scala 2.10. You can merge this pull request into a Git repository by running: $ git pull https://github.com/aljoscha/flink jira-7729-drop-210-akka-update Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/4807.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #4807 commit 24e70936eac780792dd00d63f833b286f4aa2aaf Author: Aljoscha Krettek Date: 2017-10-06T09:42:00Z [FLINK-7809] Remove support for Scala 2.10 commit e64135f507256130b4d352e93152b736265de9c2 Author: Aljoscha Krettek Date: 2017-10-06T12:33:43Z [FLINK-7810] Switch from custom Flakka to Akka 2.4.x We can do this now that we dropped support for Scala 2.10. > Switch from custom Flakka to Akka 2.4.x > --- > > Key: FLINK-7810 > URL: https://issues.apache.org/jira/browse/FLINK-7810 > Project: Flink > Issue Type: Improvement > Components: Distributed Coordination >Reporter: Aljoscha Krettek >Assignee: Aljoscha Krettek > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7816) Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner
Aljoscha Krettek created FLINK-7816: --- Summary: Support Scala 2.12 closures and Java 8 lambdas in ClosureCleaner Key: FLINK-7816 URL: https://issues.apache.org/jira/browse/FLINK-7816 Project: Flink Issue Type: Sub-task Components: Scala API Reporter: Aljoscha Krettek We have the same problem as Spark: SPARK-14540 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144025139 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java --- @@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception { client.cancelRequestFor(inputChannel.getInputChannelId()); } + /** +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and +* {@link AddCredit} message is sent to the producer. +*/ + @Test + public void testNotifyCreditAvailable() throws Exception { + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); + + final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class); + + // Enqueue the remote input channel + handler.notifyCreditAvailable(inputChannel); + + ch.runPendingTasks(); + + // Read the enqueued msg + Object msg1 = ch.readOutbound(); + + // Should notify credit + assertEquals(msg1.getClass(), AddCredit.class); + } + + /** +* Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit} +* message is not sent after the input channel is released. +*/ + @Test + public void testNotifyCreditAvailableAfterReleased() throws Exception { + final CreditBasedClientHandler handler = new CreditBasedClientHandler(); + final EmbeddedChannel ch = new EmbeddedChannel(handler); --- End diff -- `channel` ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144052410 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java --- @@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) { } } + void notifyCreditAvailable(RemoteInputChannel inputChannel) { + // Implement in CreditBasedClientHandler --- End diff -- is that a next PR? Is it related to `CreditBasedClientHandler:: notifyCreditAvailable`? ---
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144016699 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -104,8 +130,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { final SocketAddress remoteAddr = ctx.channel().remoteAddress(); notifyAllChannelsOfErrorAndClose(new RemoteTransportException( - "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. " - + "This might indicate that the remote task manager was lost.", + "Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. " --- End diff -- ditto, and same applies for other changes like this? ---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200523#comment-16200523 ] ASF GitHub Bot commented on FLINK-7416: --- Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144013921 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java --- @@ -37,18 +43,29 @@ import java.io.IOException; import java.net.SocketAddress; +import java.util.ArrayDeque; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicReference; +/** + * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the + * producer, to write and flush {@link AddCredit} message for the producer. + */ --- End diff -- Shouldn't this be a part of the previous commit? > Implement Netty receiver outgoing pipeline for credit-based > --- > > Key: FLINK-7416 > URL: https://issues.apache.org/jira/browse/FLINK-7416 > Project: Flink > Issue Type: Sub-task > Components: Network >Reporter: zhijiang >Assignee: zhijiang > Fix For: 1.4.0 > > > This is a part of work for credit-based network flow control. > The related works are : > * We define a new message called {{AddCredit}} to notify the incremental > credit during data shuffle. > * Whenever an {{InputChannel}}’s unannounced credit goes up from zero, the > channel is enqueued in the pipeline. > * Whenever the channel becomes writable, it takes the next {{InputChannel}} > and sends its unannounced credit. The credit is reset to zero after each send. > * That way, messages are sent as often as the network has capacity and > contain as much credit as available for the channel at that point in time. > Otherwise, it would only add latency to the announcements and not increase > throughput. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144025089

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
 		client.cancelRequestFor(inputChannel.getInputChannelId());
 	}

+	/**
+	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
+	 * {@link AddCredit} message is sent to the producer.
+	 */
+	@Test
+	public void testNotifyCreditAvailable() throws Exception {
+		final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+		final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`ch` -> `channel`

---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200522#comment-16200522 ]

ASF GitHub Bot commented on FLINK-7416:
---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144022698

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -272,4 +316,53 @@ private void decodeBufferOrEvent(RemoteInputChannel inputChannel, NettyMessage.B
 			bufferOrEvent.releaseBuffer();
 		}
 	}
+
+	private void writeAndFlushNextMessageIfPossible(Channel channel) {
+		if (channelError.get() != null) {
+			return;
+		}
+
+		if (channel.isWritable()) {
--- End diff --

Please invert the if condition (smaller branch first). The same applies to a later if.
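The restructuring the reviewer asks for, putting the smaller branch first via an early return, can be sketched as follows. The `Channel` interface and the `writes` counter here are minimal illustrative stand-ins, not Netty's actual API:

```java
import java.util.concurrent.atomic.AtomicReference;

public class GuardStyle {
    // Minimal stand-in for Netty's Channel; only the one method this sketch needs.
    interface Channel {
        boolean isWritable();
    }

    private final AtomicReference<Throwable> channelError = new AtomicReference<>();
    int writes; // counts how often the main body was reached (for illustration only)

    // Suggested shape: both small branches exit early, so the long
    // message-writing body is not nested inside an if block.
    void writeAndFlushNextMessageIfPossible(Channel channel) {
        if (channelError.get() != null) {
            return;
        }
        if (!channel.isWritable()) {
            return;
        }
        // ... dequeue the next input channel and write an AddCredit message ...
        writes++;
    }
}
```

The behavior is identical; the benefit is purely readability, since the dominant code path stays at the left margin.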
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200527#comment-16200527 ]

ASF GitHub Bot commented on FLINK-7416:
---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144052410

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java ---
@@ -330,6 +330,10 @@ else if (bufferProvider.isDestroyed()) {
 		}
 	}

+	void notifyCreditAvailable(RemoteInputChannel inputChannel) {
+		// Implement in CreditBasedClientHandler
--- End diff --

Is that for a next PR? Is it related to `CreditBasedClientHandler::notifyCreditAvailable`?
[GitHub] flink pull request #4533: [FLINK-7416][network] Implement Netty receiver out...
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144014064

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -37,18 +43,29 @@
 import java.io.IOException;
 import java.net.SocketAddress;
+import java.util.ArrayDeque;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicReference;

+/**
+ * Channel handler to read {@link BufferResponse} and {@link ErrorResponse} messages from the
+ * producer, to write and flush {@link AddCredit} message for the producer.
+ */
 class CreditBasedClientHandler extends ChannelInboundHandlerAdapter {

 	private static final Logger LOG = LoggerFactory.getLogger(CreditBasedClientHandler.class);

+	/** Channels, which already requested partitions from the producers. */
--- End diff --

ditto

---
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200526#comment-16200526 ]

ASF GitHub Bot commented on FLINK-7416:
---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144025139

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandlerTest.java ---
@@ -206,6 +209,53 @@ public void testCancelBeforeActive() throws Exception {
 		client.cancelRequestFor(inputChannel.getInputChannelId());
 	}

+	/**
+	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, and
+	 * {@link AddCredit} message is sent to the producer.
+	 */
+	@Test
+	public void testNotifyCreditAvailable() throws Exception {
+		final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+		final EmbeddedChannel ch = new EmbeddedChannel(handler);
+
+		final RemoteInputChannel inputChannel = mock(RemoteInputChannel.class);
+
+		// Enqueue the remote input channel
+		handler.notifyCreditAvailable(inputChannel);
+
+		ch.runPendingTasks();
+
+		// Read the enqueued msg
+		Object msg1 = ch.readOutbound();
+
+		// Should notify credit
+		assertEquals(msg1.getClass(), AddCredit.class);
+	}
+
+	/**
+	 * Verifies that {@link RemoteInputChannel} is enqueued in the pipeline, but {@link AddCredit}
+	 * message is not sent after the input channel is released.
+	 */
+	@Test
+	public void testNotifyCreditAvailableAfterReleased() throws Exception {
+		final CreditBasedClientHandler handler = new CreditBasedClientHandler();
+		final EmbeddedChannel ch = new EmbeddedChannel(handler);
--- End diff --

`channel`
[jira] [Commented] (FLINK-7416) Implement Netty receiver outgoing pipeline for credit-based
[ https://issues.apache.org/jira/browse/FLINK-7416?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200525#comment-16200525 ]

ASF GitHub Bot commented on FLINK-7416:
---
Github user pnowojski commented on a diff in the pull request: https://github.com/apache/flink/pull/4533#discussion_r144016699

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedClientHandler.java ---
@@ -104,8 +130,8 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
 		final SocketAddress remoteAddr = ctx.channel().remoteAddress();

 		notifyAllChannelsOfErrorAndClose(new RemoteTransportException(
-			"Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. "
-				+ "This might indicate that the remote task manager was lost.",
+			"Connection unexpectedly closed by remote task manager '" + remoteAddr + "'. "
--- End diff --

Ditto, and the same applies to other changes like this.
[jira] [Commented] (FLINK-7653) Properly implement DispatcherGateway methods on the Dispatcher
[ https://issues.apache.org/jira/browse/FLINK-7653?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200519#comment-16200519 ] ASF GitHub Bot commented on FLINK-7653: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4793 Thanks for the review @zentol. Rebasing onto the latest master and merging after Travis gives green light. > Properly implement DispatcherGateway methods on the Dispatcher > -- > > Key: FLINK-7653 > URL: https://issues.apache.org/jira/browse/FLINK-7653 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination, REST, Webfrontend >Affects Versions: 1.4.0 >Reporter: Tzu-Li (Gordon) Tai >Assignee: Till Rohrmann > > Currently, {{DispatcherGateway}} methods such as {{listJobs}}, > {{requestStatusOverview}}, and probably other new methods that will be added > as we port more existing REST handlers to the new endpoint, have only dummy > placeholder implementations in the {{Dispatcher}} marked with TODOs. > This ticket is to track that they are all eventually properly implemented. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200509#comment-16200509 ] ASF GitHub Bot commented on FLINK-7791: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/4802 But `triggerSavepoint` is not a command of the `ClusterClient` but of the `CliFrontend`. All other `ClusterClient` commands like `stop` and `cancel` behave differently. > Integrate LIST command into REST client > --- > > Key: FLINK-7791 > URL: https://issues.apache.org/jira/browse/FLINK-7791 > Project: Flink > Issue Type: Sub-task > Components: Client, REST >Affects Versions: 1.4.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7815) Remove grouping from MultipleJobsDetails
[ https://issues.apache.org/jira/browse/FLINK-7815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200496#comment-16200496 ]

ASF GitHub Bot commented on FLINK-7815:
---
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4806

[FLINK-7815] Remove grouping from MultipleJobsDetails

## What is the purpose of the change

With this commit the MultipleJobsDetails instance only contains a list of all jobs which could be retrieved from the cluster. With this change it is the responsibility of the web ui to group the jobs into running and finished jobs.

## Brief change log

- Change `MultipleJobsDetails` to contain a single list of all retrieved jobs
- Adapt jobs.svc.coffee script to group list of jobs into running and finished jobs

## Verifying this change

This change has been manually tested.

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink refactorMultipleJobsDetails

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4806.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4806

> Remove grouping from MultipleJobsDetails
> ----------------------------------------
>
>                 Key: FLINK-7815
>             Project: Flink
>          Issue Type: Sub-task
>          Components: REST
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: flip-6
>
> The {{MultipleJobsDetails}} object should only contain a list of retrieved jobs instead of pre grouping the jobs into finished and running jobs.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
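The grouping moved to the web UI amounts to partitioning one flat job list by terminal state. Sketched here with Java streams; the `Job` record and its `finished` flag are illustrative stand-ins, not Flink's actual `JobDetails` type:

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class JobGrouping {
    // Illustrative job model; Flink's actual type is JobDetails with a JobStatus.
    record Job(String name, boolean finished) {}

    // Split a single flat list (as MultipleJobsDetails now carries) into
    // finished (true) and running (false) groups, as the UI does after this change.
    static Map<Boolean, List<Job>> group(List<Job> jobs) {
        return jobs.stream().collect(Collectors.partitioningBy(Job::finished));
    }
}
```

`partitioningBy` always yields both keys, so the UI gets an (possibly empty) list for each group without null checks.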
[jira] [Commented] (FLINK-7806) Move CurrentJobsOverviewHandler to jobs/overview
[ https://issues.apache.org/jira/browse/FLINK-7806?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200490#comment-16200490 ]

ASF GitHub Bot commented on FLINK-7806:
---
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/4805

[FLINK-7806] [flip6] Register CurrentJobsOverviewHandler under /jobs/overview

## What is the purpose of the change

This PR changes the REST endpoint URL of the `CurrentJobsOverviewHandler` from `/joboverview` to `/jobs/overview`. Moreover, it renames the handler to `JobsOverviewHandler`.

## Brief change log

- Rename CurrentJobsOverviewHandler to JobsOverviewHandler
- Change REST handler paths
- Remove joboverview/running and joboverview/completed from JobsOverviewHandler
- Adapt web ui files

## Verifying this change

- The change has been manually tested

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): (no)
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no)
- The serializers: (no)
- The runtime per-record code paths (performance sensitive): (no)
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)

## Documentation

- Does this pull request introduce a new feature? (no)
- If yes, how is the feature documented? (not applicable)

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/tillrohrmann/flink moveCurrentJobsOverviewHandler

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4805.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #4805

> Move CurrentJobsOverviewHandler to jobs/overview
> ------------------------------------------------
>
>                 Key: FLINK-7806
>             Project: Flink
>          Issue Type: Sub-task
>          Components: REST
>    Affects Versions: 1.4.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>            Priority: Minor
>              Labels: flip-6
>
> The {{CurrentJobsOverviewHandler}} is currently registered under {{/joboverview}}. I think it would be more idiomatic to register it under {{/jobs/overview}}.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (FLINK-7815) Remove grouping from MultipleJobsDetails
Till Rohrmann created FLINK-7815: Summary: Remove grouping from MultipleJobsDetails Key: FLINK-7815 URL: https://issues.apache.org/jira/browse/FLINK-7815 Project: Flink Issue Type: Sub-task Components: REST Affects Versions: 1.4.0 Reporter: Till Rohrmann Assignee: Till Rohrmann Priority: Minor The {{MultipleJobsDetails}} object should only contain a list of retrieved jobs instead of pre grouping the jobs into finished and running jobs. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7770) Hide Queryable State behind a proxy.
[ https://issues.apache.org/jira/browse/FLINK-7770?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200486#comment-16200486 ] ASF GitHub Bot commented on FLINK-7770: --- Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/4778 > Hide Queryable State behind a proxy. > > > Key: FLINK-7770 > URL: https://issues.apache.org/jira/browse/FLINK-7770 > Project: Flink > Issue Type: Improvement > Components: Queryable State >Affects Versions: 1.4.0 >Reporter: Kostas Kloudas >Assignee: Kostas Kloudas > Fix For: 1.4.0 > > -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7731) Trigger on GlobalWindow does not clean state completely
[ https://issues.apache.org/jira/browse/FLINK-7731?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200409#comment-16200409 ]

Aljoscha Krettek commented on FLINK-7731:
---
Ah yes, sorry for not noticing that earlier. The combination of global windows (which don't have a good cleanup time), trigger firings, and empty windows is indeed complex. There is also FLINK-5363, which aims at changing the semantics a bit, but it is highly likely that this introduces yet more problems. I think for now the behaviour is OK, but we might need to improve the documentation (in Javadocs) a bit. What do you think?

> Trigger on GlobalWindow does not clean state completely
> -------------------------------------------------------
>
>                 Key: FLINK-7731
>             Project: Flink
>          Issue Type: Bug
>          Components: Core, DataStream API
>    Affects Versions: 1.3.2
>            Reporter: Gerard Garcia
>            Priority: Minor
>
> I have an operator that consists of:
> CoGroup DataStream -> GlobalWindow -> CustomTrigger -> Apply function
> The custom trigger fires and purges the elements after it has received the expected number of elements (or when a timeout fires) from one of the streams, and the apply function merges the elements with the ones received from the other stream. It appears that the state of the operator grows continuously, so it seems it never gets completely cleaned.
> There is a discussion in http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Clean-GlobalWidnow-state-td15613.html that suggests that it may be a bug.
> This job reproduces the issue: https://github.com/GerardGarcia/flink-global-window-growing-state

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (FLINK-7791) Integrate LIST command into REST client
[ https://issues.apache.org/jira/browse/FLINK-7791?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16200358#comment-16200358 ]

ASF GitHub Bot commented on FLINK-7791:
---
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/4802#discussion_r144027945

--- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -420,89 +420,72 @@ protected int list(String[] args) {
 		}

 		try {
-			ActorGateway jobManagerGateway = getJobManagerGateway(options);
-
-			LOG.info("Connecting to JobManager to retrieve list of jobs");
-			Future<Object> response = jobManagerGateway.ask(
-				JobManagerMessages.getRequestRunningJobsStatus(),
-				clientTimeout);
+			CustomCommandLine activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
+			ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);

-			Object result;
+			Collection<JobDetails> jobDetails;
 			try {
-				result = Await.result(response, clientTimeout);
-			}
-			catch (Exception e) {
-				throw new Exception("Could not retrieve running jobs from the JobManager.", e);
+				CompletableFuture<Collection<JobDetails>> jobDetailsFuture = client.listJobs();
--- End diff --

I can change it to that.
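The pattern under discussion, replacing Scala's blocking `Await.result` with a `CompletableFuture` awaited with a timeout, looks roughly like this. The `JobDetails` record, the stubbed `listJobs()` body, and the 10-second timeout are placeholders, not Flink's actual signatures:

```java
import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

public class ListJobsSketch {
    // Placeholder for Flink's JobDetails type.
    record JobDetails(String jobId, String status) {}

    // Stands in for ClusterClient.listJobs(): asynchronously produces the job list.
    static CompletableFuture<Collection<JobDetails>> listJobs() {
        return CompletableFuture.supplyAsync(() ->
                List.of(new JobDetails("j1", "RUNNING")));
    }

    // Block with a timeout, analogous to the old Await.result(response, clientTimeout):
    // orTimeout completes the future exceptionally if it takes too long,
    // and join() rethrows that as an unchecked CompletionException.
    static Collection<JobDetails> fetch() {
        return listJobs().orTimeout(10, TimeUnit.SECONDS).join();
    }
}
```

Callers that need richer error handling would wrap the `join()` and translate the `CompletionException` into a client-facing message, much as the old code wrapped `Await.result`.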