[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464917#comment-15464917 ] ASF GitHub Bot commented on FLINK-4458: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 I've re-introduced the `ForkableFlinkMiniCluster`. However, this time it only starts the normal distributed component classes (e.g. `JobManager` instead of `TestingJobManager`, etc.). This implies that the testing messages are no longer supported when using the `ForkableFlinkMiniCluster`. I think the user should not meddle with the testing components in the first place. Therefore, it seems fair to offer the user a mini cluster to run their tests, but not one which instantiates testing components. The recommended cluster is the `LocalFlinkMiniCluster` from now on. However, in order not to break all existing test code, we still maintain the `ForkableFlinkMiniCluster`. Not having to start the testing classes allows moving them back into the test scope of `flink-runtime`, since they are no longer required by the `flink-test-utils` module. > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
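The port-0 mechanism referenced in the issue description can be illustrated with a minimal standalone sketch (plain Java, not Flink code): binding a socket to port 0 asks the OS to pick any currently free ephemeral port, so forked test JVMs no longer race for a pre-determined port.

```java
import java.io.IOException;
import java.net.ServerSocket;

public class EphemeralPortExample {
    public static void main(String[] args) throws IOException {
        // Binding to port 0 lets the OS assign any free port. Parallel
        // (forked) test JVMs that each do this cannot conflict on a
        // pre-configured port number.
        try (ServerSocket socket = new ServerSocket(0)) {
            int assignedPort = socket.getLocalPort();
            System.out.println("OS-assigned port: " + assignedPort);
        }
    }
}
```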
[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 I've re-introduced the `ForkableFlinkMiniCluster`. However, this time it only starts the normal distributed component classes (e.g. `JobManager` instead of `TestingJobManager`, etc.). This implies that the testing messages are no longer supported when using the `ForkableFlinkMiniCluster`. I think the user should not meddle with the testing components in the first place. Therefore, it seems fair to offer the user a mini cluster to run their tests, but not one which instantiates testing components. The recommended cluster is the `LocalFlinkMiniCluster` from now on. However, in order not to break all existing test code, we still maintain the `ForkableFlinkMiniCluster`. Not having to start the testing classes allows moving them back into the test scope of `flink-runtime`, since they are no longer required by the `flink-test-utils` module. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464459#comment-15464459 ] ASF GitHub Bot commented on FLINK-4505: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2461 Hi @wangzhijiang999, maybe we should start simple without introducing a factory method, because there might actually not be many cases to distinguish. Maybe we could rename the `TaskManagerFactory` into `TaskManagerRunner`, which has static methods to create the `TaskManager`'s components and does the network selection. That way we keep the initialization and the actual `TaskManager` logic separated. For testing purposes I guess we don't need to set up any components, because they are usually mocked or one is using testing components. Passing these components to the constructor of the `TaskManager` should not be a big deal. Does this make sense? > Implement TaskManagerFactory to bring up TaskManager for different modes > > > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor > > Implement {{TaskExecutorFactory}} that should be an abstract class with the > helper methods to bring up the {{TaskManager}}. The factory can be > implemented by some classes to start a {{TaskManager}} in different modes > (testing, standalone, yarn).
[jira] [Updated] (FLINK-4396) GraphiteReporter class not found at startup of jobmanager
[ https://issues.apache.org/jira/browse/FLINK-4396?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4396: -- Fix Version/s: (was: 1.1.2) 1.1.3 > GraphiteReporter class not found at startup of jobmanager > - > > Key: FLINK-4396 > URL: https://issues.apache.org/jira/browse/FLINK-4396 > Project: Flink > Issue Type: Improvement > Components: Build System, Metrics >Affects Versions: 1.1.1 > Environment: Windows and Unix >Reporter: RWenden > Fix For: 1.1.3 > > Original Estimate: 4h > Remaining Estimate: 4h > > For Flink 1.1.1 we configured Graphite metrics settings in the > flink-conf.yaml (for jobmanager (and taskmanager)). > We see the following error in the log: > 2016-08-15 14:20:34,167 ERROR org.apache.flink.runtime.metrics.MetricRegistry > - Could not instantiate metrics reporter my_reporter. Metrics > might not be exposed/reported. > java.lang.ClassNotFoundException: > org.apache.flink.metrics.graphite.GraphiteReporter > at java.net.URLClassLoader.findClass(URLClassLoader.java:381) > at java.lang.ClassLoader.loadClass(ClassLoader.java:424) > at > sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:331) > at java.lang.ClassLoader.loadClass(ClassLoader.java:357) > at java.lang.Class.forName0(Native Method) > at java.lang.Class.forName(Class.java:264) > at > org.apache.flink.runtime.metrics.MetricRegistry.<init>(MetricRegistry.java:119) > We found out that this class is not packaged inside flink-dist_2.11-1.1.1.jar. > Long story short: we had to install/provide the following jars in the lib > folder to make Graphite metrics work: > flink-metrics-graphite-1.1.1.jar > flink-metrics-dropwizard-1.1.1.jar > metrics-graphite-3.1.0.jar (from dropwizard) > We think these libraries (and the ones for Ganglia, StatsD, ...) should be > included in flink-dist_2.11-1.1.1.jar, for these are needed at manager > startup time.
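For context, the kind of configuration being described looks roughly like the following sketch. The reporter name `my_reporter` and the host/port values are placeholders, and the exact metrics key names vary between Flink versions, so treat this as illustrative rather than authoritative:

```yaml
# flink-conf.yaml — hypothetical sketch of a named Graphite reporter
metrics.reporters: my_reporter
metrics.reporter.my_reporter.class: org.apache.flink.metrics.graphite.GraphiteReporter
metrics.reporter.my_reporter.host: graphite.example.com   # placeholder host
metrics.reporter.my_reporter.port: 2003                   # common Graphite plaintext port
```

As the report notes, the reporter classes are not bundled in `flink-dist`, so with a configuration like this the corresponding jars must also be placed in the `lib/` folder before the jobmanager starts.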
[jira] [Updated] (FLINK-4329) Fix Streaming File Source Timestamps/Watermarks Handling
[ https://issues.apache.org/jira/browse/FLINK-4329?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4329: -- Fix Version/s: (was: 1.1.2) 1.1.3 > Fix Streaming File Source Timestamps/Watermarks Handling > > > Key: FLINK-4329 > URL: https://issues.apache.org/jira/browse/FLINK-4329 > Project: Flink > Issue Type: Bug > Components: Streaming Connectors >Affects Versions: 1.1.0 >Reporter: Aljoscha Krettek >Assignee: Kostas Kloudas > Fix For: 1.2.0, 1.1.3 > > > The {{ContinuousFileReaderOperator}} does not correctly deal with watermarks, > i.e. they are just passed through. This means that when the > {{ContinuousFileMonitoringFunction}} closes and emits a {{Long.MAX_VALUE}} > that watermark can "overtake" the records that are to be emitted in the > {{ContinuousFileReaderOperator}}. Together with the new "allowed lateness" > setting in window operator this can lead to elements being dropped as late. > Also, {{ContinuousFileReaderOperator}} does not correctly assign ingestion > timestamps since it is not technically a source but looks like one to the > user.
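The overtaking problem described in the issue can be sketched with a deliberately simplified model (this is not the actual operator code): if the final `Long.MAX_VALUE` watermark is forwarded past records still buffered in the reader, a downstream window with zero allowed lateness will classify every one of those records as late and drop it.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class WatermarkOvertakeSketch {
    public static void main(String[] args) {
        // Records still buffered in the reader when the monitoring function
        // closes and emits its final watermark.
        Queue<Long> bufferedTimestamps = new ArrayDeque<>();
        bufferedTimestamps.add(1_000L);
        bufferedTimestamps.add(2_000L);

        // The forwarded Long.MAX_VALUE watermark "overtakes" the buffered records.
        long currentWatermark = Long.MAX_VALUE;
        long allowedLateness = 0L;

        int dropped = 0;
        while (!bufferedTimestamps.isEmpty()) {
            long ts = bufferedTimestamps.poll();
            // Window-operator logic in miniature: an element whose timestamp
            // plus the allowed lateness is behind the watermark is late.
            if (ts + allowedLateness < currentWatermark) {
                dropped++;
            }
        }
        System.out.println("dropped as late: " + dropped); // prints "dropped as late: 2"
    }
}
```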
[jira] [Updated] (FLINK-4513) Kafka connector documentation refers to Flink 1.1-SNAPSHOT
[ https://issues.apache.org/jira/browse/FLINK-4513?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger updated FLINK-4513: -- Fix Version/s: (was: 1.1.2) 1.1.3 > Kafka connector documentation refers to Flink 1.1-SNAPSHOT > -- > > Key: FLINK-4513 > URL: https://issues.apache.org/jira/browse/FLINK-4513 > Project: Flink > Issue Type: Bug > Components: Documentation >Affects Versions: 1.1.1 >Reporter: Fabian Hueske >Priority: Trivial > Fix For: 1.1.3 > > > The Kafka connector documentation: > https://ci.apache.org/projects/flink/flink-docs-release-1.1/apis/streaming/connectors/kafka.html > of Flink 1.1 refers to a Flink 1.1-SNAPSHOT Maven version.
[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2461 I think you should have at least another method `startComponents` which starts the different components. Everything else can be added later when we see that it would be helpful.
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464771#comment-15464771 ] ASF GitHub Bot commented on FLINK-4546: --- Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77507678 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala --- @@ -18,22 +18,11 @@ package org.apache.flink.api.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String]) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { --- End diff -- Because this overridden method is almost the same as the super method. I removed it to stay consistent with `DataSetTable`, which does not override the `getRowType` method. > Remove STREAM keyword in Stream SQL > > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > It is about to unify Batch SQL and Stream SQL grammar, esp. removing STREAM > keyword in Stream SQL. > detailed discuss mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html
[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...
Github user wuchong commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77507678 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala --- @@ -18,22 +18,11 @@ package org.apache.flink.api.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String]) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { --- End diff -- Because this overridden method is almost the same as the super method. I removed it to stay consistent with `DataSetTable`, which does not override the `getRowType` method.
[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464772#comment-15464772 ] ASF GitHub Bot commented on FLINK-4505: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2461 I think you should have at least another method `startComponents` which starts the different components. Everything else can be added later when we see that it would be helpful.
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 Thank you. Testing again and merging if nothing fails.
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464874#comment-15464874 ] ASF GitHub Bot commented on FLINK-3929: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275 Thank you. Testing again and merging if nothing fails. > Support for Kerberos Authentication with Keytab Credential > -- > > Key: FLINK-3929 > URL: https://issues.apache.org/jira/browse/FLINK-3929 > Project: Flink > Issue Type: New Feature >Reporter: Eron Wright >Assignee: Vijay Srinivasaraghavan > Labels: kerberos, security > Original Estimate: 672h > Remaining Estimate: 672h > > _This issue is part of a series of improvements detailed in the [Secure Data > Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] > design doc._ > Add support for a keytab credential to be associated with the Flink cluster, > to facilitate: > - Kerberos-authenticated data access for connectors > - Kerberos-authenticated ZooKeeper access > Support both the standalone and YARN deployment modes. >
[GitHub] flink issue #2461: [FLINK-4505][Cluster Management] Implement TaskManagerFac...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2461 Hi @wangzhijiang999, maybe we should start simple without introducing a factory method, because there might actually not be many cases to distinguish. Maybe we could rename the `TaskManagerFactory` into `TaskManagerRunner`, which has static methods to create the `TaskManager`'s components and does the network selection. That way we keep the initialization and the actual `TaskManager` logic separated. For testing purposes I guess we don't need to set up any components, because they are usually mocked or one is using testing components. Passing these components to the constructor of the `TaskManager` should not be a big deal. Does this make sense?
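The split suggested in the comment above (static initialization helpers kept apart from the `TaskManager` logic, with components passed in via the constructor) could be sketched roughly as follows. All class and method names here are hypothetical stand-ins, not Flink's actual API:

```java
public class TaskManagerRunnerSketch {
    // Hypothetical stand-ins for the real runtime components.
    static class NetworkEnvironment {}
    static class MemoryManager {}

    // Hypothetical TaskManager that receives its components through the
    // constructor, so tests can hand in mocks or testing components directly.
    static class TaskManager {
        final NetworkEnvironment network;
        final MemoryManager memory;

        TaskManager(NetworkEnvironment network, MemoryManager memory) {
            this.network = network;
            this.memory = memory;
        }
    }

    // "Runner" role: a static factory method creates the components
    // (network selection, memory sizing, ...) so that initialization
    // stays out of the TaskManager class itself.
    static TaskManager createForStandalone() {
        return new TaskManager(new NetworkEnvironment(), new MemoryManager());
    }

    public static void main(String[] args) {
        TaskManager tm = createForStandalone();
        System.out.println("components wired: " + (tm.network != null && tm.memory != null));
    }
}
```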
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464553#comment-15464553 ] ASF GitHub Bot commented on FLINK-4458: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2450 Quick question: The `ForkableFlinkMiniCluster` was something semi-public, in the sense that it was part of the `flink-test-utils` project and used by quite a few users. So this would be a breaking change.
[GitHub] flink issue #2452: [Flink-4450] add a new module "flink-apache-storm" to sup...
Github user liuyuzhong commented on the issue: https://github.com/apache/flink/pull/2452 build success

```
[INFO] BUILD SUCCESS
[INFO]
[INFO] Total time: 01:28 h
[INFO] Finished at: 2016-09-05T04:07:42+00:00
[INFO] Final Memory: 195M/581M
```

but Finished: UNSTABLE

```
[JENKINS] Archiving /home/jenkins/jenkins-slave/workspace/flink-github-ci/flink-contrib/flink-streaming-contrib/target/flink-streaming-contrib_2.10-1.2-SNAPSHOT-javadoc.jar to org.apache.flink/flink-streaming-contrib_2.10/1.2-SNAPSHOT/flink-streaming-contrib_2.10-1.2-SNAPSHOT-javadoc.jar
channel stopped
Putting comment on the pull request
Finished: UNSTABLE
```
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464700#comment-15464700 ] ASF GitHub Bot commented on FLINK-4458: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 Yes, that is true. However, the `flink-test-utils` have not been guaranteed to be stable, if I'm not mistaken. The change one has to apply is to replace the `ForkableFlinkMiniCluster` with the `LocalFlinkMiniCluster`.
[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 Yes, that is true. However, the `flink-test-utils` have not been guaranteed to be stable, if I'm not mistaken. The change one has to apply is to replace the `ForkableFlinkMiniCluster` with the `LocalFlinkMiniCluster`.
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464801#comment-15464801 ] ASF GitHub Bot commented on FLINK-4456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508917 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +public interface TaskExecutionStateListener { + + /** +* Called whenever the task's execution state changes +* +* @param taskExecutionState describing the task execution state change +*/ + void notifyTaskExecutionState(TaskExecutionState taskExecutionState); --- End diff -- Yes, can rename. > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract.
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java --- @@ -35,4 +35,16 @@ * task shall not consume any further input splits. */ InputSplit getNextInputSplit(); + + /** +* Starts the input split provider with a user code class loader. +* +* @param userCodeClassLoader User code class loader to use by the input split provider +*/ + void start(ClassLoader userCodeClassLoader); --- End diff -- The `start` method's intention is to pass in the `userCodeClassLoader`, which is currently created in the `Task#run` method. There are two other ways to solve the problem: either create the user code class loader outside of `Task`, where the `InputSplitProvider` is created, or pass the user code class loader via the `getNextInputSplit` method call to the input split provider. For the first approach: creating the user code class loader is a blocking operation, so this would have to be executed in a future, and upon completion we could create the `Task` instance in the `TaskManager`. For the second approach: we would have to touch more code, but I think everywhere the `getNextInputSplit` method is called we have access to the user code class loader.
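The second alternative discussed in the comment (supplying the user-code class loader with every call instead of via a separate `start(...)` lifecycle method) can be sketched as follows. The types here are hypothetical stand-ins, not the real `flink-runtime` interfaces:

```java
public class InputSplitSketch {
    // Hypothetical stand-ins; the real Flink types differ.
    interface InputSplit {}

    interface InputSplitProvider {
        // Alternative shape: the caller passes the user-code class loader
        // with each call, so the provider needs no start(ClassLoader) method.
        InputSplit getNextInputSplit(ClassLoader userCodeClassLoader);
    }

    public static void main(String[] args) {
        // Trivial stub provider that signals "no further input splits" with null.
        InputSplitProvider provider = loader -> null;
        InputSplit split = provider.getNextInputSplit(InputSplitSketch.class.getClassLoader());
        System.out.println("next split: " + split); // prints "next split: null"
    }
}
```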
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464799#comment-15464799 ] ASF GitHub Bot commented on FLINK-4456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508870 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java --- @@ -35,4 +35,16 @@ * task shall not consume any further input splits. */ InputSplit getNextInputSplit(); + + /** +* Starts the input split provider with a user code class loader. +* +* @param userCodeClassLoader User code class loader to use by the input split provider +*/ + void start(ClassLoader userCodeClassLoader); --- End diff -- The `start` method's intention is to pass in the `userCodeClassLoader`, which is currently created in the `Task#run` method. There are two other ways to solve the problem: either create the user code class loader outside of `Task`, where the `InputSplitProvider` is created, or pass the user code class loader via the `getNextInputSplit` method call to the input split provider. For the first approach: creating the user code class loader is a blocking operation, so this would have to be executed in a future, and upon completion we could create the `Task` instance in the `TaskManager`. For the second approach: we would have to touch more code, but I think everywhere the `getNextInputSplit` method is called we have access to the user code class loader.
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508917 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +public interface TaskExecutionStateListener { + + /** +* Called whenever the task's execution state changes +* +* @param taskExecutionState describing the task execution state change +*/ + void notifyTaskExecutionState(TaskExecutionState taskExecutionState); --- End diff -- Yes, can rename.
[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL
Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2454 Hi @wuchong, thanks for the PR. Just had two minor comments. Thanks, Fabian
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464574#comment-15464574 ] ASF GitHub Bot commented on FLINK-4546: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77494279 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala --- @@ -18,22 +18,11 @@ package org.apache.flink.api.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String]) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { --- End diff -- Why did you remove this method?
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464573#comment-15464573 ] ASF GitHub Bot commented on FLINK-4546: --- Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77494182 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala --- @@ -112,7 +112,7 @@ object FlinkRuleSets { */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( - RemoveDeltaRule.INSTANCE, --- End diff -- The `RemoveDeltaRule` class can be removed as well. > Remove STREAM keyword in Stream SQL > > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > It is about to unify Batch SQL and Stream SQL grammar, esp. removing STREAM > keyword in Stream SQL. > detailed discuss mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4458) Remove ForkableFlinkMiniCluster
[ https://issues.apache.org/jira/browse/FLINK-4458?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464765#comment-15464765 ] ASF GitHub Bot commented on FLINK-4458: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 I will re-introduce the `ForkableFlinkMiniCluster` which is simply extending the `LocalFlinkMiniCluster`. > Remove ForkableFlinkMiniCluster > --- > > Key: FLINK-4458 > URL: https://issues.apache.org/jira/browse/FLINK-4458 > Project: Flink > Issue Type: Improvement > Components: Tests >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > After addressing FLINK-4424 we should be able to get rid of the > {{ForkableFlinkMiniCluster}} since we no longer have to pre-determine a port > in Flink. Thus, by setting the ports to {{0}} and letting the OS choose a > free port, there should no longer be conflicting port requests. Consequently, > the {{ForkableFlinkMiniCluster}} will become obsolete. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2450 I will re-introduce the `ForkableFlinkMiniCluster` which is simply extending the `LocalFlinkMiniCluster`.
[jira] [Created] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway
Till Rohrmann created FLINK-4580: Summary: Check that the RpcEndpoint supports the specified RpcGateway Key: FLINK-4580 URL: https://issues.apache.org/jira/browse/FLINK-4580 Project: Flink Issue Type: Sub-task Components: Distributed Coordination Affects Versions: 1.2.0 Reporter: Till Rohrmann Priority: Minor When calling {{RpcService.connect}} the user specifies the type of the {{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} actually supports the specified {{RpcGateway}}. I think it would be good to add a runtime check that the corresponding {{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can let the connect method fail fast. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
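The fail-fast check proposed in FLINK-4580 can be sketched with plain reflection: before building a proxy, verify that the endpoint's class actually implements the requested gateway interface. This is a hedged, illustrative sketch; the class and method names below are hypothetical and not Flink's actual `RpcService` API.

```java
// Illustrative sketch only: fail fast when an endpoint class does not
// implement the requested gateway interface (names are hypothetical,
// not Flink's real RpcService/RpcEndpoint types).
final class GatewayCompatibility {

    /** Throws if {@code endpointClass} does not implement {@code gatewayClass}. */
    static <G> void verifySupports(Class<?> endpointClass, Class<G> gatewayClass) {
        if (!gatewayClass.isAssignableFrom(endpointClass)) {
            throw new IllegalArgumentException(
                "Endpoint " + endpointClass.getName()
                    + " does not support gateway " + gatewayClass.getName());
        }
    }
}
```

With such a check in place, the connect call can fail immediately with a descriptive exception instead of timing out later.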
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77492066 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +public interface TaskExecutionStateListener { + + /** +* Called whenever the task's execution state changes +* +* @param taskExecutionState describing the task execution state change +*/ + void notifyTaskExecutionState(TaskExecutionState taskExecutionState); --- End diff -- `notifyTaskExecutionStateChanged(...)`? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77492004 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java --- @@ -35,4 +35,16 @@ * task shall not consume any further input splits. */ InputSplit getNextInputSplit(); + + /** +* Starts the input split provider with a user code class loader. +* +* @param userCodeClassLoader User code class loader to use by the input split provider +*/ + void start(ClassLoader userCodeClassLoader); --- End diff -- Is it possible to have the `start()` and `stop()` methods not in the base interface, but only in the Akka specific implementation? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464529#comment-15464529 ] ASF GitHub Bot commented on FLINK-4456: --- Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2456 I think this is good. Few comments inline on the names, the hardest problem usually ;-) > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
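The pattern described in the FLINK-4456 issue text (depend on a plain interface; let one implementation wrap each call in a message and send it via the gateway) can be sketched as follows. All names here are illustrative stand-ins, not Flink's actual classes, and a simple `Consumer` stands in for the `ActorGateway` transport.

```java
import java.util.function.Consumer;

// Illustrative only: the task depends on this interface, not on actors.
interface TaskStateNotifier {
    void notifyStateChange(String taskId, String newState);
}

// The message a transport-backed implementation would send.
final class StateChangeMessage {
    final String taskId;
    final String newState;

    StateChangeMessage(String taskId, String newState) {
        this.taskId = taskId;
        this.newState = newState;
    }
}

// Wraps each interface call in a message and forwards it to the transport
// (an ActorGateway in Flink's case; here a plain Consumer).
final class MessageSendingNotifier implements TaskStateNotifier {
    private final Consumer<Object> transport;

    MessageSendingNotifier(Consumer<Object> transport) {
        this.transport = transport;
    }

    @Override
    public void notifyStateChange(String taskId, String newState) {
        transport.accept(new StateChangeMessage(taskId, newState));
    }
}
```

In Flip-6 the `JobMaster` could then implement `TaskStateNotifier`-style interfaces directly, with no message wrapping at all.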
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464544#comment-15464544 ] ASF GitHub Bot commented on FLINK-4456: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77492066 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskExecutionStateListener.java --- @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +public interface TaskExecutionStateListener { + + /** +* Called whenever the task's execution state changes +* +* @param taskExecutionState describing the task execution state change +*/ + void notifyTaskExecutionState(TaskExecutionState taskExecutionState); --- End diff -- `notifyTaskExecutionStateChanged(...)`? > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. 
This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2471: Broken links on website
Github user zentol commented on the issue: https://github.com/apache/flink/pull/2471 Thank you for reporting this issue. The Apache Flink project uses JIRA to track issues, not GitHub pull requests, so it would be nice if you could close this PR and file a ticket on [JIRA](https://issues.apache.org/jira/browse/FLINK) instead. You would help us a lot if you could mention where you found these links ;)
[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77494279 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/schema/DataStreamTable.scala --- @@ -18,22 +18,11 @@ package org.apache.flink.api.table.plan.schema -import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory} -import org.apache.flink.api.table.FlinkTypeFactory import org.apache.flink.streaming.api.datastream.DataStream class DataStreamTable[T]( val dataStream: DataStream[T], override val fieldIndexes: Array[Int], override val fieldNames: Array[String]) extends FlinkTable[T](dataStream.getType, fieldIndexes, fieldNames) { - - override def getRowType(typeFactory: RelDataTypeFactory): RelDataType = { --- End diff -- Why did you remove this method? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464788#comment-15464788 ] ASF GitHub Bot commented on FLINK-4456: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508238 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.util.List; + +/** + * Notifier for checkpoint acknowledge and decline messages in the {@link Task}. + */ +public interface CheckpointNotifier { --- End diff -- Oh I've overlooked this class. Will rename it into `CheckpointResponder`. 
> Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Comment Edited] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464487#comment-15464487 ] Lorenz Bühmann edited comment on FLINK-2662 at 9/5/16 8:55 AM: --- [~fhueske] I attached a file. It (hopefully) contains a minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost - so please don't think about its meaning. Flink version used was 1.1.0 via Maven was (Author: lorenzb): [~fhueske] I attached a file. It contains (hopefully) a minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost - so please don't think about its meaning. > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FlinkBug.scala > > > I have a Flink program which throws the exception in the jira title. Full > text: > Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: > Plan generation for Unions picked a ship strategy between binary plan > operators. 
> at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) > at malom.Solver.main(Solver.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at 
java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > The execution plan: > http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt > (I obtained this by commenting out the line that throws the exception) > The code is here: > https://github.com/ggevay/flink/tree/plan-generation-bug > The class to run is "Solver". It needs a command line argument, which is a > directory where it would write output. (On first run, it generates some > lookuptables for a few minutes, which are then placed to /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464542#comment-15464542 ] ASF GitHub Bot commented on FLINK-4456: --- Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77492004 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/InputSplitProvider.java --- @@ -35,4 +35,16 @@ * task shall not consume any further input splits. */ InputSplit getNextInputSplit(); + + /** +* Starts the input split provider with a user code class loader. +* +* @param userCodeClassLoader User code class loader to use by the input split provider +*/ + void start(ClassLoader userCodeClassLoader); --- End diff -- Is it possible to have the `start()` and `stop()` methods not in the base interface, but only in the Akka specific implementation? > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2473: [FLINK-4580] [rpc] Verify that the rpc endpoint su...
GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2473 [FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at connect time When calling RpcService.connect it is checked that the rpc endpoint supports the specified rpc gateway. If not, then a RpcConnectionException is thrown. The verification is implemented as an additional message following after the Identify message. The reason for this is that the ActorSystem won't wait for the Identify message to time out after it has determined that the specified actor does not exist. For user-level messages this seems to be not the case and, thus, we would have to wait for the timeout. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addGatewayEndpointCheck Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2473.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2473 commit 189fb14ddf30726f537f681e800c111ba9bc7c81 Author: Till Rohrmann Date: 2016-09-05T10:13:29Z [FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at connect time When calling RpcService.connect it is checked that the rpc endpoint supports the specified rpc gateway. If not, then a RpcConnectionException is thrown. The verification is implemented as an additional message following after the Identify message. The reason for this is that the ActorSystem won't wait for the Identify message to time out after it has determined that the specified actor does not exist. For user-level messages this seems to be not the case and, thus, we would have to wait for the timeout.
[GitHub] flink pull request #2452: [Flink-4450] add a new module "flink-apache-storm"...
Github user liuyuzhong closed the pull request at: https://github.com/apache/flink/pull/2452
[jira] [Commented] (FLINK-4580) Check that the RpcEndpoint supports the specified RpcGateway
[ https://issues.apache.org/jira/browse/FLINK-4580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464682#comment-15464682 ] ASF GitHub Bot commented on FLINK-4580: --- GitHub user tillrohrmann opened a pull request: https://github.com/apache/flink/pull/2473 [FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at connect time When calling RpcService.connect it is checked that the rpc endpoint supports the specified rpc gateway. If not, then a RpcConnectionException is thrown. The verification is implemented as an additional message following after the Identify message. The reason for this is that the ActorSystem won't wait for the Identify message to time out after it has determined that the specified actor does not exist. For user-level messages this seems to be not the case and, thus, we would have to wait for the timeout. You can merge this pull request into a Git repository by running: $ git pull https://github.com/tillrohrmann/flink addGatewayEndpointCheck Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/2473.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #2473 commit 189fb14ddf30726f537f681e800c111ba9bc7c81 Author: Till Rohrmann Date: 2016-09-05T10:13:29Z [FLINK-4580] [rpc] Verify that the rpc endpoint supports the rpc gateway at connect time When calling RpcService.connect it is checked that the rpc endpoint supports the specified rpc gateway. If not, then a RpcConnectionException is thrown. The verification is implemented as an additional message following after the Identify message. The reason for this is that the ActorSystem won't wait for the Identify message to time out after it has determined that the specified actor does not exist. For user-level messages this seems to be not the case and, thus, we would have to wait for the timeout. 
> Check that the RpcEndpoint supports the specified RpcGateway > > > Key: FLINK-4580 > URL: https://issues.apache.org/jira/browse/FLINK-4580 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Affects Versions: 1.2.0 >Reporter: Till Rohrmann >Assignee: Till Rohrmann >Priority: Minor > > When calling {{RpcService.connect}} the user specifies the type of the > {{RpcGateway}}. At the moment, it is not checked whether the {{RpcEndpoint}} > actually supports the specified {{RpcGateway}}. > I think it would be good to add a runtime check that the corresponding > {{RpcEndpoint}} supports the specified {{RpcGateway}}. If not, then we can > let the connect method fail fast. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4505) Implement TaskManagerFactory to bring up TaskManager for different modes
[ https://issues.apache.org/jira/browse/FLINK-4505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464709#comment-15464709 ] ASF GitHub Bot commented on FLINK-4505: --- Github user wangzhijiang999 commented on the issue: https://github.com/apache/flink/pull/2461 Yes, that also makes sense. For testing purposes it is very clear, so we do not need to do anything currently; all the components in the `TaskManager` constructor can be mocked implicitly. For `TaskManagerRunner`, the purpose is to pull the initialization of related components out of `TaskManager` to keep its logic clear. Just one issue to confirm: should we provide several static methods with different parameter sets for the outside world, or just one static method such as 'selectNetworkInterfaceAndRunTaskManager(`Configuration` configuration, `ResourceID` resourceID)'? I think providing several methods with different parameters may be reasonable, because some parameters such as 'hostname', 'port', `RpcService`, and `HighAvailabilityServices` may need to be passed in from outside. > Implement TaskManagerFactory to bring up TaskManager for different modes > > > Key: FLINK-4505 > URL: https://issues.apache.org/jira/browse/FLINK-4505 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Zhijiang Wang >Assignee: Zhijiang Wang >Priority: Minor > > Implement {{TaskExecutorFactory}} that should be an abstract class with the > helper methods to bring up the {{TaskManager}}. The factory can be > implemented by some classes to start a {{TaskManager}} in different modes > (testing, standalone, yarn). -- This message was sent by Atlassian JIRA (v6.3.4#6332)
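The API question above (several entry points vs. one) is commonly resolved with overloads that all funnel into the most specific variant, so callers can supply as many preconstructed pieces as they have. A hypothetical sketch, with illustrative names and parameters that are not the actual `TaskManagerRunner` API:

```java
// Hypothetical sketch: overloaded static entry points delegating to the
// most specific one. All names and parameters are illustrative.
final class RunnerEntryPoints {

    /** Minimal variant: derive hostname and port internally. */
    static String run(String configuration, String resourceId) {
        return run(configuration, resourceId, "localhost", 0);
    }

    /** Full variant: caller supplies everything; the other overloads delegate here. */
    static String run(String configuration, String resourceId, String hostname, int port) {
        // A real runner would construct RpcService etc. here; we only echo the call
        // so the delegation is observable.
        return "run(" + configuration + "," + resourceId + "," + hostname + ":" + port + ")";
    }
}
```

This keeps one place where the runner is actually started, while still letting tests or YARN-style deployments inject their own hostname, port, or services.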
[jira] [Commented] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464832#comment-15464832 ] Jark Wu commented on FLINK-4579: +1 for adding rocksdb to the standard distribution lib. So we only need to change the flink-dist pom setting, right ? > Add StateBackendFactory for RocksDB Backend > --- > > Key: FLINK-4579 > URL: https://issues.apache.org/jira/browse/FLINK-4579 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek > > Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}} > which means that users cannot specify to use the RocksDB backend in the flink > configuration. > If we add a factory for rocksdb we should also think about adding the rocksdb > backend to the standard distribution lib, otherwise it is only usable if > users manually place the rocks jars in the Flink lib folder. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
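The factory idea in FLINK-4579 boils down to resolving a backend name from the configuration to something that can create the backend. A hedged sketch of that lookup follows; the registry, the use of `Supplier`, and the names are illustrative, and Flink's real `StateBackendFactory` interface and configuration keys may differ.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Illustrative sketch: map a configured backend name to a factory,
// failing fast when no factory is registered under that name.
final class StateBackendRegistry {
    private final Map<String, Supplier<Object>> factories = new HashMap<>();

    void register(String name, Supplier<Object> factory) {
        factories.put(name, factory);
    }

    /** Resolves the configured name to a backend instance. */
    Object createBackend(String configuredName) {
        Supplier<Object> factory = factories.get(configuredName);
        if (factory == null) {
            throw new IllegalArgumentException(
                "No state backend factory registered for: " + configuredName);
        }
        return factory.get();
    }
}
```

Whether a "rocksdb" entry can be registered out of the box is exactly the distribution question discussed above: the factory only helps if the RocksDB jars are on the classpath.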
[jira] [Commented] (FLINK-4456) Replace ActorGateway in Task by interface
[ https://issues.apache.org/jira/browse/FLINK-4456?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464910#comment-15464910 ] ASF GitHub Bot commented on FLINK-4456: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2456 I've addressed the comments. In order to solve the problem with the user code class loader, I decided to pass it into the `InputSplitProvider` via the `getNextInputSplit` method. That way, we don't have to know the user code class loader when creating the `InputSplitProvider` implementation. > Replace ActorGateway in Task by interface > - > > Key: FLINK-4456 > URL: https://issues.apache.org/jira/browse/FLINK-4456 > Project: Flink > Issue Type: Improvement > Components: TaskManager >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{Task}} communicates with the outside world ({{JobManager}} and > {{TaskManager}}) via {{ActorGateways}}. This bakes in the dependency on > actors. > In terms of modularization and an improved abstraction (especially wrt > Flip-6) I propose to replace the {{ActorGateways}} by interfaces which > exposes the required methods. The current implementation would then simply > wrap the method calls in messages and send them via the {{ActorGateway}} to > the recipient. > In Flip-6 the {{JobMaster}} could simply implement these interfaces as part > of their RPC contract. -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2456 I've addressed the comments. In order to solve the problem with the user code class loader, I decided to pass it into the `InputSplitProvider` via the `getNextInputSplit` method. That way, we don't have to know the user code class loader when creating the `InputSplitProvider` implementation.
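The design choice in the comment above can be sketched as follows: instead of starting the provider with a class loader up front, the caller passes it on every call. The types and signatures below are illustrative stand-ins, not Flink's exact API.

```java
import java.util.Iterator;
import java.util.List;

// Illustrative sketch of a per-call class loader parameter.
interface SplitProvider {
    /** Returns the next split, or null when the task should stop consuming. */
    String getNextInputSplit(ClassLoader userCodeClassLoader);
}

// A trivial implementation backed by a fixed list of splits.
final class ListSplitProvider implements SplitProvider {
    private final Iterator<String> splits;

    ListSplitProvider(List<String> splits) {
        this.splits = splits.iterator();
    }

    @Override
    public String getNextInputSplit(ClassLoader userCodeClassLoader) {
        // A real implementation would use the class loader to deserialize the split.
        return splits.hasNext() ? splits.next() : null;
    }
}
```

The benefit is exactly what the comment states: the provider can be constructed before the user code class loader exists, since the loader only matters at request time.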
[jira] [Commented] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464508#comment-15464508 ] Fabian Hueske commented on FLINK-2662: -- Thanks a lot [~LorenzB]! This will be very helpful. I'll try to look into this issue soon but will be quite busy the next couple of days. > CompilerException: "Bug: Plan generation for Unions picked a ship strategy > between binary plan operators." > -- > > Key: FLINK-2662 > URL: https://issues.apache.org/jira/browse/FLINK-2662 > Project: Flink > Issue Type: Bug > Components: Optimizer >Affects Versions: 0.9.1, 0.10.0 >Reporter: Gabor Gevay >Priority: Critical > Fix For: 1.0.0 > > Attachments: FlinkBug.scala > > > I have a Flink program which throws the exception in the jira title. Full > text: > Exception in thread "main" org.apache.flink.optimizer.CompilerException: Bug: > Plan generation for Unions picked a ship strategy between binary plan > operators. > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.collect(BinaryUnionReplacer.java:113) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:72) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.postVisit(BinaryUnionReplacer.java:41) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:170) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:163) > at > org.apache.flink.optimizer.plan.WorksetIterationPlanNode.acceptForStepFunction(WorksetIterationPlanNode.java:194) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:49) > at > org.apache.flink.optimizer.traversals.BinaryUnionReplacer.preVisit(BinaryUnionReplacer.java:41) > at > 
org.apache.flink.optimizer.plan.DualInputPlanNode.accept(DualInputPlanNode.java:162) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.SingleInputPlanNode.accept(SingleInputPlanNode.java:199) > at > org.apache.flink.optimizer.plan.OptimizedPlan.accept(OptimizedPlan.java:127) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:520) > at org.apache.flink.optimizer.Optimizer.compile(Optimizer.java:402) > at > org.apache.flink.client.LocalExecutor.getOptimizerPlanAsJSON(LocalExecutor.java:202) > at > org.apache.flink.api.java.LocalEnvironment.getExecutionPlan(LocalEnvironment.java:63) > at malom.Solver.main(Solver.java:66) > at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) > at > sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.lang.reflect.Method.invoke(Method.java:497) > at com.intellij.rt.execution.application.AppMain.main(AppMain.java:140) > The execution plan: > http://compalg.inf.elte.hu/~ggevay/flink/plan_3_4_0_0_without_verif.txt > (I obtained this by commenting out the line that throws the exception) > The code is here: > https://github.com/ggevay/flink/tree/plan-generation-bug > The class to run is "Solver". It needs a command line argument, which is a > directory where it would write output. (On first run, it generates some > lookuptables for a few minutes, which are then placed to /tmp/movegen) -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user StephanEwen commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77491885 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.util.List; + +/** + * Notifier for checkpoint acknowledge and decline messages in the {@link Task}. + */ +public interface CheckpointNotifier { --- End diff -- `CheckpointNotifier` is already used for user functions, as an interface to get notifications on completed checkpoints. How about calling this `CheckpointResponder` or something like that instead?
[GitHub] flink issue #2450: [FLINK-4458] Replace ForkableFlinkMiniCluster by LocalFli...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2450 Quick question: The `ForkableFlinkMiniCluster` was something semi-public, in the sense that it was part of the `flink-test-utils` project and used by quite a few users. So this would be a breaking change.
[GitHub] flink issue #2456: [FLINK-4456] Replace ActorGateway in Task and RuntimeEnvi...
Github user StephanEwen commented on the issue: https://github.com/apache/flink/pull/2456 I think this is good. A few comments inline on the names - usually the hardest problem ;-)
[jira] [Commented] (FLINK-4451) Throw exception when remote connection cannot be resolved
[ https://issues.apache.org/jira/browse/FLINK-4451?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464545#comment-15464545 ] Till Rohrmann commented on FLINK-4451: -- Fixed via 0cf2a822b6872ee4f3f0c99f0fcee71affaeaee5 > Throw exception when remote connection cannot be resolved > - > > Key: FLINK-4451 > URL: https://issues.apache.org/jira/browse/FLINK-4451 > Project: Flink > Issue Type: Sub-task > Components: Distributed Coordination >Reporter: Till Rohrmann >Assignee: Till Rohrmann > > The {{RpcService}} implementation should throw an exception (returned in the > future) if {{RpcService.connect(address, type)}} cannot connect to the remote > {{RpcEndpoint}}. > At the moment the {{AkkaRpcService}} does not check that the > {{IdentifyActor}} message contains a valid {{ActorRef}} and therefore throws > a {{NullPointerException}}.
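The fix described in FLINK-4451 can be illustrated with a minimal sketch. The types below are simplified stand-ins, not the actual `RpcService`/`AkkaRpcService`: the key behavior is that `connect` completes its future exceptionally when the remote endpoint cannot be resolved, rather than failing later with a `NullPointerException`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified stand-ins for illustration; the real Flink RPC classes differ.
public class RpcConnectSketch {

    interface RpcGateway {}

    static class RpcConnectionException extends Exception {
        RpcConnectionException(String message) { super(message); }
    }

    static class SimpleRpcService {
        private final Map<String, RpcGateway> endpoints = new HashMap<>();

        void register(String address, RpcGateway gateway) {
            endpoints.put(address, gateway);
        }

        // Completes the returned future exceptionally when the address
        // cannot be resolved, instead of failing later with an NPE.
        <G extends RpcGateway> CompletableFuture<G> connect(String address, Class<G> type) {
            RpcGateway gateway = endpoints.get(address);
            if (gateway == null || !type.isInstance(gateway)) {
                CompletableFuture<G> failed = new CompletableFuture<>();
                failed.completeExceptionally(new RpcConnectionException(
                    "Could not resolve RpcEndpoint at " + address));
                return failed;
            }
            return CompletableFuture.completedFuture(type.cast(gateway));
        }
    }

    public static void main(String[] args) {
        SimpleRpcService service = new SimpleRpcService();
        service.register("akka://flink/user/jobmanager", new RpcGateway() {});
        if (!service.connect("akka://flink/user/jobmanager", RpcGateway.class).isDone()) throw new AssertionError();
        CompletableFuture<RpcGateway> missing = service.connect("akka://unknown", RpcGateway.class);
        if (!missing.isCompletedExceptionally()) throw new AssertionError();
        System.out.println("ok");
    }
}
```

The caller then handles the failure through the future's exception path rather than through a null check.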
[GitHub] flink pull request #2454: [FLINK-4546] [table] Remove STREAM keyword in Stre...
Github user fhueske commented on a diff in the pull request: https://github.com/apache/flink/pull/2454#discussion_r77494182 --- Diff: flink-libraries/flink-table/src/main/scala/org/apache/flink/api/table/plan/rules/FlinkRuleSets.scala --- @@ -112,7 +112,7 @@ object FlinkRuleSets { */ val DATASTREAM_OPT_RULES: RuleSet = RuleSets.ofList( - RemoveDeltaRule.INSTANCE, --- End diff -- The `RemoveDeltaRule` class can be removed as well.
[GitHub] flink issue #2454: [FLINK-4546] [table] Remove STREAM keyword in Stream SQL
Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2454 Hi @fhueske , thanks for reviewing, I updated this PR.
[GitHub] flink pull request #2456: [FLINK-4456] Replace ActorGateway in Task and Runt...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2456#discussion_r77508238 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/CheckpointNotifier.java --- @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + +import java.util.List; + +/** + * Notifier for checkpoint acknowledge and decline messages in the {@link Task}. + */ +public interface CheckpointNotifier { --- End diff -- Oh, I've overlooked this class. Will rename it to `CheckpointResponder`.
[jira] [Comment Edited] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464487#comment-15464487 ] Lorenz Bühmann edited comment on FLINK-2662 at 9/5/16 8:53 AM: --- [~fhueske] I attached a file. It contains (hopefully) a minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost - so please don't think about its meaning. was (Author: lorenzb): A minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost - so please don't think about its meaning.
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464577#comment-15464577 ] ASF GitHub Bot commented on FLINK-4546: --- Github user fhueske commented on the issue: https://github.com/apache/flink/pull/2454 Hi @wuchong, thanks for the PR. Just had two minor comments. Thanks, Fabian > Remove STREAM keyword in Stream SQL > > > Key: FLINK-4546 > URL: https://issues.apache.org/jira/browse/FLINK-4546 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL >Reporter: Jark Wu >Assignee: Jark Wu > > It is about to unify Batch SQL and Stream SQL grammar, esp. removing STREAM > keyword in Stream SQL. > detailed discuss mailing list: > http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/DISCUSS-Some-thoughts-about-unify-Stream-SQL-and-Batch-SQL-grammer-td13060.html
[jira] [Updated] (FLINK-2662) CompilerException: "Bug: Plan generation for Unions picked a ship strategy between binary plan operators."
[ https://issues.apache.org/jira/browse/FLINK-2662?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lorenz Bühmann updated FLINK-2662: -- Attachment: FlinkBug.scala A minimal example that leads to the reported exception. While keeping it as minimal as possible, the whole logic behind the program got lost - so please don't think about its meaning.
[jira] [Commented] (FLINK-4546) Remove STREAM keyword in Stream SQL
[ https://issues.apache.org/jira/browse/FLINK-4546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15464784#comment-15464784 ] ASF GitHub Bot commented on FLINK-4546: --- Github user wuchong commented on the issue: https://github.com/apache/flink/pull/2454 Hi @fhueske , thanks for reviewing, I updated this PR.
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465479#comment-15465479 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547550 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A client class that serves to create connection and send data to HBase. 
+ */ +class HBaseClient implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class); + + private Connection connection; + private Table table; + + public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, String tableName) throws IOException { + connection = ConnectionFactory.createConnection(hbConfig); + table = connection.getTable(TableName.valueOf(tableName)); --- End diff -- Not able to see the connect method. Maybe it is not added? > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465005#comment-15465005 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77522737 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { * @return Slot assignment */ @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); + public SlotRequestRegistered requestSlot(SlotRequest slotRequest) { + final JobID jobId = slotRequest.getJobId(); + final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + + if (jobMasterGateway != null) { + return slotManager.requestSlot(slotRequest); + } else { + LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); + return null; --- End diff -- Not sure whether we should return `null` here, a negative `SlotRequestRegistered` response or throw an exception which will be handled by the caller. Why did you choose `null`? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77522737 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -131,9 +149,16 @@ public RegistrationResponse apply(final JobMasterGateway jobMasterGateway) { * @return Slot assignment */ @RpcMethod - public SlotAssignment requestSlot(SlotRequest slotRequest) { - System.out.println("SlotRequest: " + slotRequest); - return new SlotAssignment(); + public SlotRequestRegistered requestSlot(SlotRequest slotRequest) { + final JobID jobId = slotRequest.getJobId(); + final JobMasterGateway jobMasterGateway = jobMasterGateways.get(jobId); + + if (jobMasterGateway != null) { + return slotManager.requestSlot(slotRequest); + } else { + LOG.info("Ignoring slot request for unknown JobMaster with JobID {}", jobId); + return null; --- End diff -- Not sure whether we should return `null` here, a negative `SlotRequestRegistered` response or throw an exception which will be handled by the caller. Why did you choose `null`?
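One of the alternatives raised in this review comment - returning an explicit negative response instead of `null` - could look like the following sketch. All names here are simplified stand-ins for illustration, not the actual Flink classes; in particular, `SlotRequestRejected` is a hypothetical type introduced only to show the pattern.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; not the actual Flink classes.
public class SlotRequestSketch {

    interface SlotRequestReply {}

    static final class SlotRequestRegistered implements SlotRequestReply {}

    // Hypothetical explicit rejection type instead of returning null.
    static final class SlotRequestRejected implements SlotRequestReply {
        final String reason;
        SlotRequestRejected(String reason) { this.reason = reason; }
    }

    static class ResourceManager {
        private final Map<String, Object> jobMasterGateways = new HashMap<>();

        void registerJobMaster(String jobId) {
            jobMasterGateways.put(jobId, new Object());
        }

        // Returning an explicit rejection lets the caller distinguish
        // "unknown JobMaster" from a lost or unanswered message.
        SlotRequestReply requestSlot(String jobId) {
            if (jobMasterGateways.containsKey(jobId)) {
                return new SlotRequestRegistered();
            } else {
                return new SlotRequestRejected("Unknown JobMaster with JobID " + jobId);
            }
        }
    }

    public static void main(String[] args) {
        ResourceManager rm = new ResourceManager();
        rm.registerJobMaster("job-1");
        if (!(rm.requestSlot("job-1") instanceof SlotRequestRegistered)) throw new AssertionError();
        if (!(rm.requestSlot("job-2") instanceof SlotRequestRejected)) throw new AssertionError();
        System.out.println("ok");
    }
}
```

Compared with returning `null`, the rejection object can also carry a reason string that the caller can log or surface.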
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465061#comment-15465061 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2463 Thanks for your work @mxm. I've left some comments, which you can find inline. I think the implementation of the slot request logic made another step in the right direction with this PR. > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77538038 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final Map<String, TaskManagerMetricStore> taskManagers = new HashMap<>(); final Map<String, JobMetricStore> jobs = new HashMap<>(); - /** -* Adds a metric to this MetricStore. -* -* @param name the metric identifier -* @param value the metric value -*/ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** -* JobManagerMetricStore metric -* format: 0:. -*/ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** -* TaskManager metric -* format: 1::. -*/ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: --- End diff -- eh, seemed like the proper way of handling it. Also, (up to) 4 comparisons vs a jump.
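The category dispatch discussed in the diff above can be illustrated with a small stand-alone sketch. The constants and store maps below are simplified stand-ins for Flink's actual `MetricStore`/`QueryScopeInfo`; the point is dispatching on a pre-computed category constant rather than re-parsing the metric identifier string.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified stand-ins for illustration; the real MetricStore differs.
public class MetricStoreSketch {

    static final byte INFO_CATEGORY_JM = 0;
    static final byte INFO_CATEGORY_TM = 1;
    static final byte INFO_CATEGORY_JOB = 2;

    final Map<String, Object> jobManagerMetrics = new HashMap<>();
    final Map<String, Map<String, Object>> taskManagers = new HashMap<>();
    final Map<String, Map<String, Object>> jobs = new HashMap<>();

    // Dispatch on the category constant: a switch over a few small byte
    // values, instead of a chain of string comparisons per metric.
    void add(byte category, String id, String name, Object value) {
        switch (category) {
            case INFO_CATEGORY_JM:
                jobManagerMetrics.put(name, value);
                break;
            case INFO_CATEGORY_TM:
                taskManagers.computeIfAbsent(id, k -> new HashMap<>()).put(name, value);
                break;
            case INFO_CATEGORY_JOB:
                jobs.computeIfAbsent(id, k -> new HashMap<>()).put(name, value);
                break;
            default:
                // malformed transmission; ignore
        }
    }

    public static void main(String[] args) {
        MetricStoreSketch store = new MetricStoreSketch();
        store.add(INFO_CATEGORY_JM, "", "numRegisteredTaskManagers", 4);
        store.add(INFO_CATEGORY_TM, "tm-1", "memUsed", 1024L);
        if (!store.jobManagerMetrics.containsKey("numRegisteredTaskManagers")) throw new AssertionError();
        if (!store.taskManagers.get("tm-1").containsKey("memUsed")) throw new AssertionError();
        System.out.println("ok");
    }
}
```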
[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547550 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseClient.java --- @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Connection; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Table; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; + +/** + * A client class that serves to create connection and send data to HBase. 
+ */ +class HBaseClient implements Closeable { + + private static final Logger LOG = LoggerFactory.getLogger(HBaseClient.class); + + private Connection connection; + private Table table; + + public HBaseClient(org.apache.hadoop.conf.Configuration hbConfig, String tableName) throws IOException { + connection = ConnectionFactory.createConnection(hbConfig); + table = connection.getTable(TableName.valueOf(tableName)); --- End diff -- Not able to see the connect method. Maybe it is not added?
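The explicit `connect()` method the reviewer asks about could be structured as in the sketch below. The `Connection`/`Table` interfaces here are stand-ins for the real HBase client API so the example stays self-contained; the design point is separating connection setup from construction, so the constructor cannot fail with an `IOException`.

```java
import java.io.Closeable;
import java.io.IOException;

public class HBaseClientSketch {

    // Stand-ins for org.apache.hadoop.hbase.client.Connection/Table.
    interface Connection extends Closeable { Table getTable(String name); }
    interface Table extends Closeable {}
    interface ConnectionFactory { Connection create() throws IOException; }

    static class HBaseClient implements Closeable {
        private final ConnectionFactory factory;
        private final String tableName;
        private Connection connection;
        private Table table;

        HBaseClient(ConnectionFactory factory, String tableName) {
            this.factory = factory;
            this.tableName = tableName;
        }

        // Connection setup is deferred to an explicit connect() call,
        // keeping the constructor free of I/O.
        void connect() throws IOException {
            connection = factory.create();
            table = connection.getTable(tableName);
        }

        boolean isConnected() { return table != null; }

        @Override
        public void close() throws IOException {
            if (table != null) table.close();
            if (connection != null) connection.close();
        }
    }

    public static void main(String[] args) throws IOException {
        // In-memory stub connection instead of a real HBase cluster.
        HBaseClient client = new HBaseClient(() -> new Connection() {
            public Table getTable(String name) { return () -> {}; }
            public void close() {}
        }, "test-table");
        if (client.isConnected()) throw new AssertionError();
        client.connect();
        if (!client.isConnected()) throw new AssertionError();
        client.close();
        System.out.println("ok");
    }
}
```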
[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547915 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Maps a input value to a row in HBase table. + * + * @param input type + */ +public interface HBaseMapper extends Function, Serializable { + + /** +* Given an input value return the HBase row key. Row key cannot be null. +* +* @param value +* @return row key +*/ + byte[] rowKey(IN value); --- End diff -- Rest looks good to me. I think naming of functions am not sure if it can be fine tuned. Am not very good at naming. And also FLINK team may have some naming conventions. I think having a connect() method and the above comment are the main things from my side. If it is fine, then it is upto Flink team to review this. Thanks @delding . 
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465487#comment-15465487 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547840 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Maps a input value to a row in HBase table. + * + * @param input type + */ +public interface HBaseMapper extends Function, Serializable { + + /** +* Given an input value return the HBase row key. Row key cannot be null. +* +* @param value +* @return row key +*/ + byte[] rowKey(IN value); --- End diff -- Fine with all. But how do you related this rowKey and the one to be passed to createMutations? May be will it be better to have actions(byte[] rowkey, IN value)? and add a getRowKey() API that returns byte[]? This can be used for createMutations? 
> Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2332: [FLINK-2055] Implement Streaming HBaseSink
Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547840 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Maps a input value to a row in HBase table. + * + * @param input type + */ +public interface HBaseMapper extends Function, Serializable { + + /** +* Given an input value return the HBase row key. Row key cannot be null. +* +* @param value +* @return row key +*/ + byte[] rowKey(IN value); --- End diff -- Fine with all. But how do you related this rowKey and the one to be passed to createMutations? May be will it be better to have actions(byte[] rowkey, IN value)? and add a getRowKey() API that returns byte[]? This can be used for createMutations? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
[jira] [Commented] (FLINK-2055) Implement Streaming HBaseSink
[ https://issues.apache.org/jira/browse/FLINK-2055?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465491#comment-15465491 ] ASF GitHub Bot commented on FLINK-2055: --- Github user ramkrish86 commented on a diff in the pull request: https://github.com/apache/flink/pull/2332#discussion_r77547915 --- Diff: flink-streaming-connectors/flink-connector-hbase/src/main/java/org/apache/flink/streaming/connectors/hbase/HBaseMapper.java --- @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.hbase; + +import org.apache.flink.api.common.functions.Function; + +import java.io.Serializable; + +/** + * Maps a input value to a row in HBase table. + * + * @param input type + */ +public interface HBaseMapper extends Function, Serializable { + + /** +* Given an input value return the HBase row key. Row key cannot be null. +* +* @param value +* @return row key +*/ + byte[] rowKey(IN value); --- End diff -- Rest looks good to me. I think naming of functions am not sure if it can be fine tuned. Am not very good at naming. And also FLINK team may have some naming conventions. 
I think having a connect() method, together with the comment above, are the main points from my side. If that is fine, then it is up to the Flink team to review this. Thanks @delding . > Implement Streaming HBaseSink > - > > Key: FLINK-2055 > URL: https://issues.apache.org/jira/browse/FLINK-2055 > Project: Flink > Issue Type: New Feature > Components: Streaming, Streaming Connectors >Affects Versions: 0.9 >Reporter: Robert Metzger >Assignee: Erli Ding > > As per : > http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Write-Stream-to-HBase-td1300.html -- This message was sent by Atlassian JIRA (v6.3.4#6332)
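The reviewer suggests passing the already-computed row key into the mutation-building method (e.g. `actions(byte[] rowkey, IN value)`) so the key used for routing and the key used in the mutations cannot diverge. The sketch below illustrates that shape with plain JDK types; `List<String>` stands in for HBase's `List<Mutation>`, and the names `RowMapper`, `actions`, and `StringMapper` are hypothetical, not the PR's API.

```java
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;

// Sketch of the reviewer's suggestion: compute the row key once, then hand
// it to the mutation builder so both stay consistent. List<String> stands in
// for HBase's List<Mutation>.
interface RowMapper<IN> extends Serializable {

    /** Extracts the HBase row key for a value; must not return null. */
    byte[] rowKey(IN value);

    /** Builds the mutations for a value, reusing the row key computed above. */
    List<String> actions(byte[] rowKey, IN value);
}

class StringMapper implements RowMapper<String> {
    @Override
    public byte[] rowKey(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    @Override
    public List<String> actions(byte[] rowKey, String value) {
        // A real mapper would create Put/Delete mutations keyed by rowKey.
        return Collections.singletonList(
            "put:" + new String(rowKey, StandardCharsets.UTF_8));
    }
}
```

With this signature, the sink drives both calls, and a mapper implementation cannot accidentally derive a different key inside the mutation builder.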
[GitHub] flink issue #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allocation ...
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2463 Thanks for your work @mxm. I've left some comments, which you can find inline. I think the implementation of the slot request logic has taken another step in the right direction with this PR.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465054#comment-15465054 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524626 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- Why do you create a registry for a single field? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels > -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524626 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/highavailability/LeaderIdRegistry.java --- @@ -15,11 +15,27 @@ * See the License for the specific language governing permissions and * limitations under the License. */ +package org.apache.flink.runtime.highavailability; -package org.apache.flink.runtime.resourcemanager; +import java.util.UUID; -import java.io.Serializable; +/** + * Registry class to keep track of the current leader ID. + */ +public class LeaderIdRegistry { --- End diff -- Why do you create a registry for a single field? --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
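The reviewer questions introducing a whole registry class for a single field. One plausible alternative (an assumption on my part about what the reviewer has in mind, not code from the PR) is to hold the leader session ID directly in an `AtomicReference`, which already provides thread-safe get/set semantics:

```java
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;

// One possible answer to "why a registry for a single field": store the
// leader session ID in an AtomicReference, which gives atomic get/set
// without a dedicated wrapper class.
class LeaderIdHolder {
    private final AtomicReference<UUID> leaderId = new AtomicReference<>();

    public void setLeaderId(UUID id) {
        leaderId.set(id);
    }

    public UUID getLeaderId() {
        return leaderId.get();
    }
}
```

A dedicated class only pays off once the registry grows additional behavior, such as listener notification on leader changes.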
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +/** + * A container for a dumped metric that contains the scope, name and value(s) of the metric. + */ +public abstract class MetricDump { + /** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */ + public static final byte METRIC_CATEGORY_COUNTER = 0; + public static final byte METRIC_CATEGORY_GAUGE = 1; + public static final byte METRIC_CATEGORY_HISTOGRAM = 2; + + /** The scope information for the stored metric. */ + public final QueryScopeInfo scopeInfo; + /** The name of the stored metric. */ + public final String name; + + private MetricDump(QueryScopeInfo scopeInfo, String name) { + this.scopeInfo = scopeInfo; + this.name = name; + } + + /** +* Returns the category for this MetricDump. 
+* +* @return category +*/ + public abstract byte getCategory(); --- End diff -- I think we don't need the explicit category information, because it is already encoded in the sub-types of `MetricDump`.
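The reviewer's point is that the byte constants duplicate information already carried by the subclasses. The sketch below shows the alternative on simplified stand-in classes (`Dump`, `CounterDump`, `GaugeDump` are illustrative, not the real `MetricDump` hierarchy): the subtype itself identifies the category, so `instanceof` checks, or a visitor, can replace the byte tag.

```java
// Simplified stand-ins for the MetricDump hierarchy: dispatch on the subtype
// instead of on a byte category constant.
abstract class Dump { }

class CounterDump extends Dump {
    final long count;
    CounterDump(long c) { count = c; }
}

class GaugeDump extends Dump {
    final String value;
    GaugeDump(String v) { value = v; }
}

class DumpPrinter {
    // The runtime type replaces getCategory(): no byte constants needed.
    static String describe(Dump d) {
        if (d instanceof CounterDump) {
            return "counter=" + ((CounterDump) d).count;
        } else if (d instanceof GaugeDump) {
            return "gauge=" + ((GaugeDump) d).value;
        }
        return "unknown";
    }
}
```

The byte tag still has a use on the wire, where a compact category marker avoids serializing class names; the question is only whether it needs to leak into in-memory dispatch.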
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465108#comment-15465108 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +/** + * A container for a dumped metric that contains the scope, name and value(s) of the metric. + */ +public abstract class MetricDump { + /** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */ + public static final byte METRIC_CATEGORY_COUNTER = 0; + public static final byte METRIC_CATEGORY_GAUGE = 1; + public static final byte METRIC_CATEGORY_HISTOGRAM = 2; + + /** The scope information for the stored metric. */ + public final QueryScopeInfo scopeInfo; + /** The name of the stored metric. 
*/ + public final String name; + + private MetricDump(QueryScopeInfo scopeInfo, String name) { + this.scopeInfo = scopeInfo; + this.name = name; --- End diff -- `checkNotNull` missing > Expose metrics to Webfrontend > - > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465106#comment-15465106 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +/** + * A container for a dumped metric that contains the scope, name and value(s) of the metric. + */ +public abstract class MetricDump { + /** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */ + public static final byte METRIC_CATEGORY_COUNTER = 0; + public static final byte METRIC_CATEGORY_GAUGE = 1; + public static final byte METRIC_CATEGORY_HISTOGRAM = 2; + + /** The scope information for the stored metric. */ + public final QueryScopeInfo scopeInfo; + /** The name of the stored metric. 
*/ + public final String name; + + private MetricDump(QueryScopeInfo scopeInfo, String name) { + this.scopeInfo = scopeInfo; + this.name = name; + } + + /** +* Returns the category for this MetricDump. +* +* @return category +*/ + public abstract byte getCategory(); --- End diff -- I think we don't need the explicit category information, because it is already encoded in the sub-types of `MetricDump`. > Expose metrics to Webfrontend > - > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527854 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java --- @@ -0,0 +1,119 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.metrics.dump; + +/** + * A container for a dumped metric that contains the scope, name and value(s) of the metric. + */ +public abstract class MetricDump { + /** Categories to be returned by {@link MetricDump#getCategory()} to avoid instanceof checks. */ + public static final byte METRIC_CATEGORY_COUNTER = 0; + public static final byte METRIC_CATEGORY_GAUGE = 1; + public static final byte METRIC_CATEGORY_HISTOGRAM = 2; + + /** The scope information for the stored metric. */ + public final QueryScopeInfo scopeInfo; + /** The name of the stored metric. */ + public final String name; + + private MetricDump(QueryScopeInfo scopeInfo, String name) { + this.scopeInfo = scopeInfo; + this.name = name; --- End diff -- `checkNotNull` missing --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. 
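The "`checkNotNull` missing" comment refers to fail-fast null validation in the constructor. Flink's own `Preconditions.checkNotNull` would be used in the actual code; the JDK equivalent, shown here on a simplified stand-in class, is `Objects.requireNonNull`:

```java
import java.util.Objects;

// Fail-fast null checks in a constructor, as the reviewer requests.
// NamedMetric is a simplified stand-in for MetricDump's constructor;
// Objects.requireNonNull plays the role of Flink's Preconditions.checkNotNull.
class NamedMetric {
    final String scope;
    final String name;

    NamedMetric(String scope, String name) {
        this.scope = Objects.requireNonNull(scope, "scopeInfo must not be null");
        this.name = Objects.requireNonNull(name, "name must not be null");
    }
}
```

Failing in the constructor surfaces the bad argument at the call site, rather than as a `NullPointerException` much later when the field is first dereferenced.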
[GitHub] flink issue #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2363 I think the changes look good. Thanks for your work @zentol :-) I only had a minor question about whether we can replace the explicit category information with the type information of the metric dumps and the `QueryScopeInfo` instances (not for serialization, but in the `MetricStore`).
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77540202 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final MaptaskManagers = new HashMap<>(); final Map jobs = new HashMap<>(); - /** -* Adds a metric to this MetricStore. -* -* @param name the metric identifier -* @param value the metric value -*/ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** -* JobManagerMetricStore metric -* format: 0:. -*/ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** -* TaskManager metric -* format: 1::. -*/ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: --- End diff -- That is true. Performance-wise it is the more efficient way to execute it, no doubt. I was just wondering whether this is not a case of premature optimization with the price of harder maintainability. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
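As quoted, the diff in the comment above shows `case INFO_CATEGORY_JM:` with no `break` before `case INFO_CATEGORY_TM:`, which would make execution fall through into the TaskManager branch; this may simply be an artifact of the mail formatting rather than a bug in the PR. The stand-in below demonstrates the effect:

```java
// Demonstrates Java switch fall-through: without a break, the JM case
// continues into the TM case. CAT_JM/CAT_TM are stand-ins for the
// INFO_CATEGORY_* constants in the quoted diff.
class SwitchDemo {
    static final int CAT_JM = 0;
    static final int CAT_TM = 1;

    static String route(int category, boolean withBreak) {
        StringBuilder visited = new StringBuilder();
        switch (category) {
            case CAT_JM:
                visited.append("jm;");
                if (withBreak) {
                    break; // without this, execution falls through to CAT_TM
                }
            case CAT_TM:
                visited.append("tm;");
                break;
        }
        return visited.toString();
    }
}
```

Calling `route(CAT_JM, false)` records both branches, which in the `MetricStore` context would file a JobManager metric under a TaskManager as well.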
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524466 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + try { + Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + } catch (Exception e) { + Assert.fail("JobManager registration Future didn't
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465044#comment-15465044 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524466 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress,
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527637 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final MaptaskManagers = new HashMap<>(); final Map jobs = new HashMap<>(); - /** -* Adds a metric to this MetricStore. -* -* @param name the metric identifier -* @param value the metric value -*/ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** -* JobManagerMetricStore metric -* format: 0:. -*/ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** -* TaskManager metric -* format: 1::. -*/ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: + addMetric(jobManager.metrics, name, metric); + case INFO_CATEGORY_TM: + String tmID = ((QueryScopeInfo.TaskManagerQueryScopeInfo) info).taskManagerID; + tm = taskManagers.get(tmID); if (tm == null) { tm = new TaskManagerMetricStore(); - taskManagers.put(components[1], tm); + taskManagers.put(tmID, tm); } - tm.metrics.put(components[2], value); + addMetric(tm.metrics, name, metric); break; - /** -* Job metric -* format: 2::. 
-*/ - case "2": - if (components.length != 3) { - break; - } - job = jobs.get(components[1]); + case INFO_CATEGORY_JOB: + QueryScopeInfo.JobQueryScopeInfo jobInfo = (QueryScopeInfo.JobQueryScopeInfo) info; + job = jobs.get(jobInfo.jobID); if (job == null) { job = new JobMetricStore(); - jobs.put(components[1], job); + jobs.put(jobInfo.jobID, job); } - job.metrics.put(components[2], value); + addMetric(job.metrics, name, metric); break; - /** -* Task metric -* format: 3. -* -* As the WebInterface task metric queries currently do not account for subtasks we don't -* divide by subtask and instead use the concatenation of subtask index and metric name as the name. -
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465132#comment-15465132 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on the issue: https://github.com/apache/flink/pull/2363 I think the changes look good. Thanks for your work @zentol :-) I only had a minor question whether we can substitute the explicit category information by the type information of the metric dumps and the `QueryScopeInfo` instances (not for serialization but in the `MetricStore`). > Expose metrics to Webfrontend > - > > Key: FLINK-4389 > URL: https://issues.apache.org/jira/browse/FLINK-4389 > Project: Flink > Issue Type: Sub-task > Components: Metrics, Webfrontend >Affects Versions: 1.1.0 >Reporter: Chesnay Schepler >Assignee: Chesnay Schepler > Fix For: pre-apache > > > https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface -- This message was sent by Atlassian JIRA (v6.3.4#6332)
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524239 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java --- @@ -46,13 +47,21 @@ /** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */ private final JobID jobID; - public SlotStatus(SlotID slotID, ResourceProfile profiler) { - this(slotID, profiler, null, null); + /** Gateway to the TaskManager which reported the SlotStatus */ + private final TaskExecutorGateway taskExecutorGateway; --- End diff -- The `SlotStatus` is no longer serializable with this field. Where does the `SlotStatus` come from? If it's coming from the `TaskExecutor`, then the `taskExecutorGateway` has to be retrieved on the `ResourceManager` side. --- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
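A minimal, self-contained sketch of that idea — the slot status carries only serializable IDs, and the ResourceManager resolves the gateway locally from the reporting TaskExecutor's resource ID. Types here are simplified stand-ins, not the actual Flink classes:

```java
import java.io.*;
import java.util.*;

public class SlotStatusSketch {
    // Stand-in for SlotStatus: only IDs, so it stays serializable.
    static class SlotStatus implements Serializable {
        final String slotId;      // stands in for SlotID
        final String resourceId;  // identifies the reporting TaskExecutor
        SlotStatus(String slotId, String resourceId) {
            this.slotId = slotId;
            this.resourceId = resourceId;
        }
    }

    // ResourceManager-side registry: resourceId -> gateway (a non-serializable object).
    static final Map<String, Object> gatewayByResource = new HashMap<>();

    static Object resolveGateway(SlotStatus status) {
        return gatewayByResource.get(status.resourceId);
    }

    public static void main(String[] args) throws Exception {
        gatewayByResource.put("tm-1", new Object());
        SlotStatus status = new SlotStatus("slot-0", "tm-1");

        // The status itself round-trips through Java serialization...
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(status);
        oos.flush();
        SlotStatus copy = (SlotStatus) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();

        // ...while the gateway is looked up locally and never shipped over the wire.
        System.out.println(copy.slotId + " " + (resolveGateway(copy) != null));
    }
}
```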
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465111#comment-15465111 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77528158 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDumpSerialization.java --- @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.metrics.dump; + +import org.apache.commons.io.output.ByteArrayOutputStream; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; +import org.apache.flink.metrics.Histogram; +import org.apache.flink.metrics.HistogramStatistics; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.ByteArrayInputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JM; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_JOB; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_OPERATOR; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TASK; +import static org.apache.flink.runtime.metrics.dump.QueryScopeInfo.INFO_CATEGORY_TM; + +/** + * Utility class for the serialization of metrics. + */ +public class MetricDumpSerialization { + private static final Logger LOG = LoggerFactory.getLogger(MetricDumpSerialization.class); + + private MetricDumpSerialization() { + } + + // = Serialization = + public static class MetricDumpSerializer { + private ByteArrayOutputStream baos = new ByteArrayOutputStream(4096); + private DataOutputStream dos = new DataOutputStream(baos); + + /** +* Serializes the given metrics and returns the resulting byte array. 
+* +* @param counters counters to serialize +* @param gauges gauges to serialize +* @param histograms histograms to serialize +* @return byte array containing the serialized metrics +* @throws IOException +*/ + public byte[] serialize(Map> counters, Map > gauges, Map > histograms) throws IOException { + baos.reset(); + dos.writeInt(counters.size()); + dos.writeInt(gauges.size()); + dos.writeInt(histograms.size()); + + for (Map.Entry > entry : counters.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeCounter(dos, entry.getKey()); + } + + for (Map.Entry > entry : gauges.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); + serializeString(dos, entry.getValue().f1); + serializeGauge(dos, entry.getKey()); + } + + for (Map.Entry > entry : histograms.entrySet()) { + serializeMetricInfo(dos, entry.getValue().f0); +
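The serializer in the diff above writes the per-kind counts first and then each entry, so the deserializer knows how many records of each kind to read back. The same count-prefixed layout can be sketched in a self-contained form (simplified to plain name/value counters, not the actual Flink `MetricDump` types):

```java
import java.io.*;
import java.util.*;

public class CountPrefixedDump {
    public static void main(String[] args) throws IOException {
        Map<String, Long> counters = new LinkedHashMap<>();
        counters.put("numRecordsIn", 42L);
        counters.put("numRecordsOut", 7L);

        ByteArrayOutputStream baos = new ByteArrayOutputStream(4096);
        DataOutputStream dos = new DataOutputStream(baos);
        dos.writeInt(counters.size());                // count prefix
        for (Map.Entry<String, Long> e : counters.entrySet()) {
            dos.writeUTF(e.getKey());                 // metric name
            dos.writeLong(e.getValue());              // counter value
        }

        // Deserialization: read the count, then exactly that many records.
        DataInputStream dis = new DataInputStream(
                new ByteArrayInputStream(baos.toByteArray()));
        int n = dis.readInt();
        Map<String, Long> restored = new LinkedHashMap<>();
        for (int i = 0; i < n; i++) {
            restored.put(dis.readUTF(), dis.readLong());
        }
        System.out.println(restored);
    }
}
```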
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465000#comment-15465000 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77522339 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java --- @@ -52,15 +58,28 @@ * */ public class ResourceManager extends RpcEndpoint { - private final MapjobMasterGateways; + + private final Logger LOG = LoggerFactory.getLogger(getClass()); --- End diff -- Why not make it static? > Implement slot allocation protocol with JobMaster > - > > Key: FLINK-4538 > URL: https://issues.apache.org/jira/browse/FLINK-4538 > Project: Flink > Issue Type: Sub-task > Components: Cluster Management >Reporter: Maximilian Michels >Assignee: Maximilian Michels >
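The suggestion is the conventional static-logger idiom: a per-instance logger field costs one lookup and one field per object, while a `static final` logger is resolved once per class. A minimal sketch using plain `java.util.logging` (class names here are illustrative, not the actual Flink `ResourceManager`):

```java
import java.util.logging.Logger;

public class LoggerFieldSketch {
    static class PerInstance {
        // one lookup and one field for every instance
        private final Logger log = Logger.getLogger(getClass().getName());
    }

    static class PerClass {
        // resolved once, shared by all instances; conventional upper-case name
        private static final Logger LOG = Logger.getLogger(PerClass.class.getName());
    }

    public static void main(String[] args) {
        PerInstance a = new PerInstance();
        PerInstance b = new PerInstance();
        // java.util.logging caches by name, so both fields reference one logger,
        // but the field itself is still duplicated per object.
        System.out.println(a.log == b.log);
        System.out.println(PerClass.LOG.getName());
    }
}
```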
[GitHub] flink pull request #2463: [FLINK-4538][FLINK-4348] ResourceManager slot allo...
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77523806 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotProtocolTest.java --- @@ -0,0 +1,228 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.runtime.resourcemanager.slotmanager; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.clusterframework.types.SlotID; +import org.apache.flink.runtime.highavailability.LeaderIdRegistry; +import org.apache.flink.runtime.highavailability.NonHaServices; +import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.resourcemanager.JobMasterRegistration; +import org.apache.flink.runtime.resourcemanager.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.SlotRequest; +import org.apache.flink.runtime.resourcemanager.SlotRequestRegistered; +import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.SlotStatus; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.junit.AfterClass; +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; +import org.mockito.Mockito; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.Collections; +import java.util.UUID; +import java.util.concurrent.TimeUnit; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; + +public class SlotProtocolTest { + + private static TestingRpcService testRpcService; + + @BeforeClass + public static void beforeClass() { + 
testRpcService = new TestingRpcService(); + + } + + @AfterClass + public static void afterClass() { + testRpcService.stopService(); + testRpcService = null; + } + + @Before + public void beforeTest(){ + testRpcService.clearGateways(); + } + + /** +* Tests whether +* 1) SlotRequest is routed to the SlotManager +* 2) SlotRequest leads to a container allocation +* 3) SlotRequest is confirmed +* 4) Slot becomes available and TaskExecutor gets a SlotRequest +*/ + @Test + public void testSlotsUnavailableRequest() throws Exception { + final String rmAddress = "/rm1"; + final String jmAddress = "/jm1"; + final JobID jobID = new JobID(); + + testRpcService.registerGateway(jmAddress, mock(JobMasterGateway.class)); + + + TestingSlotManager slotManager = Mockito.spy(new TestingSlotManager()); + ResourceManager resourceManager = + new ResourceManager(testRpcService, new NonHaServices(rmAddress), slotManager); + resourceManager.start(); + + Future registrationFuture = + resourceManager.registerJobMaster(new JobMasterRegistration(jmAddress, jobID)); + try { + Await.ready(registrationFuture, Duration.create(5, TimeUnit.SECONDS)); + } catch (Exception e) { + Assert.fail("JobManager registration Future didn't
[jira] [Commented] (FLINK-4579) Add StateBackendFactory for RocksDB Backend
[ https://issues.apache.org/jira/browse/FLINK-4579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465025#comment-15465025 ] Aljoscha Krettek commented on FLINK-4579: - Should be as easy as that, yes. > Add StateBackendFactory for RocksDB Backend > --- > > Key: FLINK-4579 > URL: https://issues.apache.org/jira/browse/FLINK-4579 > Project: Flink > Issue Type: Bug > Components: State Backends, Checkpointing >Reporter: Aljoscha Krettek > > Right now, we only have a {{StateBackendFactory}} for the {{FsStateBackend}} > which means that users cannot specify to use the RocksDB backend in the flink > configuration. > If we add a factory for rocksdb we should also think about adding the rocksdb > backend to the standard distribution lib, otherwise it is only usable if > users manually place the rocks jars in the Flink lib folder.
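As a rough sketch of the factory idea in the ticket: a factory class that can be instantiated by name from the configuration and builds the backend from config values. The interface and config key below are simplified stand-ins, not the actual Flink `StateBackendFactory` API:

```java
import java.util.Map;

public class BackendFactorySketch {
    interface StateBackend {}
    interface StateBackendFactory {
        StateBackend createFromConfig(Map<String, String> config);
    }

    static class RocksDBBackend implements StateBackend {
        final String checkpointPath;
        RocksDBBackend(String path) { this.checkpointPath = path; }
    }

    // What the ticket asks for: a factory so the backend can be chosen purely
    // from a configuration entry, analogous to the existing FsStateBackend factory.
    static class RocksDBBackendFactory implements StateBackendFactory {
        public StateBackend createFromConfig(Map<String, String> config) {
            // "state.backend.rocksdb.checkpointdir" is a hypothetical key for this sketch
            return new RocksDBBackend(
                config.getOrDefault("state.backend.rocksdb.checkpointdir", "/tmp"));
        }
    }

    public static void main(String[] args) throws Exception {
        // Reflective instantiation by class name, as a config-driven loader would do.
        String factoryClass = RocksDBBackendFactory.class.getName();
        StateBackendFactory factory = (StateBackendFactory)
            Class.forName(factoryClass).getDeclaredConstructor().newInstance();
        StateBackend backend = factory.createFromConfig(
            Map.of("state.backend.rocksdb.checkpointdir", "/data/ckpt"));
        System.out.println(((RocksDBBackend) backend).checkpointPath);
    }
}
```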
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465028#comment-15465028 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77523985 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java --- @@ -32,4 +33,11 @@ // void notifyOfNewResourceManagerLeader(String address, UUID resourceManagerLeaderId); + + /** +* Send by the ResourceManager to the TaskExecutor +* @param allocationID id for the request +* @param resourceManagerLeaderID current leader id of the ResourceManager +*/ + void requestSlot(AllocationID allocationID, UUID resourceManagerLeaderID); --- End diff -- How is the confirmation of the `TaskExecutor` sent back to the `SlotManager`? Would it make sense to send it back via the return value of this method?
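A minimal sketch of that suggestion — instead of a fire-and-forget `requestSlot`, the TaskExecutor returns its accept/decline decision, so the confirmation arrives at the caller as the RPC result. The types below are illustrative stand-ins, not the actual Flink gateway interfaces:

```java
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class SlotRequestReplySketch {
    enum SlotRequestReply { ACCEPTED, DECLINED }

    interface TaskExecutorGateway {
        // Returning a future makes the confirmation part of the protocol.
        CompletableFuture<SlotRequestReply> requestSlot(UUID allocationId, UUID leaderId);
    }

    public static void main(String[] args) {
        UUID leaderId = UUID.randomUUID();

        // TaskExecutor side: accept only requests carrying the current leader id.
        TaskExecutorGateway gateway = (allocationId, leader) ->
            CompletableFuture.completedFuture(
                leader.equals(leaderId) ? SlotRequestReply.ACCEPTED
                                        : SlotRequestReply.DECLINED);

        // SlotManager side: the reply is simply the return value of the call.
        SlotRequestReply reply = gateway.requestSlot(UUID.randomUUID(), leaderId).join();
        System.out.println(reply);
    }
}
```

The design benefit is that the caller needs no separate callback message to correlate with the original request; the allocation outcome is tied to the call itself.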
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465100#comment-15465100 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77527444 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final MaptaskManagers = new HashMap<>(); final Map jobs = new HashMap<>(); - /** -* Adds a metric to this MetricStore. -* -* @param name the metric identifier -* @param value the metric value -*/ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** -* JobManagerMetricStore metric -* format: 0:. -*/ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** -* TaskManager metric -* format: 1::. -*/ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: --- End diff -- What's the benefit of having an explicit type field over using `instanceof`? I think encoding the type via the actual type has the advantage that you don't mix up classes with wrong category types. 
[jira] [Resolved] (FLINK-4479) Replace trademark (tm) with registered trademark (R) sign on Flink website
[ https://issues.apache.org/jira/browse/FLINK-4479?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Robert Metzger resolved FLINK-4479. --- Resolution: Fixed Resolved with https://github.com/apache/flink-web/commit/f2053d28b12390539275af8d3a6e14941baeba98 > Replace trademark (tm) with registered trademark (R) sign on Flink website > -- > > Key: FLINK-4479 > URL: https://issues.apache.org/jira/browse/FLINK-4479 > Project: Flink > Issue Type: Bug > Components: Project Website >Reporter: Robert Metzger >Assignee: Robert Metzger > > Flink is now a registered trademark, so we should reflect that on our website.
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465265#comment-15465265 ] ASF GitHub Bot commented on FLINK-4389: --- Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77538038 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java --- @@ -35,109 +46,111 @@ final MaptaskManagers = new HashMap<>(); final Map jobs = new HashMap<>(); - /** -* Adds a metric to this MetricStore. -* -* @param name the metric identifier -* @param value the metric value -*/ - public void add(String name, Object value) { - TaskManagerMetricStore tm; - JobMetricStore job; - TaskMetricStore task; - + public void add(MetricDump metric) { try { - String[] components = name.split(":"); - switch (components[0]) { - /** -* JobManagerMetricStore metric -* format: 0:. -*/ - case "0": - jobManager.metrics.put(components[1], value); - break; - /** -* TaskManager metric -* format: 1::. -*/ - case "1": - if (components.length != 3) { - break; - } - tm = taskManagers.get(components[1]); + QueryScopeInfo info = metric.scopeInfo; + TaskManagerMetricStore tm; + JobMetricStore job; + TaskMetricStore task; + + String name = info.scope.isEmpty() + ? metric.name + : info.scope + "." + metric.name; + + if (name.isEmpty()) { // malformed transmission + return; + } + + switch (info.getCategory()) { + case INFO_CATEGORY_JM: --- End diff -- eh, seemed like the proper way of handling it. Also, (up to) 4 comparisons vs a jump. 
> Expose metrics to Webfrontend
> -
>
> Key: FLINK-4389
> URL: https://issues.apache.org/jira/browse/FLINK-4389
> Project: Flink
> Issue Type: Sub-task
> Components: Metrics, Webfrontend
> Affects Versions: 1.1.0
> Reporter: Chesnay Schepler
> Assignee: Chesnay Schepler
> Fix For: pre-apache
>
> https://cwiki.apache.org/confluence/display/FLINK/FLIP-7%3A+Expose+metrics+to+WebInterface
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465303#comment-15465303 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77540202

--- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/metrics/MetricStore.java ---

--- End diff --

That is true. Performance-wise it is the more efficient way to execute it, no doubt. I was just wondering whether this is not a case of premature optimization with the price of harder maintainability.
[jira] [Commented] (FLINK-4538) Implement slot allocation protocol with JobMaster
[ https://issues.apache.org/jira/browse/FLINK-4538?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465037#comment-15465037 ] ASF GitHub Bot commented on FLINK-4538: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2463#discussion_r77524239

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/SlotStatus.java ---

```
@@ -46,13 +47,21 @@
 	/** if the slot is allocated, jobId identify which job this slot is allocated to; else, jobId is null */
 	private final JobID jobID;

-	public SlotStatus(SlotID slotID, ResourceProfile profiler) {
-		this(slotID, profiler, null, null);
+	/** Gateway to the TaskManager which reported the SlotStatus */
+	private final TaskExecutorGateway taskExecutorGateway;
```

--- End diff --

The `SlotStatus` is no longer serializable with this field. Where does the `SlotStatus` come from? If it's coming from the `TaskExecutor`, then the `taskExecutorGateway` has to be retrieved on the `ResourceManager` side.

> Implement slot allocation protocol with JobMaster
> -
>
> Key: FLINK-4538
> URL: https://issues.apache.org/jira/browse/FLINK-4538
> Project: Flink
> Issue Type: Sub-task
> Components: Cluster Management
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
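The serialization concern can be sketched in isolation. All names below are hypothetical stand-ins (this is not Flink code): `Gateway` plays the role of the non-serializable `TaskExecutorGateway`, and `SlotStatusLike` the role of `SlotStatus`. Java serialization of the holder object fails as soon as it carries a live gateway reference:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class Main {
    // Stand-in for TaskExecutorGateway: a live RPC handle, not Serializable.
    static class Gateway { }

    // Stand-in for SlotStatus: a Serializable report that now holds a gateway.
    static class SlotStatusLike implements Serializable {
        private final String slotId;
        private final Gateway gateway;  // non-serializable field

        SlotStatusLike(String slotId, Gateway gateway) {
            this.slotId = slotId;
            this.gateway = gateway;
        }
    }

    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {  // NotSerializableException lands here
            return false;
        }
    }

    public static void main(String[] args) {
        // Shipping such a status over RPC would fail at serialization time,
        // hence the suggestion to resolve the gateway on the ResourceManager side.
        System.out.println(serializes(new SlotStatusLike("slot-0", new Gateway())));
    }
}
```

This is why keeping the gateway out of the message (and looking it up on the receiving side) restores serializability.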
[jira] [Commented] (FLINK-4389) Expose metrics to Webfrontend
[ https://issues.apache.org/jira/browse/FLINK-4389?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465125#comment-15465125 ] ASF GitHub Bot commented on FLINK-4389: --- Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77529056

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java ---

```
@@ -25,6 +25,7 @@
 	public static final byte METRIC_CATEGORY_COUNTER = 0;
 	public static final byte METRIC_CATEGORY_GAUGE = 1;
 	public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+	public static final byte METRIC_CATEGORY_METER = 3;
```

--- End diff --

Sorry just saw your latest commit.
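The switch-on-byte-category dispatch debated earlier in this thread can be sketched as follows. This is a hypothetical, self-contained illustration: the constant names mirror the `MetricDump` diff above, but the `describe` helper and its string labels are invented for the example and are not Flink code.

```java
public class Main {
    // Constants modeled on MetricDump's byte categories.
    static final byte METRIC_CATEGORY_COUNTER = 0;
    static final byte METRIC_CATEGORY_GAUGE = 1;
    static final byte METRIC_CATEGORY_HISTOGRAM = 2;
    static final byte METRIC_CATEGORY_METER = 3;

    static String describe(byte category) {
        // A dense switch over small constants typically compiles to a
        // tableswitch (one indexed jump), whereas an if/else chain costs up
        // to one comparison per branch -- the "(up to) 4 comparisons vs a
        // jump" point made in the review.
        switch (category) {
            case METRIC_CATEGORY_COUNTER:   return "counter";
            case METRIC_CATEGORY_GAUGE:     return "gauge";
            case METRIC_CATEGORY_HISTOGRAM: return "histogram";
            case METRIC_CATEGORY_METER:     return "meter";
            default:
                throw new IllegalArgumentException("unknown category: " + category);
        }
    }

    public static void main(String[] args) {
        System.out.println(describe(METRIC_CATEGORY_METER));
    }
}
```

The maintainability counter-argument is that each new category must be threaded through every such switch by hand, which is the trade-off discussed above.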
[GitHub] flink pull request #2363: [FLINK-4389] Expose metrics to WebFrontend
Github user tillrohrmann commented on a diff in the pull request: https://github.com/apache/flink/pull/2363#discussion_r77529056

--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/metrics/dump/MetricDump.java ---

```
@@ -25,6 +25,7 @@
 	public static final byte METRIC_CATEGORY_COUNTER = 0;
 	public static final byte METRIC_CATEGORY_GAUGE = 1;
 	public static final byte METRIC_CATEGORY_HISTOGRAM = 2;
+	public static final byte METRIC_CATEGORY_METER = 3;
```

--- End diff --

Sorry just saw your latest commit.

--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and wishes so, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. ---
[jira] [Resolved] (FLINK-4073) YARNSessionCapacitySchedulerITCase.testTaskManagerFailure failed on Travis
[ https://issues.apache.org/jira/browse/FLINK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Maximilian Michels resolved FLINK-4073. --- Resolution: Fixed Assignee: Maximilian Michels Fix Version/s: 1.2.0 Fixed with 2f87f61d34414074bc09ba8584d345bd400ed3cd.

> YARNSessionCapacitySchedulerITCase.testTaskManagerFailure failed on Travis
> --
>
> Key: FLINK-4073
> URL: https://issues.apache.org/jira/browse/FLINK-4073
> Project: Flink
> Issue Type: Bug
> Components: Tests
> Affects Versions: 1.1.0
> Reporter: Till Rohrmann
> Assignee: Maximilian Michels
> Priority: Critical
> Labels: test-stability
> Fix For: 1.2.0
>
> The test case {{YARNSessionCapacitySchedulerITCase.testTaskManagerFailure}} failed on Travis.
> https://s3.amazonaws.com/archive.travis-ci.org/jobs/137498643/log.txt
[GitHub] flink issue #2275: FLINK-3929 Support for Kerberos Authentication with Keyta...
Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275

The Kafka tests fail:

```
Running org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 3
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
	at kafka.server.KafkaServer.initZk(KafkaServer.scala:278)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:168)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:336)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:170)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:41)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
	at org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase.prepare(Kafka09SecureRunITCase.java:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.945 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
```

The goal should be that all builds pass on Travis before we merge this.
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465144#comment-15465144 ] ASF GitHub Bot commented on FLINK-3929: --- Github user mxm commented on the issue: https://github.com/apache/flink/pull/2275

The Kafka tests fail:

```
Running org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
org.I0Itec.zkclient.exception.ZkTimeoutException: Unable to connect to zookeeper server within timeout: 3
	at org.I0Itec.zkclient.ZkClient.connect(ZkClient.java:1223)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:155)
	at org.I0Itec.zkclient.ZkClient.<init>(ZkClient.java:129)
	at kafka.utils.ZkUtils$.createZkClientAndConnection(ZkUtils.scala:89)
	at kafka.utils.ZkUtils$.apply(ZkUtils.scala:71)
	at kafka.server.KafkaServer.initZk(KafkaServer.scala:278)
	at kafka.server.KafkaServer.startup(KafkaServer.scala:168)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.getKafkaServer(KafkaTestEnvironmentImpl.java:336)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironmentImpl.prepare(KafkaTestEnvironmentImpl.java:170)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestEnvironment.prepare(KafkaTestEnvironment.java:41)
	at org.apache.flink.streaming.connectors.kafka.KafkaTestBase.startClusters(KafkaTestBase.java:131)
	at org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase.prepare(Kafka09SecureRunITCase.java:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
	at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
	at org.junit.rules.RunRules.evaluate(RunRules.java:20)
	at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
	at org.apache.maven.surefire.junit4.JUnit4Provider.execute(JUnit4Provider.java:283)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeWithRerun(JUnit4Provider.java:173)
	at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:153)
	at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:128)
	at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:203)
	at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:155)
	at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:103)

Tests run: 1, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 47.945 sec <<< FAILURE! - in org.apache.flink.streaming.connectors.kafka.Kafka09SecureRunITCase
```

The goal should be that all builds pass on Travis before we merge this.

> Support for Kerberos Authentication with Keytab Credential
> --
>
> Key: FLINK-3929
> URL: https://issues.apache.org/jira/browse/FLINK-3929
> Project: Flink
> Issue Type: New Feature
> Reporter: Eron Wright
> Assignee: Vijay Srinivasaraghavan
> Labels: kerberos, security
> Original Estimate: 672h
> Remaining Estimate: 672h
>
> _This issue is part of a series of improvements detailed in the [Secure Data Access|https://docs.google.com/document/d/1-GQB6uVOyoaXGwtqwqLV8BHDxWiMO2WnVzBoJ8oPaAs/edit?usp=sharing] design doc._
> Add support for a keytab credential to be associated with the Flink cluster, to facilitate:
> - Kerberos-authenticated data access for connectors
> - Kerberos-authenticated ZooKeeper access
> Support both the standalone and YARN deployment modes.
[jira] [Commented] (FLINK-3929) Support for Kerberos Authentication with Keytab Credential
[ https://issues.apache.org/jira/browse/FLINK-3929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=15465855#comment-15465855 ] ASF GitHub Bot commented on FLINK-3929: --- Github user vijikarthi commented on the issue: https://github.com/apache/flink/pull/2275 @mxm I believe the ZK timeout issue occurs from the LocalFlinkMiniClusterITCase->testLocalFlinkMiniClusterWithMultipleTaskManagers test case, but it is not consistent. I ran the Kafka test case alone and it worked. I also ran "mvn clean verify" and I don't see any errors (after a couple of retries - same ZK timeout error from LocalFlinkMiniClusterITCase). It looks like there is some inconsistency in some of the integration test scenarios.